Maison >base de données >Redis >Explication détaillée de Redis et des files d'attente

Explication détaillée de Redis et des files d'attente

藏色散人
藏色散人avant
2020-08-26 11:51:022192parcourir

Ce qui suit est une explication détaillée de Redis et des files d'attente de la colonne Tutoriel Redis J'espère que cela sera utile aux amis dans le besoin !

Explication détaillée de Redis et des files d'attente

Résumé

Redis peut être utilisé non seulement comme serveur de cache, mais également comme file d'attente de messages. Son type de liste prend intrinsèquement en charge son utilisation comme file d'attente de messages. Comme le montre la figure ci-dessous :

Étant donné que la liste Redis est implémentée à l'aide d'une liste doublement chaînée, les nœuds de tête et de queue sont enregistrés, ce qui permet d'insérer des éléments des deux côtés du la liste est très rapide.

Implémentation de la file d'attente commune

Vous pouvez donc directement utiliser la liste de Redis pour implémenter la file d'attente de messages, avec seulement deux instructions simples : lpush et rpop ou rpush et lpop. Un exemple simple est le suivant :

Fin du message de stockage (producteur du message) :

package org.yamikaze.redis.messsage.queue; 
import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis; 
import java.util.concurrent.TimeUnit; 
/**
 * 消息生产者
 * @author yamikaze */public class Producer extends Thread { 
    public static final String MESSAGE_KEY = "message:queue";    private Jedis jedis;    private String producerName;    private volatile int count; 
    public Producer(String name) {        this.producerName = name;
        init();
    } 
    private void init() {
        jedis = MyJedisFactory.getLocalJedis();
    } 
    public void putMessage(String message) {
        Long size = jedis.lpush(MESSAGE_KEY, message);
        System.out.println(producerName + ": 当前未被处理消息条数为:" + size);
        count++;
    } 
    public int getCount() {        return count;
    }
 
    @Override    public void run() {        try {            while (true) {
                putMessage(StringUtils.generate32Str());
                TimeUnit.SECONDS.sleep(1);
            }
        } catch (InterruptedException e) {
 
        } catch (Exception e) {
            e.printStackTrace();
        }
    } 
    public static void main(String[] args) throws InterruptedException{
        Producer producer = new Producer("myProducer");
        producer.start(); 
        for(; ;) {
            System.out.println("main : 已存储消息条数:" + producer.getCount());
            TimeUnit.SECONDS.sleep(10);
        }
    }
}

Fin du traitement du message (consommateur du message) :

package org.yamikaze.redis.messsage.queue; 
import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis; 
/**
 * 消息消费者
 * @author yamikaze */public class Customer extends Thread{ 
    private String customerName;    private volatile int count;    private Jedis jedis; 
    public Customer(String name) {        this.customerName = name;
        init();
    } 
    private void init() {
        jedis = MyJedisFactory.getLocalJedis();
    } 
    public void processMessage() {
        String message = jedis.rpop(Producer.MESSAGE_KEY);        if(message != null) {
            count++;
            handle(message);
        }
    } 
    public void handle(String message) {
        System.out.println(customerName + " 正在处理消息,消息内容是: " + message + " 这是第" + count + "条");
    }
 
    @Override    public void run() {        while (true) {
            processMessage();
        }
    } 
    public static void main(String[] args) {
        Customer customer = new Customer("yamikaze");
        customer.start();
    }
}

Cela semble plutôt bien, mais dans Dans l'exemple ci-dessus, il y a un problème avec les consommateurs de messages, c'est-à-dire qu'ils doivent constamment appeler la méthode rpop pour vérifier s'il y a des messages en attente dans la liste. Chaque appel établira une connexion, ce qui entraînera un gaspillage inutile. Peut-être utiliserez-vous Thread.sleep() et d'autres méthodes pour laisser le thread consommateur consommer à nouveau après un certain temps, mais cela pose deux problèmes :

1) Si la vitesse du producteur est supérieure à celle du consommateur vitesse de consommation, la file d'attente des messages La longueur continuera d'augmenter et elle prendra beaucoup d'espace mémoire au fil du temps.

2) Si le temps de veille est trop long, certains messages urgents ne peuvent pas être traités. Si le temps de veille est trop court, cela entraînera également une surcharge relativement importante sur la connexion.

Vous pouvez donc utiliser l'instruction brpop. Cette instruction ne retournera que s'il y a un élément, elle bloquera jusqu'à l'expiration du délai et retournera null. Le consommateur peut donc modifier le message processMessage comme suit :

public void processMessage() {    /**
     * brpop支持多个列表(队列)
     * brpop指令是支持队列优先级的,比如这个例子中MESSAGE_KEY的优先级大于testKey(顺序决定)。
     * 如果两个列表中都有元素,会优先返回优先级高的列表中的元素,所以这儿优先返回MESSAGE_KEY
     * 0表示不限制等待,会一直阻塞在这儿     */
    List<String> messages = jedis.brpop(0, Producer.MESSAGE_KEY, "testKey");    if(messages.size() != 0) {        //由于该指令可以监听多个Key,所以返回的是一个列表        //列表由2项组成,1) 列表名,2)数据
        String keyName = messages.get(0);        //如果返回的是MESSAGE_KEY的消息
        if(Producer.MESSAGE_KEY.equals(keyName)) {
            String message = messages.get(1);
            handle(message);
        }
 
    }
    System.out.println("=======================");
}

Ensuite, vous pouvez exécuter Customer et effacer la console. Vous pouvez voir que le programme n'a aucune sortie et est bloqué ici dans brpop. Ensuite, ouvrez le client Redis et entrez dans la liste des clients de commande pour voir qu'il existe actuellement deux connexions.

File d'attente qui produit une fois et consomme plusieurs fois

En plus de prendre en charge les files d'attente de messages, Redis fournit également un ensemble de commandes pour prendre en charge le mode publication/abonnement. En utilisant le mode pub/sub de Redis, vous pouvez implémenter une file d'attente qui produit une fois et consomme plusieurs fois.

1) Publier
L'instruction PUBLISH peut être utilisée pour publier un message au format PUBLISH canal message

La valeur de retour indique le nombre d'abonnés au message.
2) Abonnement
L'instruction SUBSCRIBE permet de recevoir un message au format canal SUBSCRIBE

Vous pouvez voir que le mode abonnement a été entré après avoir utilisé l'instruction SUBSCRIBE, mais le message envoyé par publication n'a pas été reçu. En effet, les abonnements recevront le message uniquement avant son envoi. Pour les autres commandes de ce mode, seules les réponses sont visibles. Il existe trois types de réponses :
1. S'il s'agit d'un abonnement, la deuxième valeur indique la chaîne à laquelle vous êtes abonné, et la troisième valeur indique à quelle chaîne est abonnée ? (entendu comme un numéro de série ?)
2. Si est le message, la deuxième valeur est le canal qui a généré le message, et la troisième valeur est le message
3. S'il s'agit d'un désabonnement, la deuxième valeur représente le canal désabonné et la troisième valeur représente le nombre d'abonnements du client actuel .

Vous pouvez utiliser la commande UNSUBSCRIBE pour vous désabonner. Si aucun paramètre n'est ajouté, toutes les chaînes auxquelles vous êtes abonné par la commande SUBSCRIBE seront désabonnées.

Redis prend également en charge l'abonnement aux messages basé sur des caractères génériques, en utilisant la commande PSUBSCRIBE (modèle d'abonnement), par exemple :

Si vous essayez à nouveau de pousser le message, vous obtiendrez les résultats suivants :

Vous pouvez voir La commande de publication renvoie 2 et l'abonné a reçu le message deux fois. En effet, la commande PSUBSCRIBE peut s'abonner au canal à plusieurs reprises. Les chaînes souscrites à l'aide de la commande PSUBSCRIBE doivent également être désabonnées à l'aide de la commande PUNSUBSCRIBE. Cette commande ne peut pas se désinscrire des chaînes souscrites par SUBSCRIBE. De même, UNSUBSCRIBE ne peut pas se désinscrire des chaînes souscrites par la commande PSUBSCRIBE. Dans le même temps, le caractère générique de l’instruction PUNSUBSCRIBE ne sera pas étendu.
Par exemple : PUNSUBSCRIBE * ne correspondra pas à canal.*, donc pour vous désabonner de canal.*, vous devez écrire PUBSUBSCRIBE canal.* comme ceci.

L'exemple de code est le suivant :

package org.yamikaze.redis.messsage.subscribe; 
import org.yamikaze.redis.messsage.queue.StringUtils;import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis; 
/**
 * 消息发布方
 * @author yamikaze */public class Publisher { 
    public static final String CHANNEL_KEY = "channel:message";    private Jedis jedis; 
    public Publisher() {
        jedis = MyJedisFactory.getLocalJedis();
    } 
    public void publishMessage(String message) {        if(StringUtils.isBlank(message)) {            return;
        }
        jedis.publish(CHANNEL_KEY, message);
    } 
    public static void main(String[] args) {
        Publisher publisher = new Publisher();
        publisher.publishMessage("Hello Redis!");
    }
}

Envoyez simplement un message.

Abonné au message :

package org.yamikaze.redis.messsage.subscribe; 
import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPubSub; 
import java.util.concurrent.TimeUnit; 
/**
 * 消息订阅方客户端
 * @author yamikaze */public class SubscribeClient { 
    private Jedis jedis;    private static final String EXIT_COMMAND = "exit"; 
    public SubscribeClient() {
        jedis = MyJedisFactory.getLocalJedis();
    } 
    public void subscribe(String ...channel) {        if(channel == null || channel.length <= 0) {            return;
        }        //消息处理,接收到消息时如何处理
        JedisPubSub jps = new JedisPubSub() {            /**
             * JedisPubSub类是一个没有抽象方法的抽象类,里面方法都是一些空实现
             * 所以可以选择需要的方法覆盖,这儿使用的是SUBSCRIBE指令,所以覆盖了onMessage
             * 如果使用PSUBSCRIBE指令,则覆盖onPMessage方法
             * 当然也可以选择BinaryJedisPubSub,同样是抽象类,但方法参数为byte[]             */
            @Override            public void onMessage(String channel, String message) {                if(Publisher.CHANNEL_KEY.equals(channel)) {
                    System.out.println("接收到消息: channel : " + message);                    //接收到exit消息后退出
                    if(EXIT_COMMAND.equals(message)) {
                        System.exit(0);
                    }
 
                }
            } 
            /**
             * 订阅时             */
            @Override            public void onSubscribe(String channel, int subscribedChannels) {                if(Publisher.CHANNEL_KEY.equals(channel)) {
                    System.out.println("订阅了频道:" + channel);
                }
            }
        };        //可以订阅多个频道 当前线程会阻塞在这儿        jedis.subscribe(jps, channel);
    } 
    public static void main(String[] args) {
        SubscribeClient client = new SubscribeClient();
        client.subscribe(Publisher.CHANNEL_KEY);        //并没有 unsubscribe方法        //相应的也没有punsubscribe方法    }
}

Exécutez d'abord le client, puis exécutez Publisher pour envoyer le message, le résultat de sortie :

Redis Pub/sub a aussi ses inconvénients, c'est que si le consommateur se déconnecte, les messages du producteur seront perdus.

File d'attente différée

Contexte

Dans le processus de développement commercial, certains scénarios nécessiteront un traitement retardé, tels que :

a.订单下单之后超过30分钟用户未支付,需要取消订单
b.订单一些评论,如果48h用户未对商家评论,系统会自动产生一条默认评论
c.点我达订单下单后,超过一定时间订单未派出,需要超时取消订单等。。。
处理这类需求,比较直接简单的方式就是定时任务轮训扫表。这种处理方式在数据量不大的场景下是完全没问题,但是当数据量大的时候高频的轮训数据库就会比较的耗资源,导致数据库的慢查或者查询超时。所以在处理这类需求时候,采用了延时队列来完成。

几种延时队列

延时队列就是一种带有延迟功能的消息队列。下面会介绍几种目前已有的延时队列:

1.Java中java.util.concurrent.DelayQueue

优点:JDK自身实现,使用方便,量小适用
缺点:队列消息处于jvm内存,不支持分布式运行和消息持久化

2.Rocketmq延时队列

优点:消息持久化,分布式
缺点:不支持任意时间精度,只支持特定level的延时消息

3.Rabbitmq延时队列(TTL+DLX实现)

优点:消息持久化,分布式
缺点:延时相同的消息必须扔在同一个队列

Redis实现的延时消息队列适合的项目特点:

  • Spring框架管理对象
  • 有消息需求,但不想维护mq中间件
  • 有使用redis
  • 对消息持久化并没有很苛刻的要求

Redis实现的延时消息队列思路

Redis由于其自身的Zset数据结构,本质就是Set结构上加了个排序的功能,除了添加数据value之外,还提供另一属性score,这一属性在添加修改元素时候可以指定,每次指定后,Zset会自动重新按新的值调整顺序。可以理解为有两列字段的数据表,一列存value,一列存顺序编号。操作中key理解为zset的名字,那么对延时队列又有何用呢?

试想如果score代表的是想要执行时间的时间戳,在某个时间将它插入Zset集合中,它变会按照时间戳大小进行排序,也就是对执行时间前后进行排序,这样的话,起一个死循环线程不断地进行取第一个key值,如果当前时间戳大于等于该key值的socre就将它取出来进行消费删除,就可以达到延时执行的目的, 注意不需要遍历整个Zset集合,以免造成性能浪费。

Zset的排列效果如下图:

java代码实现如下:

package cn.chinotan.service.delayQueueRedis;import org.apache.commons.lang3.StringUtils;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;import redis.clients.jedis.Tuple;import java.text.SimpleDateFormat;import java.util.Calendar;import java.util.Date;import java.util.Set;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;/**
 * @program: test
 * @description: redis实现延时队列
 * @author: xingcheng
 * @create: 2018-08-19
 **/public class AppTest {    private static final String ADDR = "127.0.0.1";    private static final int PORT = 6379;    private static JedisPool jedisPool = new JedisPool(ADDR, PORT);    private static CountDownLatch cdl = new CountDownLatch(10);    public static Jedis getJedis() {        return jedisPool.getResource();
    }    /**
     * 生产者,生成5个订单     */
    public void productionDelayMessage() {        for (int i = 0; i < 5; i++) {
            Calendar instance = Calendar.getInstance();            // 3秒后执行
            instance.add(Calendar.SECOND, 3 + i);
            AppTest.getJedis().zadd("orderId", (instance.getTimeInMillis()) / 1000, StringUtils.join("000000000", i + 1));
            System.out.println("生产订单: " + StringUtils.join("000000000", i + 1) + " 当前时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
            System.out.println((3 + i) + "秒后执行");
        }
    }    //消费者,取订单
    public static void consumerDelayMessage() {
        Jedis jedis = AppTest.getJedis();        while (true) {
            Set<Tuple> order = jedis.zrangeWithScores("orderId", 0, 0);            if (order == null || order.isEmpty()) {
                System.out.println("当前没有等待的任务");                try {
                    TimeUnit.MICROSECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }                continue;
            }
            Tuple tuple = (Tuple) order.toArray()[0];            double score = tuple.getScore();
            Calendar instance = Calendar.getInstance();            long nowTime = instance.getTimeInMillis() / 1000;            if (nowTime >= score) {
                String element = tuple.getElement();
                Long orderId = jedis.zrem("orderId", element);                if (orderId > 0) {
                    System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ":redis消费了一个任务:消费的订单OrderId为" + element);
                }
            }
        }
    }    static class DelayMessage implements Runnable{
        @Override        public void run() {            try {
                cdl.await();
                consumerDelayMessage();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }    
    public static void main(String[] args) {
        AppTest appTest = new AppTest();
        appTest.productionDelayMessage();        for (int i = 0; i < 10; i++) {            new Thread(new DelayMessage()).start();
            cdl.countDown();
        }
    }
}

实现效果如下:

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer