Home > Database > Redis > body text

How redis implements queue blocking, delay, publishing and subscription

WBOY
Release: 2022-05-23 20:49:33
forward
3351 people have browsed it

This article brings you relevant knowledge about Redis, which mainly introduces related issues about how to implement queue blocking, delay, publishing and subscription. Let's take a look at it together. I hope everyone has to help.

How redis implements queue blocking, delay, publishing and subscription

Recommended learning: Redis video tutorial

Redis can not only be used as a cache server, but also as a message queue. Its list type inherently supports use as a message queue. As shown in the figure below:
How redis implements queue blocking, delay, publishing and subscription

Since the Redis list is implemented using a doubly linked list, the head node and the tail node are saved, so inserting or obtaining elements on both sides of the head and tail of the list is It is very fast and the time complexity is O(1).

Ordinary Queue

You can directly use the Redis list data type to implement the message queue, with just two simple instructions: lpush and rpop or rpush and lpop.

  • lpush rpop: left-in and right-out queue
  • rpush lpop: left-out and right-in queue

The following uses the redis command to simulate ordinary queue.
Use the lpush command to produce messages:

>lpush queue:single 1"1">lpush queue:single 2"2">lpush queue:single 3"3"
Copy after login

Use the rpop command to consume messages:

>rpop queue:single"1">rpop queue:single"2">rpop queue:single"3"
Copy after login

The following uses Java code to implement a common queue.

Producer SingleProducer

package com.morris.redis.demo.queue.single;import redis.clients.jedis.Jedis;/**
 * 生产者
 */public class SingleProducer {

    public static final String SINGLE_QUEUE_NAME = "queue:single";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        for (int i = 0; i <p>Consumer SingleConsumer: </p><pre class="brush:php;toolbar:false">package com.morris.redis.demo.queue.single;import redis.clients.jedis.Jedis;import java.util.Objects;import java.util.concurrent.TimeUnit;/**
 * 消费者
 */public class SingleConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();
        while (true) {
            String message = jedis.rpop(SingleProducer.SINGLE_QUEUE_NAME);
            if(Objects.nonNull(message)) {
                System.out.println(message);
            } else {
                TimeUnit.MILLISECONDS.sleep(500);
            }
        }
    }}
Copy after login

The above code has basically realized the production and consumption of ordinary queues, but the consumer of the message in the above example There are two problems:

  1. Consumers need to constantly call the rpop method to check whether there is data (message) to be processed in the redis list. A connection will be initiated every time it is called. There may be no data in the list, resulting in a large number of empty polls and unnecessary waste. Maybe you can use Thread.sleep() and other methods to allow the consumer thread to consume again after a period of time. If the sleep time is too long, some time-sensitive messages cannot be processed. If the sleep time is too short, it will also cause comparisons on the connection. Big overhead.
  2. If the producer speed is greater than the consumer consumption speed, the length of the message queue will continue to increase, which will occupy a lot of memory space over time.

Blocking queue

Consumers can use the brpop instruction to obtain data from the redis list. This instruction will only return if there are elements. If not, it will block until timeout and return null. Therefore, the consumer does not need to sleep to obtain data, which is equivalent to implementing a blocking queue.

Use the brpop command of redis to simulate the blocking queue.

>brpop queue:single 30
Copy after login

You can see that the command line is blocked here in brpop, and will return after 30 seconds without data.

The Java code is implemented as follows:

The producer is the same as the producer of the ordinary queue.

Consumer BlockConsumer:

package com.morris.redis.demo.queue.block;import redis.clients.jedis.Jedis;import java.util.List;/**
 * 消费者
 */public class BlockConsumer {

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        while (true) {
            // 超时时间为1s
            List<string> messageList = jedis.brpop(1, BlockProducer.BLOCK_QUEUE_NAME);
            if (null != messageList && !messageList.isEmpty()) {
                System.out.println(messageList);
            }
        }
    }}</string>
Copy after login

Disadvantages: One production and multiple consumption cannot be achieved.

Publish and subscribe mode

In addition to providing support for message queues, Redis also provides a set of commands to support the publish/subscribe mode. Using the pub/sub mode of Redis, you can implement a queue that produces once and consumes multiple times.

Publish: The PUBLISH instruction can be used to publish a message, format:

PUBLISH channel message
Copy after login

The return value indicates the number of subscribers to the message.

Subscription: The SUBSCRIBE instruction is used to receive a message, format:

SUBSCRIBE channel
Copy after login

After using the SUBSCRIBE instruction, you enter the subscription mode, but you will not receive the message sent by publish before subscribing. This is because only Subscriptions will not receive the message until it is sent. For other commands in this mode, only replies can be seen.

Replies are divided into three types:

  1. If it is subscribe, the second value indicates the subscribed channel, and the third value indicates the number of subscribed channels
  2. If it is message (message), the second value is the channel that generated the message, and the third value is the message.
  3. If it is unsubscribe, the second value represents the channel to unsubscribe from, and the third value is the channel that generated the message. The value represents the current client's subscription number.

The following uses the redis command to simulate the publish-subscribe mode.

Producer:

127.0.0.1:6379> publish queue hello(integer) 1127.0.0.1:6379> publish queue hi(integer) 1
Copy after login

Consumer:

127.0.0.1:6379> subscribe queue
Reading messages... (press Ctrl-C to quit)1) "subscribe"2) "queue"3) (integer) 11) "message"2) "queue"3) "hello"1) "message"2) "queue"3) "hi"
Copy after login

The Java code is implemented as follows:

Producer PubsubProducer:

package com.morris.redis.demo.queue.pubsub;import redis.clients.jedis.Jedis;/**
 * 生产者
 */public class PubsubProducer {

    public static final String PUBSUB_QUEUE_NAME = "queue:pubsub";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        for (int i = 0; i <p>Consumer PubsubConsumer: </p><pre class="brush:php;toolbar:false">package com.morris.redis.demo.queue.pubsub;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPubSub;/**
 * 消费者
 */public class PubsubConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();

        JedisPubSub jedisPubSub = new JedisPubSub() {

            @Override
            public void onMessage(String channel, String message) {
                System.out.println("receive message: " + message);
                if(message.indexOf("99") > -1) {
                    this.unsubscribe();
                }
            }

            @Override
            public void onSubscribe(String channel, int subscribedChannels) {
                System.out.println("subscribe channel: " + channel);
            }

            @Override
            public void onUnsubscribe(String channel, int subscribedChannels) {
                System.out.println("unsubscribe channel " + channel);
            }
        };

        jedis.subscribe(jedisPubSub, PubsubProducer.PUBSUB_QUEUE_NAME);
    }}
Copy after login

Consumers can start multiple consumers, and each consumer can receive all messages.

You can use the command UNSUBSCRIBE to unsubscribe. If no parameters are added, all channels subscribed to by the SUBSCRIBE command will be unsubscribed.

Redis also supports message subscription based on wildcards, using the command PSUBSCRIBE (pattern subscribe), for example:

psubscribe channel.*
Copy after login

Channels subscribed with the PSUBSCRIBE command must also use the command PUNSUBSCRIBE command to unsubscribe. This command cannot Unsubscribe from the channel subscribed by SUBSCRIBE. Similarly, UNSUBSCRIBE cannot unsubscribe from the channel subscribed by PSUBSCRIBE command.

At the same time, the PUNSUBSCRIBE instruction wildcard will not be expanded. For example: PUNSUBSCRIBE \* will not match channel.\*, so to unsubscribe from channel.\*, you must write PUBSUBSCRIBE channel. \*.

Redis' pub/sub also has its shortcomings, that is, if the consumer goes offline, the producer's messages will be lost.

延时队列和优先级队列

Redis中有个数据类型叫Zset,其本质就是在数据类型Set的基础上加了个排序的功能而已,除了保存原始的数据value之外,还提供另一个属性score,这一属性在添加修改元素时候可以进行指定,每次指定后,Zset会自动重新按新的score值进行排序。

如果score字段设置为消息的优先级,优先级最高的消息排在第一位,这样就能实现一个优先级队列。

如果score字段代表的是消息想要执行时间的时间戳,将它插入Zset集合中,便会按照时间戳大小进行排序,也就是对执行时间先后进行排序,集合中最先要执行的消息就会排在第一位,这样的话,只需要起一个死循环线程不断获取集合中的第一个元素,如果当前时间戳大于等于该元素的score就将它取出来进行消费删除,就可以达到延时执行的目的,注意不需要遍历整个Zset集合,以免造成性能浪费。

下面使用redis的zset来模拟延时队列。

生产者:

127.0.0.1:6379> zadd queue:delay 1 order1 2 order2 3 order3(integer) 0
Copy after login

消费者:

127.0.0.1:6379> zrange queue:delay 0 0 withscores1) "order1"2) "1"127.0.0.1:6379> zrem queue:delay order1(integer) 1
Copy after login

Java代码如下:

生产者DelayProducer:

package com.morris.redis.demo.queue.delay;import redis.clients.jedis.Jedis;import java.util.Date;import java.util.Random;/**
 * 生产者
 */public class DelayProducer {

    public static final String DELAY_QUEUE_NAME = "queue:delay";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        long now = new Date().getTime();
        Random random = new Random();
        for (int i = 0; i <p>消费者:</p><pre class="brush:php;toolbar:false">package com.morris.redis.demo.queue.delay;import redis.clients.jedis.Jedis;import redis.clients.jedis.Tuple;import java.util.Date;import java.util.List;import java.util.Set;import java.util.concurrent.TimeUnit;/**
 * 消费者
 */public class DelayConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();
        while (true) {
            long now = new Date().getTime();
            Set<tuple> tupleSet = jedis.zrangeWithScores(DelayProducer.DELAY_QUEUE_NAME, 0, 0);
            if(tupleSet.isEmpty()) {
                TimeUnit.MILLISECONDS.sleep(500);
            } else {
                for (Tuple tuple : tupleSet) {
                    Double score = tuple.getScore();
                    long time = score.longValue();
                    if(time <h2>应用场景</h2>
<ul>
<li>延时队列可用于订单超时失效的场景</li>
<li>二级缓存(local+redis)中,当有缓存需要更新时,可以使用发布订阅模式通知其他服务器使得本地缓存失效。</li>
</ul>
<p>推荐学习:<a href="//m.sbmmt.com/course/list/54.html" target="_blank">Redis视频教程</a></p></tuple>
Copy after login

The above is the detailed content of How redis implements queue blocking, delay, publishing and subscription. For more information, please follow other related articles on the PHP Chinese website!

Related labels:
source:csdn.net
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template
About us Disclaimer Sitemap
php.cn:Public welfare online PHP training,Help PHP learners grow quickly!