RabbitMQ
を使用する場合、プロデューサーがメッセージを配信するときに、メッセージが対応するスイッチに正常に配信されたかどうかを知りたい場合、キューの場合、メッセージ配信の信頼性モードを制御するには 2 つの方法があります。
確認モード 。
戻りパターン 。
confirmCallback コールバックが返されます。確認ロジックは、
rabbitTemplate インスタンスに直接設定できます。
XML 構成を使用する場合は、工場出荷時の構成で
publisher-confirms="true" を有効にする必要があり、YAML 構成は直接
publisher- confirm-type : corrated、デフォルトは NONE で、手動でオンにする必要があります。
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq.xml") public class Producer { @Autowired private RabbitTemplate rabbitTemplate; @Test public void producer() throws InterruptedException { rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean b, String s) { System.out.println(); if (!b) { // 消息重发之类的处理 System.out.println(s); } else { System.out.println("交换机成功接收消息"); } } }); rabbitTemplate.convertAndSend("default_exchange", "default_queue", "hello world & beordie"); TimeUnit.SECONDS.sleep(5); } }
confirm 関数によって実行されます。この関数には 3 つのパラメータが含まれます。最初のパラメータは構成関連の情報で、2 番目のパラメータは切り替えが成功したかどうかを示します。メッセージは受信されました。3 番目のパラメーターは、メッセージが正常に受信されなかった理由を示します。
returnCallback が返されます。工場出荷時の設定でフォールバック モード
publisher-returns="true" を有効にし、スイッチがメッセージの処理に失敗するモードを設定し (デフォルトは false で、メッセージは直接破棄されます)、フォールバック処理ロジックを追加します。 。
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq.xml") public class Producer { @Autowired private RabbitTemplate rabbitTemplate; @Test public void producer() throws InterruptedException { rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { // 重发逻辑处理 System.out.println(message.getBody() + " 投递消息队列失败"); } }); rabbitTemplate.convertAndSend("default_exchange", "default_queue", "hello world & beordie"); TimeUnit.SECONDS.sleep(5); } }
returnedMessage には 5 つのパラメータがあり、それぞれメッセージ オブジェクト、エラー コード、エラー メッセージ、スイッチ、ルーティング キーを参照します。
自動確認 : acknowledge="none"
:acknowledge="manual"
:acknowledge="auto "
手動確認方法を設定した場合は、通常のメッセージの消費後にコールバック確認
channel.basicAck() を実行し、手動で署名する必要があります。業務処理中に例外が発生した場合は、channel.basicNack()
を呼び出してメッセージを再送信します。 まず、キューをバインドするときに確認メカニズムを構成し、手動署名に設定する必要があります。
<!-- 绑定队列 --> <rabbit:listener-container connection-factory="rabbitFactory" auto-declare="true" acknowledge="manual"> <rabbit:listener ref="rabbirConsumer" queue-names="default_queue"/> </rabbit:listener-container>
プロデューサ側を変更する必要はありません。メッセージに自動的に署名するようにコンシューマの実装を変更するだけで済みます。ビジネスが正常に実行されれば、メッセージは署名されます。ビジネスでエラーが発生した場合、メッセージは拒否され、メッセージは再送信または破棄されます。
public class ConsumerAck implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { // 消息唯一ID long tag = message.getMessageProperties().getDeliveryTag(); try { String msg = new String(message.getBody(), "utf-8"); channel.basicAck(tag, true); System.out.println("接收消息: " + msg); } catch (Exception e) { System.out.println("接收消息异常"); channel.basicNack(tag, true, true); e.printStackTrace(); } } }
これには 3 つの単純な署名関数が含まれます。1 つは正しい署名用です
basicAck、2 つ目は単一拒否用です basicReject
、3 つ目はバッチ拒否用ですベーシックナック
。
最初のパラメータは、現在のチャネルのみの、チャネル内のメッセージの一意の ID を示します。2 番目のパラメータは、次の場合にバッチで同意するかどうかを示します。 false です true の場合、現在の ID を持つメッセージに署名することにのみ同意し、メッセージ キューから削除します。true の場合、この ID より前のメッセージにも署名します。
最初のパラメータは引き続きメッセージの一意の ID を示し、2 番目のパラメータはメッセージを再度キューに入れて送信するかどうかを示し、false はメッセージが直接破棄されることを示します。または、デッド レター キューを受信できます。true の場合、メッセージ送信のためにキューに戻ることを意味します。すべての操作は現在のメッセージに対してのみ行われます。
には 2 番目のパラメーターよりも 1 つ多くのパラメーターがあります。これは中央のブール値で、バッチ処理を実行するかどうかを示します。
在用户请求和DB服务处理之间增加消息中间件的隔离,使得突发流量全部让消息队列来抗,降低服务端被冲垮的可能性。让所有的请求都往队列中存,消费端只需要匀速的取出消息进行消费,这样就能保证运行效率,也不会因为后台的阻塞而导致客户端得不到正常的响应(当然指的是一些不需要同步回显的任务)。
只需要在消费者绑定消息队列时指定取出消息的速率即可,需要使用手动签收的方式,每进行一次的签收才会从队列中再取出下一条数据。
<!-- 绑定队列 --> <rabbit:listener-container connection-factory="rabbitFactory" auto-declare="true" acknowledge="manual" prefetch="1"> <rabbit:listener ref="rabbirConsumer" queue-names="default_queue"/> </rabbit:listener-container>
消息队列提供了存储在队列中消息的过期时间,分为两个方向的实现,一个是针对于整个队列中的所有消息,也就是队列的过期时间,另一个是针对当前消息的过期时间,也就是针对于单条消息单独设置。
队列的过期时间设置很简单,只需要在创建队列时进行过期时间的指定即可,也可以通过控制台直接创建指定过期时间。一旦队列过期时间到了,队列中还未被消费的消息都将过期,进行队列的过期处理。
<rabbit:queue id="default_queue" name="default_queue" auto-declare="true"> <rabbit:queue-arguments> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/> </rabbit:queue-arguments> </rabbit:queue>
单条消息的过期时间需要在发送的时候进行单独的指定,发送的时候指定配置的额外信息,配置的编写由配置类完成。
如果一条消息的过期时间到了,但是他此时处于队列的中间,那么他将不会被处理,只有当之后处理到时候才会进行判断是否过期。
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { // 设置 message 的过期时间 message.getMessageProperties().setExpiration("5000"); // 返回该消息 return message; } }; rabbitTemplate.convertAndSend("exchange", "route", "msg", messagePostProcessor);
如果说同时设置了消息的过期时间和队列的过期时间,那么最终的过期时间由最短的时间进行决定,也就是说如果当前消息的过期时间没到,但是整个队列的过期时间到了,那么队列中的所有消息也自然就过期了,执行过期的处理策略。
死信队列指的是死信交换机,当一条消息成为死信之后可以重新发送到另一个交换机进行处理,而进行处理的这个交换机就叫做死信交换机。
消息成为死信消息有几种情况
队列的消息长度达到限制
消费者拒接消息的时候不把消息重新放入队列中
队列存在消息过期设置,消息超时未被消费
消息存在过期时间,在投递给消费者时发现过期
在创建队列时可以在配置中指定相关的信息,例如死信交换机、队列长度等等,之后的一系列工作就不由程序员进行操作了,MQ 会自己完成配置过的事件响应。
<rabbit:queue id="default_queue" name="default_queue" auto-declare="true"> <rabbit:queue-arguments> <!-- 死信交换机 --> <entry key="x-dead-letter-exchange" value-type="dlx_exchane"/> <!-- 路由 --> <entry key="x-dead-letter-routing-key" value-type="dlx_routing"/> <!-- 队列过期时间 --> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/> <!-- 队列长度 --> <entry key="x-max-length" value-type="java.lang.Integer" value="10"/> </rabbit:queue-arguments> </rabbit:queue>
延迟队列指的是消息在进入队列后不会立即被消费,只有到达指定时间之后才会被消费,也就是需要有一个时间的判断条件。
消息队列实际上是没有提供对延迟队列的实现的,但是可以通过 TTL
+ 死信队列
的方式完成,设置一个队列,不被任何的消费者所消费,所有的消息进入都会被保存在里面,设置队列的过期时间,一旦队列过期将所有的消息过渡到绑定的死信队列中。
再由具体的消费者来消费死信队列中的消息,这样就实现了延迟队列的功能。
例如实现一个下单超时支付取消订单的功能:
以上がJava の RabbitMQ 高度なアプリケーション メソッドの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。