ホームページ > Java > &#&チュートリアル > SpringBoot が RocketMQ トランザクション、ブロードキャスト、連続メッセージを統合する方法

SpringBoot が RocketMQ トランザクション、ブロードキャスト、連続メッセージを統合する方法

WBOY
リリース: 2023-05-18 10:04:05
転載
1900 人が閲覧しました

環境: springboot2.3.9RELEASE RocketMQ4.8.0

依存関係

<dependency>   <groupid>org.springframework.boot</groupid>     <artifactid>spring-boot-starter-web</artifactid> </dependency> <dependency>     <groupid>org.apache.rocketmq</groupid>     <artifactid>rocketmq-spring-boot-starter</artifactid>     <version>2.2.0</version> </dependency>
ログイン後にコピー

設定ファイル

server:   port: 8080 --- rocketmq:   nameServer: localhost:9876   producer:     group: demo-mq
ログイン後にコピー

通常のメッセージ

送信

@Resource private RocketMQTemplate rocketMQTemplate ;      public void send(String message) {   rocketMQTemplate.convertAndSend("test-topic:tag2", MessageBuilder.withPayload(message).build()); }
ログイン後にコピー

Accept

@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group", selectorExpression = "tag1 || tag2") @Component public class ConsumerListener implements RocketMQListener<string> {      @Override     public void onMessage(String message) {         System.out.println("接收到消息:" + message) ;     }  }</string>
ログイン後にコピー

連続メッセージ

Send

@Resource private RocketMQTemplate rocketMQTemplate ;  public void sendOrder(String topic, String message, String tags, int id) {     rocketMQTemplate.asyncSendOrderly(topic + ":" + tags, MessageBuilder.withPayload(message).build(),              "order-" + id, new SendCallback() {                 @Override                 public void onSuccess(SendResult sendResult) {                     System.err.println("msg-id: " + sendResult.getMsgId() + ": " + message +"\tqueueId: " + sendResult.getMessageQueue().getQueueId()) ;                 }                 @Override                 public void onException(Throwable e) {                     e.printStackTrace() ;                 }             }); }
ログイン後にコピー

ハッシュキー

@RocketMQMessageListener(topic = "order-topic", consumerGroup = "consumer02-group",      selectorExpression = "tag3 || tag4", consumeMode = ConsumeMode.ORDERLY) @Component public class ConsumerOrderListener implements RocketMQListener<string> {      @Override     public void onMessage(String message) {         System.out.println(Thread.currentThread().getName() + " 接收到Order消息:" + message) ;     }  }</string>
ログイン後にコピー

consumeMode = ConsumeMode.ORDERLYに基づいて別のキューに送信されるメッセージを示します。メッセージ モードはシーケンシャル モードで、1 つのキューと 1 つのスレッドです。

Result

SpringBoot が RocketMQ トランザクション、ブロードキャスト、連続メッセージを統合する方法

consumeMode = ConsumeMode.CONCURRENTLY の場合、実行結果は次のようになります。

SpringBoot が RocketMQ トランザクション、ブロードキャスト、連続メッセージを統合する方法

# #クラスター/ブロードキャスト メッセージ モード

送信者

@Resource private RocketMQTemplate rocketMQTemplate ;      public void send(String topic, String message, String tags) {     rocketMQTemplate.send(topic + ":" + tags, MessageBuilder.withPayload(message).build()) ; }
ログイン後にコピー

クラスター メッセージ モード

コンシューマー

@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group",      selectorExpression = "tag6 || tag7", messageModel = MessageModel.CLUSTERING) @Component public class ConsumerBroadListener implements RocketMQListener<string> {      @Override     public void onMessage(String message) {         System.out.println("ConsumerBroadListener1接收到消息:" + message) ;     }  }</string>
ログイン後にコピー

messageModel = MessageModel.CLUSTERING

Test

ポート 8080 と 8081 で 2 つのサービスを開始します

8080 サービス

SpringBoot が RocketMQ トランザクション、ブロードキャスト、連続メッセージを統合する方法

8081 サービス

SpringBoot が RocketMQ トランザクション、ブロードキャスト、連続メッセージを統合する方法

クラスター メッセージ モードでは、各サービスは負荷分散を実現するためにメッセージの一部を個別に受信します。

ブロードキャスト メッセージ モード

コンシューマー エンド

@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group",      selectorExpression = "tag6 || tag7", messageModel = MessageModel.BROADCASTING) @Component public class ConsumerBroadListener implements RocketMQListener<string> {      @Override     public void onMessage(String message) {         System.out.println("ConsumerBroadListener1接收到消息:" + message) ;     }  }</string>
ログイン後にコピー

messageModel = MessageModel。ブロードキャスティング

テスト

ポート 8080 と 8081 で 2 つのサービスを開始します

8080 サービス

SpringBoot が RocketMQ トランザクション、ブロードキャスト、連続メッセージを統合する方法

8081 サービス

SpringBoot が RocketMQ トランザクション、ブロードキャスト、連続メッセージを統合する方法

#クラスター メッセージ モードでは、各サービスは同じメッセージを受信します。

トランザクション メッセージ

RocketMQ トランザクションの 3 つのステータス

TransactionStatus.CommitTransaction: トランザクション メッセージを送信し、コンシューマーはこのメッセージを消費できます

TransactionStatus.RollbackTransaction: ローリングトランザクションを戻すということは、メッセージが削除され、使用できなくなることを意味します。

TransactionStatus.Unknown: 中間ステータス。ステータスを判断するためにメッセージ キューをチェックする必要があることを表します。

RocketMQ のトランザクション メッセージの実装は、主に 2 つの段階に分かれています: 通常のトランザクションの送信と送信、およびトランザクション情報の補償プロセスです。全体のプロセスは次のとおりです:

通常のトランザクションの送信と送信段階

1. プロデューサは MQServer にハーフ メッセージを送信します (ハーフ メッセージとは、コンシューマが一時的に消費できないメッセージを指します)

2. サーバーはメッセージの書き込み結果に応答し、ハーフ メッセージが送信されます正常に完了しました

3. ローカルトランザクションの実行を開始します

4. ローカルトランザクションの実行状況に応じてCommitまたはRollbackを実行します

トランザクション情報の補償処理

1. MQServer が長期間受信しない場合、ローカル トランザクションの実行ステータスにより、プロデューサーへの確認レビュー操作リクエストが開始されます。

2. プロデューサが確認レビュー リクエストを受信した後、ローカル トランザクションの実行ステータスを確認します。

3. 「結果を確認した後、コミットまたはロールバック操作を実行します。」によると、

#補償フェーズは主にタイムアウトまたはロールバックの問題を解決するために使用されます。プロデューサがコミットまたはロールバック操作を送信すると失敗します。

Sender

@Resource private RocketMQTemplate rocketMQTemplate ;      public void sendTx(String topic, Long id, String tags) {     rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, MessageBuilder.withPayload(             new Users(id, UUID.randomUUID().toString().replaceAll("-", ""))).             setHeader("BID", UUID.randomUUID().toString().replaceAll("-", "")).build(),              UUID.randomUUID().toString().replaceAll("-", "")) ; }
ログイン後にコピー

プロデューサーに対応するリスナー

@RocketMQTransactionListener public class ProducerTxListener implements RocketMQLocalTransactionListener {          @Resource     private BusinessService bs ;      @Override     public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {         // 这里执行本地的事务操作,比如保存数据。         try {             // 创建一个日志记录表,将这唯一的ID存入数据库中,在下面的check方法中可以根据这个id查询是否有数据             String id = (String) msg.getHeaders().get("BID") ;             Users users = new JsonMapper().readValue((byte[])msg.getPayload(), Users.class) ;             System.out.println("消息内容:" + users + "\t参与数据:" + arg + "\t本次事务的唯一编号:" + id) ;             bs.save(users, new UsersLog(users.getId(), id)) ;         } catch (Exception e) {             e.printStackTrace() ;             return RocketMQLocalTransactionState.ROLLBACK ;         }         return RocketMQLocalTransactionState.COMMIT ;     }      @Override     public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {         // 这里检查本地事务是否执行成功         String id = (String) msg.getHeaders().get("BID") ;         System.out.println("执行查询ID为:" + id + " 的数据是否存在") ;         UsersLog usersLog = bs.queryUsersLog(id) ;         if (usersLog == null) {             return RocketMQLocalTransactionState.ROLLBACK ;         }         return RocketMQLocalTransactionState.COMMIT ;     }  }
ログイン後にコピー

Consumer

@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "tag10") @Component public class ConsumerTxListener implements RocketMQListener<users> {      @Override     public void onMessage(Users users) {         System.out.println("TX接收到消息:" + users) ;     }  }</users>
ログイン後にコピー

Service

@Transactional public boolean save(Users users, UsersLog usersLog) {     usersRepository.save(users) ;     usersLogRepository.save(usersLog) ;     if (users.getId() == 1) {         throw new RuntimeException("数据错误") ;     }     return true ; }      public UsersLog queryUsersLog(String bid) {     return usersLogRepository.findByBid(bid) ; }
ログイン後にコピー

Controller

@GetMapping("/tx/{id}") public Object sendTx(@PathVariable("id")Long id) {     ps.sendTx("tx-topic", id, "tag10") ;     return "send transaction success" ; }
ログイン後にコピー

Test

インターフェイスを呼び出した後のコンソール出力:

SpringBoot が RocketMQ トランザクション、ブロードキャスト、連続メッセージを統合する方法印刷ログから、コンシューマはすべての処理が完了した後でのみメッセージを受信することがわかります。メッセージが保存されました。

SpringBoot が RocketMQ トランザクション、ブロードキャスト、連続メッセージを統合する方法

SpringBoot が RocketMQ トランザクション、ブロードキャスト、連続メッセージを統合する方法データを削除してから ID を 1 としてテストすると、エラーが発生します。

#データベースにデータがありません。 。 。 SpringBoot が RocketMQ トランザクション、ブロードキャスト、連続メッセージを統合する方法

非常に複雑ではありませんか? 2 段階で処理できます。

以上がSpringBoot が RocketMQ トランザクション、ブロードキャスト、連続メッセージを統合する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

関連ラベル:
ソース:yisu.com
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート