SpringBoot が RocketMQ トランザクション、ブロードキャスト、連続メッセージを統(tǒng)合する方法
May 18, 2023 am 10:04 AM環(huán)境: springboot2.3.9RELEASE RocketMQ4.8.0
依存関係
<dependency>???<groupid>org.springframework.boot</groupid>?????<artifactid>spring-boot-starter-web</artifactid>?</dependency>?<dependency>?????<groupid>org.apache.rocketmq</groupid>?????<artifactid>rocketmq-spring-boot-starter</artifactid>?????<version>2.2.0</version>?</dependency>
設(shè)定ファイル
server:???port:?8080?---?rocketmq:???nameServer:?localhost:9876???producer:?????group:?demo-mq
通常のメッセージ
送信
@Resource?private?RocketMQTemplate?rocketMQTemplate?;??????public?void?send(String?message)?{???rocketMQTemplate.convertAndSend("test-topic:tag2",?MessageBuilder.withPayload(message).build());?}
Accept
@RocketMQMessageListener(topic?=?"test-topic",?consumerGroup?=?"consumer01-group",?selectorExpression?=?"tag1?||?tag2")?@Component?public?class?ConsumerListener?implements?RocketMQListener<string>?{??????@Override?????public?void?onMessage(String?message)?{?????????System.out.println("接收到消息:"?+?message)?;?????}??}</string>
連続メッセージ
Send
@Resource?private?RocketMQTemplate?rocketMQTemplate?;??public?void?sendOrder(String?topic,?String?message,?String?tags,?int?id)?{?????rocketMQTemplate.asyncSendOrderly(topic?+?":"?+?tags,?MessageBuilder.withPayload(message).build(),??????????????"order-"?+?id,?new?SendCallback()?{?????????????????@Override?????????????????public?void?onSuccess(SendResult?sendResult)?{?????????????????????System.err.println("msg-id:?"?+?sendResult.getMsgId()?+?":?"?+?message?+"\tqueueId:?"?+?sendResult.getMessageQueue().getQueueId())?;?????????????????}?????????????????@Override?????????????????public?void?onException(Throwable?e)?{?????????????????????e.printStackTrace()?;?????????????????}?????????????});?}
ハッシュキー
@RocketMQMessageListener(topic?=?"order-topic",?consumerGroup?=?"consumer02-group",??????selectorExpression?=?"tag3?||?tag4",?consumeMode?=?ConsumeMode.ORDERLY)?@Component?public?class?ConsumerOrderListener?implements?RocketMQListener<string>?{??????@Override?????public?void?onMessage(String?message)?{?????????System.out.println(Thread.currentThread().getName()?+?"?接收到Order消息:"?+?message)?;?????}??}</string>
consumeMode = ConsumeMode.ORDERLYに基づいて別のキューに送信されるメッセージを示します。メッセージ モードはシーケンシャル モードで、1 つのキューと 1 つのスレッドです。
Result
consumeMode = ConsumeMode.CONCURRENTLY の場(chǎng)合、実行結(jié)果は次のようになります。
# #クラスター/ブロードキャスト メッセージ モード
送信者
@Resource?private?RocketMQTemplate?rocketMQTemplate?;??????public?void?send(String?topic,?String?message,?String?tags)?{?????rocketMQTemplate.send(topic?+?":"?+?tags,?MessageBuilder.withPayload(message).build())?;?}
クラスター メッセージ モード
コンシューマー
@RocketMQMessageListener(topic?=?"broad-topic",?consumerGroup?=?"consumer03-group",??????selectorExpression?=?"tag6?||?tag7",?messageModel?=?MessageModel.CLUSTERING)?@Component?public?class?ConsumerBroadListener?implements?RocketMQListener<string>?{??????@Override?????public?void?onMessage(String?message)?{?????????System.out.println("ConsumerBroadListener1接收到消息:"?+?message)?;?????}??}</string>
messageModel = MessageModel.CLUSTERING
Test
ポート 8080 と 8081 で 2 つのサービスを開始します
8080 サービス
8081 サービス
クラスター メッセージ モードでは、各サービスは負(fù)荷分散を?qū)g現(xiàn)するためにメッセージの一部を個(gè)別に受信します。
ブロードキャスト メッセージ モード
コンシューマー エンド
@RocketMQMessageListener(topic?=?"broad-topic",?consumerGroup?=?"consumer03-group",??????selectorExpression?=?"tag6?||?tag7",?messageModel?=?MessageModel.BROADCASTING)?@Component?public?class?ConsumerBroadListener?implements?RocketMQListener<string>?{??????@Override?????public?void?onMessage(String?message)?{?????????System.out.println("ConsumerBroadListener1接收到消息:"?+?message)?;?????}??}</string>
messageModel = MessageModel。ブロードキャスティング
テスト
ポート 8080 と 8081 で 2 つのサービスを開始します
8080 サービス
8081 サービス
1. プロデューサは MQServer にハーフ メッセージを送信します (ハーフ メッセージとは、コンシューマが一時(shí)的に消費(fèi)できないメッセージを指します)2. サーバーはメッセージの書き込み結(jié)果に応答し、ハーフ メッセージが送信されます正常に完了しました 3. ローカルトランザクションの実行を開始します4. ローカルトランザクションの実行狀況に応じてCommitまたはRollbackを?qū)g行しますトランザクション情報(bào)の補(bǔ)償処理1. MQServer が長(zhǎng)期間受信しない場(chǎng)合、ローカル トランザクションの実行ステータスにより、プロデューサーへの確認(rèn)レビュー操作リクエストが開始されます。2. プロデューサが確認(rèn)レビュー リクエストを受信した後、ローカル トランザクションの実行ステータスを確認(rèn)します。3. 「結(jié)果を確認(rèn)した後、コミットまたはロールバック操作を?qū)g行します?!工摔瑜毪?、
#補(bǔ)償フェーズは主にタイムアウトまたはロールバックの問題を解決するために使用されます。プロデューサがコミットまたはロールバック操作を送信すると失敗します。
Sender
@Resource?private?RocketMQTemplate?rocketMQTemplate?;??????public?void?sendTx(String?topic,?Long?id,?String?tags)?{?????rocketMQTemplate.sendMessageInTransaction(topic?+?":"?+?tags,?MessageBuilder.withPayload(?????????????new?Users(id,?UUID.randomUUID().toString().replaceAll("-",?""))).?????????????setHeader("BID",?UUID.randomUUID().toString().replaceAll("-",?"")).build(),??????????????UUID.randomUUID().toString().replaceAll("-",?""))?;?}
プロデューサーに対応するリスナー
@RocketMQTransactionListener?public?class?ProducerTxListener?implements?RocketMQLocalTransactionListener?{??????????@Resource?????private?BusinessService?bs?;??????@Override?????public?RocketMQLocalTransactionState?executeLocalTransaction(Message?msg,?Object?arg)?{?????????//?這里執(zhí)行本地的事務(wù)操作,比如保存數(shù)據(jù)。?????????try?{?????????????//?創(chuàng)建一個(gè)日志記錄表,將這唯一的ID存入數(shù)據(jù)庫中,在下面的check方法中可以根據(jù)這個(gè)id查詢是否有數(shù)據(jù)?????????????String?id?=?(String)?msg.getHeaders().get("BID")?;?????????????Users?users?=?new?JsonMapper().readValue((byte[])msg.getPayload(),?Users.class)?;?????????????System.out.println("消息內(nèi)容:"?+?users?+?"\t參與數(shù)據(jù):"?+?arg?+?"\t本次事務(wù)的唯一編號(hào):"?+?id)?;?????????????bs.save(users,?new?UsersLog(users.getId(),?id))?;?????????}?catch?(Exception?e)?{?????????????e.printStackTrace()?;?????????????return?RocketMQLocalTransactionState.ROLLBACK?;?????????}?????????return?RocketMQLocalTransactionState.COMMIT?;?????}??????@Override?????public?RocketMQLocalTransactionState?checkLocalTransaction(Message?msg)?{?????????//?這里檢查本地事務(wù)是否執(zhí)行成功?????????String?id?=?(String)?msg.getHeaders().get("BID")?;?????????System.out.println("執(zhí)行查詢ID為:"?+?id?+?"?的數(shù)據(jù)是否存在")?;?????????UsersLog?usersLog?=?bs.queryUsersLog(id)?;?????????if?(usersLog?==?null)?{?????????????return?RocketMQLocalTransactionState.ROLLBACK?;?????????}?????????return?RocketMQLocalTransactionState.COMMIT?;?????}??}
Consumer
@RocketMQMessageListener(topic?=?"tx-topic",?consumerGroup?=?"consumer05-group",?selectorExpression?=?"tag10")?@Component?public?class?ConsumerTxListener?implements?RocketMQListener<users>?{??????@Override?????public?void?onMessage(Users?users)?{?????????System.out.println("TX接收到消息:"?+?users)?;?????}??}</users>
Service
@Transactional?public?boolean?save(Users?users,?UsersLog?usersLog)?{?????usersRepository.save(users)?;?????usersLogRepository.save(usersLog)?;?????if?(users.getId()?==?1)?{?????????throw?new?RuntimeException("數(shù)據(jù)錯(cuò)誤")?;?????}?????return?true?;?}??????public?UsersLog?queryUsersLog(String?bid)?{?????return?usersLogRepository.findByBid(bid)?;?}
Controller
@GetMapping("/tx/{id}")?public?Object?sendTx(@PathVariable("id")Long?id)?{?????ps.sendTx("tx-topic",?id,?"tag10")?;?????return?"send?transaction?success"?;?}
Test
インターフェイスを呼び出した後のコンソール出力:
印刷ログから、コンシューマはすべての処理が完了した後でのみメッセージを受信することがわかります。メッセージが保存されました。
データを削除してから ID を 1 としてテストすると、エラーが発生します。
#データベースにデータがありません。 。 。
以上がSpringBoot が RocketMQ トランザクション、ブロードキャスト、連続メッセージを統(tǒng)合する方法の詳細(xì)內(nèi)容です。詳細(xì)については、PHP 中國(guó)語 Web サイトの他の関連記事を參照してください。

ホットAIツール

Undress AI Tool
脫衣畫像を無料で

Undresser.AI Undress
リアルなヌード寫真を作成する AI 搭載アプリ

AI Clothes Remover
寫真から衣服を削除するオンライン AI ツール。

Clothoff.io
AI衣類リムーバー

Video Face Swap
完全無料の AI 顔交換ツールを使用して、あらゆるビデオの顔を簡(jiǎn)単に交換できます。

人気の記事

ホットツール

メモ帳++7.3.1
使いやすく無料のコードエディター

SublimeText3 中國(guó)語版
中國(guó)語版、とても使いやすい

ゼンドスタジオ 13.0.1
強(qiáng)力な PHP 統(tǒng)合開発環(huán)境

ドリームウィーバー CS6
ビジュアル Web 開発ツール

SublimeText3 Mac版
神レベルのコード編集ソフト(SublimeText3)

Jasypt の概要 Jasypt は、開発者が最小限の労力で基本的な暗號(hào)化機(jī)能を自分のプロジェクトに追加できる Java ライブラリであり、暗號(hào)化の仕組みを深く理解する必要はありません。一方向および雙方向暗號(hào)化の高いセキュリティ。標(biāo)準(zhǔn)ベースの暗號(hào)化テクノロジー。パスワード、テキスト、數(shù)値、バイナリを暗號(hào)化します... Spring ベースのアプリケーション、オープン API への統(tǒng)合、JCE プロバイダーでの使用に適しています... 次の依存関係を追加します: com.github.ulisesbocchiojasypt-spring-boot-starter2. 1.1 Jasypt の特典はシステムのセキュリティを保護(hù)し、コードが漏洩した場(chǎng)合でもデータ ソースは保証されます。

1. Redis は分散ロックの原則を?qū)g裝しており、分散ロックが必要な理由 分散ロックについて話す前に、分散ロックが必要な理由を説明する必要があります。分散ロックの反対はスタンドアロン ロックです。マルチスレッド プログラムを作成するとき、共有変數(shù)を同時(shí)に操作することによって引き起こされるデータの問題を回避します。通常、ロックを使用して共有変數(shù)を相互に除外し、データの正確性を確保します。共有変數(shù)の使用範(fàn)囲は同じプロセス內(nèi)です。共有リソースを同時(shí)に操作する必要があるプロセスが複數(shù)ある場(chǎng)合、どうすれば相互排他的になるのでしょうか?今日のビジネス アプリケーションは通常マイクロサービス アーキテクチャであり、これは 1 つのアプリケーションが複數(shù)のプロセスをデプロイすることも意味します。複數(shù)のプロセスが MySQL の同じレコード行を変更する必要がある場(chǎng)合、順序の亂れた操作によって引き起こされるダーティ データを避けるために、分散が必要です。今回導(dǎo)入するスタイルはロックされています。ポイントを獲得したい

1. RedisAPI のデフォルトのシリアル化メカニズムである RedisTemplate1.1 をカスタマイズします。API ベースの Redis キャッシュ実裝では、データ キャッシュ操作に RedisTemplate テンプレートを使用します。ここで、RedisTemplate クラスを開いて、クラスのソース コード情報(bào)を表示します。publicclassRedisTemplateextendsRedisAccessorimplementsRedisOperations、BeanClassLoaderAware{//キーを宣言、値の各種シリアル化メソッド、初期値は空 @NullableprivateRedisSe

Springboot はファイルを読み取りますが、jar パッケージにパッケージ化した後、最新の開発にアクセスできません。jar パッケージにパッケージ化した後、Springboot がファイルを読み取れない狀況があります。その理由は、パッケージ化後、ファイルの仮想パスが変更されるためです。は無効であり、ストリーム経由でのみアクセスできます。読み取ります。ファイルはリソースの下にあります publicvoidtest(){Listnames=newArrayList();InputStreamReaderread=null;try{ClassPathResourceresource=newClassPathResource("name.txt");Input

使用シナリオ 1. 注文は正常に行われましたが、支払いが 30 分以內(nèi)に行われませんでした。支払いがタイムアウトになり、注文が自動(dòng)的にキャンセルされました 2. 注文に署名があり、署名後 7 日間評(píng)価が行われませんでした。注文がタイムアウトして評(píng)価されない場(chǎng)合、システムはデフォルトでプラスの評(píng)価を設(shè)定します 3. 注文は正常に行われます。販売者が 5 分間注文を受け取らない場(chǎng)合、注文はキャンセルされます。 4. 配送がタイムアウトします。 SMS リマインダーをプッシュします... 遅延が長(zhǎng)く、リアルタイム パフォーマンスが低いシナリオでは、タスク スケジュールを使用して定期的なポーリング処理を?qū)g行できます。例: xxl-job 今日は選択します

Springboot+Mybatis-plus が SQL ステートメントを使用して複數(shù)テーブルの追加操作を?qū)g行しない場(chǎng)合、私が遭遇した問題は、テスト環(huán)境で思考をシミュレートすることによって分解されます: パラメーターを含む BrandDTO オブジェクトを作成し、パラメーターをバックグラウンドに渡すことをシミュレートします。 Mybatis-plus で複數(shù)テーブルの操作を?qū)g行するのは非常に難しいことを理解してください。Mybatis-plus-join などのツールを使用しない場(chǎng)合は、対応する Mapper.xml ファイルを設(shè)定し、臭くて長(zhǎng)い ResultMap を設(shè)定するだけです。対応する SQL ステートメントを記述します。この方法は面倒に見えますが、柔軟性が高く、次のことが可能です。

SpringBoot と SpringMVC はどちらも Java 開発で一般的に使用されるフレームワークですが、それらの間には明らかな違いがいくつかあります。この記事では、これら 2 つのフレームワークの機(jī)能と使用法を調(diào)べ、その違いを比較します。まず、SpringBoot について學(xué)びましょう。 SpringBoot は、Spring フレームワークに基づいたアプリケーションの作成と展開を簡(jiǎn)素化するために、Pivo??tal チームによって開発されました。スタンドアロンの実行可能ファイルを構(gòu)築するための高速かつ軽量な方法を提供します。

1. @Import による通常クラスの導(dǎo)入 @Import による通常クラスの導(dǎo)入は、通常のクラスを Bean として定義するのに役立ちます。 @Importは、@SpringBootApplication(スタートアップクラス)、@Configuration(コンフィグレーションクラス)、@Component(コンポーネントクラス)に対応するクラスに追加できます。注: @RestController、@Service、および @Repository はすべて、@Import アノテーションを通じて @Component@SpringBootApplication@Import(ImportBean.class)//ImportBean に屬します。
