Maison > Java > javaDidacticiel > Méthodes d'implémentation de persistance et de confirmation de publication dans Java RabbitMQ

Méthodes d'implémentation de persistance et de confirmation de publication dans Java RabbitMQ

王林
Libérer: 2023-04-25 16:19:08
avant
1349 Les gens l'ont consulté

    1. Persistance

    Lorsque le service RabbitMQ est arrêté, les messages envoyés par le producteur de messages ne seront pas perdus. Par défaut, les files d'attente et les messages sont ignorés lorsque RabbitMQ se ferme ou plante. Afin de garantir que les messages ne soient pas perdus, la file d'attente et le message doivent être marqués comme persistants.

    1.1 Implémenter la persistance

    1. channel.queueDeclare();第二个参数改为true。

    2.消息持久化:在使用信道发送消息时channel.basicPublish();将第三个参数改为:MessageProperties.PERSISTENT_TEXT_PLAIN表示持久化消息。

    /**
     * @Description 持久化MQ
     * @date 2022/3/7 9:14
     */
    public class Producer3 {
        private static final String LONG_QUEUE = "long_queue";
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMQUtils.getChannel();
            // 持久化队列
            channel.queueDeclare(LONG_QUEUE,true,false,false,null);
            Scanner scanner = new Scanner(System.in);
            int i = 0;
            while (scanner.hasNext()){
                i++;
                String msg = scanner.next() + i;
                // 持久化消息
                channel.basicPublish("",LONG_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
                System.out.println("发送消息:'" + msg + "'成功");
            }
        }
    }
    Copier après la connexion

    但是存储消息还有存在一个缓存的间隔点,没有真正的写入磁盘,持久性保证不够强,但是对于简单队列而言也绰绰有余。

    1.2 不公平分发

    轮询分发的方式在消费者处理效率不同的情况下并不适用。所以真正的公平应该是遵循能者多劳的前提。

    在消费者处修改channel.basicQos(1);表示开启不公平分发

    /**
     * @Description 不公平分发消费者
     * @date 2022/3/7 9:27
     */
    public class Consumer2 {
        private static final String LONG_QUEUE = "long_queue";
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMQUtils.getChannel();
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                // 模拟并发沉睡三十秒
                try {
                    Thread.sleep(30000);
                    System.out.println("线程B接收消息:"+ new String(message.getBody(), StandardCharsets.UTF_8));
                    channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            // 设置不公平分发
            channel.basicQos(1);
            channel.basicConsume(LONG_QUEUE,false,deliverCallback,
                    consumerTag -> {
                        System.out.println(consumerTag + "消费者取消消费");
                    });
        }
    }
    Copier après la connexion

    1.3 测试不公平分发

    测试目的:是否能实现能者多劳。

    测试方法:两个消费者睡眠不同的事件来模拟处理事件不同,如果处理时间(睡眠时间)短的能够处理多个消息就代表目的达成。

    先启动生产者创建队列,再分别启动两个消费者。

    生产者按照顺序发四条消息:

    Méthodes dimplémentation de persistance et de confirmation de publication dans Java RabbitMQ

    睡眠时间短的线程A接收到了三条消息

    Méthodes dimplémentation de persistance et de confirmation de publication dans Java RabbitMQ

    而睡眠时间长的线程B只接收到的第二条消息:

    Méthodes dimplémentation de persistance et de confirmation de publication dans Java RabbitMQ

    因为线程B在处理消息时消耗的时间较长,所以就将其他消息分配给了线程A。

    实验成功!

    1.4 预取值

    消息的发送和手动确认都是异步完成的,因此就存在一个未确认消息的缓冲区,开发人员希望能够限制缓冲区的大小,用来避免缓冲区里面无限制的未确认消息问题。

    这里的预期值就值得是上述方法channel.basicQos();里面的参数,如果在当前信道上存在等于参数的消息就不会在安排当前信道进行消费消息。

    1.4.1 代码测试

    测试方法:

    1.新建两个不同的消费者分别给定预期值5个2。

    2.给睡眠时间长的指定为5,时间短的指定为2。

    3.假如按照指定的预期值获取消息则表示测试成功,但并不是代表一定会按照5和2分配,这个类似于权重的判别。

    代码根据上述代码修改预期值即可。

    2. 发布确认

    发布确认就是生产者发布消息到队列之后,队列确认进行持久化完毕再通知给生产者的过程。这样才能保证消息不会丢失。

    需要注意的是需要开启队列持久化才能使用确认发布。
    开启方法:channel.confirmSelect();

    2.1 单个确认发布

    是一种同步发布的方式,即发送完一个消息之后只有确认它确认发布后,后续的消息才会继续发布,在指定的时间内没有确认就会抛出异常。缺点就是特别慢。

    /**
     * @Description 确认发布——单个确认
     * @date 2022/3/7 14:49
     */
    public class SoloProducer {
        private static final int MESSAGE_COUNT = 100;
        private static final String QUEUE_NAME = "confirm_solo";
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMQUtils.getChannel();
            // 产生队列
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);
            // 开启确认发布
            channel.confirmSelect();
            // 记录开始时间
            long beginTime = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = ""+i;
                channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
                // 单个发布确认
                boolean flag = channel.waitForConfirms();
                if (flag){
                    System.out.println("发送消息:" + i);
                }
            }
            // 记录结束时间
            long endTime = System.currentTimeMillis();
            System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");   }
    }
    Copier après la connexion

    2.2 批量确认发布

    一批一批的确认发布可以提高系统的吞吐量。但是缺点是发生故障导致发布出现问题时,需要将整个批处理保存在内存中,后面再重新发布。

    /**
     * @Description 确认发布——批量确认
     * @date 2022/3/7 14:49
     */
    public class BatchProducer {
        private static final int MESSAGE_COUNT = 100;
        private static final String QUEUE_NAME = "confirm_batch";
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMQUtils.getChannel();
            // 产生队列
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);
            // 开启确认发布
            channel.confirmSelect();
            // 设置一个多少一批确认一次。
            int batchSize = MESSAGE_COUNT / 10;
            // 记录开始时间
            long beginTime = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = ""+i;
                channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
                // 批量发布确认
                if (i % batchSize == 0){
                    if (channel.waitForConfirms()){
                        System.out.println("发送消息:" + i);
                    }
                }
            }
            // 记录结束时间
            long endTime = System.currentTimeMillis();
            System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");
        }
    }
    Copier après la connexion

    显然效率要比单个确认发布的高很多。

    2.3 异步确认发布

    在编程上比上述两个要复杂,但是性价比很高,无论是可靠性还行效率的都好很多,利用回调函数来达到消息可靠性传递的。

    /**
     * @Description 确认发布——异步确认
     * @date 2022/3/7 14:49
     */
    public class AsyncProducer {
        private static final int MESSAGE_COUNT = 100;
        private static final String QUEUE_NAME = "confirm_async";
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMQUtils.getChannel();
            // 产生队列
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);
            // 开启确认发布
            channel.confirmSelect();
            // 记录开始时间
            long beginTime = System.currentTimeMillis();
            // 确认成功回调
            ConfirmCallback ackCallback = (deliveryTab,multiple) ->{
                System.out.println("确认成功消息:" + deliveryTab);
            };
            // 确认失败回调
            ConfirmCallback nackCallback = (deliveryTab,multiple) ->{
                System.out.println("未确认的消息:" + deliveryTab);
            };
            // 消息监听器
            /**
             * addConfirmListener:
             *                  1. 确认成功的消息;
             *                  2. 确认失败的消息。
             */
            channel.addConfirmListener(ackCallback,nackCallback);
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "" + i;
                channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
            }
    
            // 记录结束时间
            long endTime = System.currentTimeMillis();
            System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");
        }
    }
    Copier après la connexion

    2.4 处理未确认的消息

    最好的处理方式把未确认的消息放到一个基于内存的能被发布线程访问的队列。

    例如:ConcurrentLinkedQueue可以在确认队列confirm callbacks

    2. Persistance du message : lorsque vous utilisez un canal pour envoyer un message, channel.basicPublish(); remplacez le troisième paramètre par : MessageProperties.PERSISTENT_TEXT_PLAIN pour indiquer les messages persistants.

    ConcurrentSkipListMap<Long,String > map = new ConcurrentSkipListMap<>();
    Copier après la connexion
    Copier après la connexion
    Cependant, il existe un intervalle de cache pour stocker les messages. Il n'y a pas d'écriture réelle sur le disque. La garantie de durabilité n'est pas assez forte, mais elle est plus que suffisante pour une simple file d'attente.

    1.2 Distribution injuste

    La méthode de distribution des sondages n'est pas adaptée lorsque les consommateurs ont des efficacités de traitement différentes. Par conséquent, la véritable équité devrait partir du principe selon lequel ceux qui peuvent faire plus de travail devraient le faire.

    Modifiez channel.basicQos(1); du côté du consommateur pour permettre une distribution déloyale🎜
    /**
     * @Description 异步发布确认,处理未发布成功的消息
     * @date 2022/3/7 18:09
     */
    public class AsyncProducerRemember {
        private static final int MESSAGE_COUNT = 100;
        private static final String QUEUE_NAME = "confirm_async_remember";
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMQUtils.getChannel();
            // 产生队列
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);
            // 开启确认发布
            channel.confirmSelect();
            // 线程安全有序的一个hash表,适用与高并发
            ConcurrentSkipListMap< Long, String > map = new ConcurrentSkipListMap<>();
            // 记录开始时间
            long beginTime = System.currentTimeMillis();
            // 确认成功回调
            ConfirmCallback ackCallback = (deliveryTab, multiple) ->{
                //2. 在发布成功确认处删除;
                // 批量删除
                if (multiple){
                    ConcurrentNavigableMap<Long, String> confirmMap = map.headMap(deliveryTab);
                    confirmMap.clear();
                }else {
                    // 单独删除
                    map.remove(deliveryTab);
                }
                System.out.println("确认成功消息:" + deliveryTab);
            };
            // 确认失败回调
            ConfirmCallback nackCallback = (deliveryTab,multiple) ->{
                // 3. 打印未确认的消息。
                System.out.println("未确认的消息:" + map.get(deliveryTab) + ",标记:" + deliveryTab);
            };
            // 消息监听器
            /**
             * addConfirmListener:
             *                  1. 确认成功的消息;
             *                  2. 确认失败的消息。
             */
            channel.addConfirmListener(ackCallback,nackCallback);
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "" + i;
                channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
                // 1. 记录要发送的全部消息;
                map.put(channel.getNextPublishSeqNo(),msg);
            }
    
            // 记录结束时间
            long endTime = System.currentTimeMillis();
            System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");
        }
    }
    Copier après la connexion
    Copier après la connexion
    🎜1.3 Tester la distribution déloyale🎜🎜🎜Objectif du test🎜 : Si cela peut être atteint par ceux qui peuvent le faire plus de travail. 🎜🎜🎜Méthode de test🎜 : Deux consommateurs mettent en veille différents événements pour simuler différents événements de traitement. Si le temps de traitement (temps de veille) est court et peut traiter plusieurs messages, cela signifie que l'objectif est atteint. 🎜🎜Démarrez d'abord le producteur pour créer la file d'attente, puis démarrez respectivement les deux consommateurs. 🎜🎜🎜Le producteur envoie quatre messages dans l'ordre : 🎜🎜🎜Comment implémenter Java RabbitMQ Persistance et confirmation de publication🎜🎜Le fil de discussion A avec un court temps de veille a reçu trois messages🎜🎜Comment Java RabbitMQ implémente la persistance et la confirmation de publication🎜🎜Le thread B, qui dort depuis longtemps, n'a reçu que le deuxième message : 🎜🎜Comment Java RabbitMQ implémente la persistance et la confirmation de publication🎜🎜Étant donné que le thread B met beaucoup de temps à traiter les messages, d'autres messages sont attribués au thread A. 🎜🎜Expérience réussie ! 🎜🎜1.4 Valeur de prélecture🎜🎜🎜L'envoi et la confirmation manuelle des messages sont effectués de manière asynchrone, il existe donc un tampon de messages non confirmés. Les développeurs espèrent limiter la taille du tampon pour éviter les problèmes de tampons illimités. 🎜🎜🎜La valeur attendue ici correspond aux paramètres de la méthode ci-dessus channel.basicQos(); S'il y a des messages égaux aux paramètres sur le canal actuel, le canal actuel ne sera pas organisé pour consommer. messages. 🎜
    1.4.1 Test de code
    🎜🎜Méthode de test : 🎜🎜🎜1 Créez deux consommateurs différents et donnez la valeur attendue de 5 et 2 respectivement. 🎜🎜2. Désignez 5 pour une durée de sommeil longue et 2 pour une durée de sommeil courte. 🎜🎜3. Si le message est obtenu selon la valeur attendue spécifiée, cela signifie que le test est réussi, mais cela ne signifie pas qu'il sera distribué selon 5 et 2. Ceci est similaire au jugement de poids. 🎜🎜Le code peut modifier la valeur attendue selon le code ci-dessus. 🎜🎜2. Confirmation de libération 🎜🎜🎜 La confirmation de libération est le processus dans lequel, après que le producteur a publié le message dans la file d'attente, la confirmation de la file d'attente est conservée puis notifiée au producteur. Cela garantit que le message ne sera pas perdu. 🎜🎜🎜Il convient de noter que la persistance de la file d'attente doit être activée pour utiliser la publication confirmée.
    Méthode d'ouverture : channel.confirmSelect();🎜🎜2.1 La publication de confirmation unique🎜🎜🎜 est une méthode de publication synchrone, c'est-à-dire après l'envoi d'un message, seulement après sa confirmation et sa publication. , Les messages suivants continueront d'être publiés et une exception sera levée s'il n'y a pas de confirmation dans le délai spécifié. L’inconvénient est qu’il est extrêmement lent. 🎜🎜rrreee🎜2.2 Libération de confirmation par lots🎜🎜🎜La libération par lots de confirmation peut améliorer le débit du système. Cependant, l'inconvénient est qu'en cas d'échec et de problème de publication, l'intégralité du lot doit être enregistrée en mémoire et republiée ultérieurement. 🎜🎜rrreee🎜Évidemment, l'efficacité est bien supérieure à celle d'une seule version de confirmation. 🎜🎜2.3 La version de confirmation asynchrone🎜🎜🎜 est plus complexe en programmation que les deux ci-dessus, mais elle est très rentable, qu'elle soit fiable ou efficace, elle utilise des fonctions de rappel pour obtenir une livraison fiable des messages. 🎜🎜rrreee🎜2.4 Gestion des messages non confirmés 🎜🎜🎜La meilleure façon de gérer les messages non confirmés est de placer les messages non confirmés dans une file d'attente basée sur la mémoire accessible par le fil de publication. 🎜🎜🎜Par exemple : ConcurrentLinkedQueue peut transférer des messages entre la file d'attente de confirmation confirm callbacks et le fil de publication. 🎜🎜🎜Méthode de traitement : 🎜🎜🎜1. Enregistrez tous les messages à envoyer ; 🎜🎜2. Supprimez lors de la confirmation réussie de la publication ; 🎜🎜3. 🎜🎜Utiliser une table de hachage pour stocker les messages, ses avantages : 🎜

    可以将需要和消息进行关联;轻松批量删除条目;支持高并发。

    ConcurrentSkipListMap<Long,String > map = new ConcurrentSkipListMap<>();
    Copier après la connexion
    Copier après la connexion
    /**
     * @Description 异步发布确认,处理未发布成功的消息
     * @date 2022/3/7 18:09
     */
    public class AsyncProducerRemember {
        private static final int MESSAGE_COUNT = 100;
        private static final String QUEUE_NAME = "confirm_async_remember";
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMQUtils.getChannel();
            // 产生队列
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);
            // 开启确认发布
            channel.confirmSelect();
            // 线程安全有序的一个hash表,适用与高并发
            ConcurrentSkipListMap< Long, String > map = new ConcurrentSkipListMap<>();
            // 记录开始时间
            long beginTime = System.currentTimeMillis();
            // 确认成功回调
            ConfirmCallback ackCallback = (deliveryTab, multiple) ->{
                //2. 在发布成功确认处删除;
                // 批量删除
                if (multiple){
                    ConcurrentNavigableMap<Long, String> confirmMap = map.headMap(deliveryTab);
                    confirmMap.clear();
                }else {
                    // 单独删除
                    map.remove(deliveryTab);
                }
                System.out.println("确认成功消息:" + deliveryTab);
            };
            // 确认失败回调
            ConfirmCallback nackCallback = (deliveryTab,multiple) ->{
                // 3. 打印未确认的消息。
                System.out.println("未确认的消息:" + map.get(deliveryTab) + ",标记:" + deliveryTab);
            };
            // 消息监听器
            /**
             * addConfirmListener:
             *                  1. 确认成功的消息;
             *                  2. 确认失败的消息。
             */
            channel.addConfirmListener(ackCallback,nackCallback);
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "" + i;
                channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
                // 1. 记录要发送的全部消息;
                map.put(channel.getNextPublishSeqNo(),msg);
            }
    
            // 记录结束时间
            long endTime = System.currentTimeMillis();
            System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");
        }
    }
    Copier après la connexion
    Copier après la connexion

    Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

    Étiquettes associées:
    source:yisu.com
    Déclaration de ce site Web
    Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
    Tutoriels populaires
    Plus>
    Derniers téléchargements
    Plus>
    effets Web
    Code source du site Web
    Matériel du site Web
    Modèle frontal