首頁 > 資料庫 > Redis > redis怎麼實現隊列的阻塞、延時、發布和訂閱

redis怎麼實現隊列的阻塞、延時、發布和訂閱

WBOY
發布: 2022-05-23 20:49:33
轉載
3446 人瀏覽過

這篇文章為大家帶來了關於Redis的相關知識,其中主要介紹了關於怎麼實現隊列的阻塞、延時、發布和訂閱的相關問題,下面一起來看一下,希望對大家有幫助。

redis怎麼實現隊列的阻塞、延時、發布和訂閱

推薦學習:Redis視訊教學

#Redis不僅可作為快取伺服器,也可以用作訊息佇列。它的列表類型天生支援用作訊息隊列。如下圖所示:
redis怎麼實現隊列的阻塞、延時、發布和訂閱

由於Redis的列表是使用雙向鍊錶實現的,保存了頭節點和尾節點,所以在列表的頭部和尾部兩邊插入或獲取元素都是非常快的,時間複雜度為O(1)。

普通佇列

可以直接使用Redis的list資料類型實作訊息佇列,只需簡單的兩個指令lpush和rpop或rpush和lpop。

  • lpush rpop:左進右出的佇列
  • rpush lpop:左出右進的佇列

下面使用redis的指令來模擬普通隊列。
使用lpush指令生產訊息:

>lpush queue:single 1"1">lpush queue:single 2"2">lpush queue:single 3"3"
登入後複製

使用rpop指令消費訊息:

>rpop queue:single"1">rpop queue:single"2">rpop queue:single"3"
登入後複製

下面使用Java程式碼來實作普通佇列。

生產者SingleProducer

package com.morris.redis.demo.queue.single;import redis.clients.jedis.Jedis;/**
 * 生产者
 */public class SingleProducer {

    public static final String SINGLE_QUEUE_NAME = "queue:single";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        for (int i = 0; i <p>消費者SingleConsumer:</p><pre class="brush:php;toolbar:false">package com.morris.redis.demo.queue.single;import redis.clients.jedis.Jedis;import java.util.Objects;import java.util.concurrent.TimeUnit;/**
 * 消费者
 */public class SingleConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();
        while (true) {
            String message = jedis.rpop(SingleProducer.SINGLE_QUEUE_NAME);
            if(Objects.nonNull(message)) {
                System.out.println(message);
            } else {
                TimeUnit.MILLISECONDS.sleep(500);
            }
        }
    }}
登入後複製

上面的程式碼已經基本實現了普通隊列的生產與消費,但是上述的例子中訊息的消費者有兩個問題:

  1. 消費者需要不停的呼叫rpop方法來查看redis的list中是否有待處理的資料(訊息)。每呼叫一次都會發起一次連接,有可能list中沒有數據,造成大量的空輪詢,導致不必要的浪費。也許你可以使用Thread.sleep()等方法讓消費者線程隔一段時間再消費,如果睡眠時間過長,這樣不能處理一些時效性要求高的消息,睡眠時間過短,也會在連接上造成比較大的開銷。
  2. 如果生產者速度大於消費者消費速度,訊息佇列長度會一直增大,時間久了會佔用大量記憶體空間。

阻塞隊列

消費者可以使用brpop指令從redis的list中獲取數據,這個指令只有在有元素時才返回,沒有則會阻塞直到超時返回null,於是消費端就不需要休眠後取得資料了,這樣就等於實作了一個阻塞佇列,

使用redis的brpop指令來模擬阻塞佇列。

>brpop queue:single 30
登入後複製

可以看到命令列阻塞在了brpop這裡了,30s後沒資料就回傳。

Java程式碼實作如下:

生產者與普通佇列的生產者一致。

消費者BlockConsumer:

package com.morris.redis.demo.queue.block;import redis.clients.jedis.Jedis;import java.util.List;/**
 * 消费者
 */public class BlockConsumer {

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        while (true) {
            // 超时时间为1s
            List<string> messageList = jedis.brpop(1, BlockProducer.BLOCK_QUEUE_NAME);
            if (null != messageList && !messageList.isEmpty()) {
                System.out.println(messageList);
            }
        }
    }}</string>
登入後複製

缺點:無法實現一次生產多次消費。

發布訂閱模式

Redis除了對訊息佇列提供支援外,還提供了一組命令用於支援發布/訂閱模式。利用Redis的pub/sub模式可以實現一次生產多次消費的隊列。

發佈:PUBLISH指令可用於發布一則訊息,格式:

PUBLISH channel message
登入後複製

傳回值表示訂閱了該訊息的數量。

訂閱:SUBSCRIBE指令用於接收一條訊息,格式:

SUBSCRIBE channel
登入後複製

使用SUBSCRIBE指令後進入了訂閱模式,但是不會接收到訂閱之前publish發送的訊息,這是因為只有在訊息發出去前訂閱才會接收到。在這個模式下其他指令,只能看到回應。

回覆分為三種:

  1. 如果為subscribe,第二個值表示訂閱的頻道,第三個值表示是已訂閱的頻道的數量
  2. 如果為message(訊息),第二個值為產生該訊息的頻道,第三個值為訊息
  3. 如果為unsubscribe,第二個值表示取消訂閱的頻道,第三個值表示目前客戶端的訂閱數量。

下面使用redis的指令來模擬發布訂閱模式。

生產者:

127.0.0.1:6379> publish queue hello(integer) 1127.0.0.1:6379> publish queue hi(integer) 1
登入後複製

消費者:

127.0.0.1:6379> subscribe queue
Reading messages... (press Ctrl-C to quit)1) "subscribe"2) "queue"3) (integer) 11) "message"2) "queue"3) "hello"1) "message"2) "queue"3) "hi"
登入後複製

Java程式碼實作如下:

生產者PubsubProducer:

package com.morris.redis.demo.queue.pubsub;import redis.clients.jedis.Jedis;/**
 * 生产者
 */public class PubsubProducer {

    public static final String PUBSUB_QUEUE_NAME = "queue:pubsub";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        for (int i = 0; i <p>消費者PubsubConsumer:</p><pre class="brush:php;toolbar:false">package com.morris.redis.demo.queue.pubsub;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPubSub;/**
 * 消费者
 */public class PubsubConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();

        JedisPubSub jedisPubSub = new JedisPubSub() {

            @Override
            public void onMessage(String channel, String message) {
                System.out.println("receive message: " + message);
                if(message.indexOf("99") > -1) {
                    this.unsubscribe();
                }
            }

            @Override
            public void onSubscribe(String channel, int subscribedChannels) {
                System.out.println("subscribe channel: " + channel);
            }

            @Override
            public void onUnsubscribe(String channel, int subscribedChannels) {
                System.out.println("unsubscribe channel " + channel);
            }
        };

        jedis.subscribe(jedisPubSub, PubsubProducer.PUBSUB_QUEUE_NAME);
    }}
登入後複製

消費者可以啟動多個,每個消費者都能收到所有的訊息。

可以使用指令UNSUBSCRIBE退訂,如果不加參數,則會取消所有由SUBSCRIBE指令訂閱的頻道。

Redis也支援基於通配符的訊息訂閱,使用指令PSUBSCRIBE (pattern subscribe),例如:

psubscribe channel.*
登入後複製

用PSUBSCRIBE指令訂閱的頻道也要使用指令PUNSUBSCRIBE指令退訂,指令無法指令退訂SUBSCRIBE訂閱的頻道,同理UNSUBSCRIBE也無法退訂PSUBSCRIBE指令訂閱的頻道。

同時PUNSUBSCRIBE指令通配符不會展開。例如:PUNSUBSCRIBE \*不會符合到channel.\*,所以要取消訂閱channel.\*就要這樣寫PUBSUBSCRIBE channel. \*

Redis的pub/sub也有其缺點,那就是如果消費者下線,生產者的消息會遺失。

延时队列和优先级队列

Redis中有个数据类型叫Zset,其本质就是在数据类型Set的基础上加了个排序的功能而已,除了保存原始的数据value之外,还提供另一个属性score,这一属性在添加修改元素时候可以进行指定,每次指定后,Zset会自动重新按新的score值进行排序。

如果score字段设置为消息的优先级,优先级最高的消息排在第一位,这样就能实现一个优先级队列。

如果score字段代表的是消息想要执行时间的时间戳,将它插入Zset集合中,便会按照时间戳大小进行排序,也就是对执行时间先后进行排序,集合中最先要执行的消息就会排在第一位,这样的话,只需要起一个死循环线程不断获取集合中的第一个元素,如果当前时间戳大于等于该元素的score就将它取出来进行消费删除,就可以达到延时执行的目的,注意不需要遍历整个Zset集合,以免造成性能浪费。

下面使用redis的zset来模拟延时队列。

生产者:

127.0.0.1:6379> zadd queue:delay 1 order1 2 order2 3 order3(integer) 0
登入後複製

消费者:

127.0.0.1:6379> zrange queue:delay 0 0 withscores1) "order1"2) "1"127.0.0.1:6379> zrem queue:delay order1(integer) 1
登入後複製

Java代码如下:

生产者DelayProducer:

package com.morris.redis.demo.queue.delay;import redis.clients.jedis.Jedis;import java.util.Date;import java.util.Random;/**
 * 生产者
 */public class DelayProducer {

    public static final String DELAY_QUEUE_NAME = "queue:delay";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        long now = new Date().getTime();
        Random random = new Random();
        for (int i = 0; i <p>消费者:</p><pre class="brush:php;toolbar:false">package com.morris.redis.demo.queue.delay;import redis.clients.jedis.Jedis;import redis.clients.jedis.Tuple;import java.util.Date;import java.util.List;import java.util.Set;import java.util.concurrent.TimeUnit;/**
 * 消费者
 */public class DelayConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();
        while (true) {
            long now = new Date().getTime();
            Set<tuple> tupleSet = jedis.zrangeWithScores(DelayProducer.DELAY_QUEUE_NAME, 0, 0);
            if(tupleSet.isEmpty()) {
                TimeUnit.MILLISECONDS.sleep(500);
            } else {
                for (Tuple tuple : tupleSet) {
                    Double score = tuple.getScore();
                    long time = score.longValue();
                    if(time <h2>应用场景</h2>
<ul>
<li>延时队列可用于订单超时失效的场景</li>
<li>二级缓存(local+redis)中,当有缓存需要更新时,可以使用发布订阅模式通知其他服务器使得本地缓存失效。</li>
</ul>
<p>推荐学习:<a href="//m.sbmmt.com/course/list/54.html" target="_blank">Redis视频教程</a></p></tuple>
登入後複製

以上是redis怎麼實現隊列的阻塞、延時、發布和訂閱的詳細內容。更多資訊請關注PHP中文網其他相關文章!

相關標籤:
來源:csdn.net
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板