
A delay queue, as the name suggests, is a message queue that delivers messages after a delay. So when is such a queue needed?
1. Background
Let's first look at the following business scenarios:
- When an order stays unpaid, how do we close it in a timely manner?
- How do we regularly check whether an order in the refund status has been refunded successfully?
- When an order receives no status notification from the downstream system for a long time, how do we synchronize the order status with a stepped strategy?
- When the system notifies the upstream system of the final status (payment succeeded) and the upstream system returns a notification failure, how do we notify asynchronously at stepped intervals: 15s 3m 10m 30m 30m 1h 2h 6h 15h?
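The stepped notification intervals in the last scenario can be written as a simple lookup table. The following is only a minimal sketch: the class `NotifySchedule` and its method are hypothetical names that do not appear in the original project. It maps the n-th retry to the wait before that retry.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical helper, not part of the original project.
final class NotifySchedule {
    // Waits taken from the scenario above: 15s 3m 10m 30m 30m 1h 2h 6h 15h.
    private static final List<Duration> STEPS = Arrays.asList(
            Duration.ofSeconds(15), Duration.ofMinutes(3), Duration.ofMinutes(10),
            Duration.ofMinutes(30), Duration.ofMinutes(30), Duration.ofHours(1),
            Duration.ofHours(2), Duration.ofHours(6), Duration.ofHours(15));

    private NotifySchedule() {}

    /** Returns the wait before retry number attempt (1-based), or empty once the schedule is exhausted. */
    static Optional<Duration> delayBefore(int attempt) {
        if (attempt < 1 || attempt > STEPS.size()) {
            return Optional.empty();
        }
        return Optional.of(STEPS.get(attempt - 1));
    }

    public static void main(String[] args) {
        for (int attempt = 1; attempt <= 10; attempt++) {
            System.out.println(attempt + " -> " + delayBefore(attempt));
        }
    }
}
```

Once the schedule returns empty, the caller would stop retrying and hand the notification over to manual handling or another fallback.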
1.1 Solution
The simplest way is to scan the table periodically. For example, if the requirements for order payment expiration are strict, scan the table every 2 seconds for expired orders and close them proactively. The advantage is simplicity. The disadvantage is that every pass is a global table scan, which wastes resources, and if a large number of orders are about to expire, closing them will be delayed.
Use RabbitMQ or another MQ, adapted to implement a delay queue. The advantage is that these are open-source, ready-made, and stable solutions. The disadvantage is that an MQ is message middleware: if the team's technology stack already includes an MQ, that is fine; if not, deploying an MQ just for a delay queue is rather expensive.
Use Redis's ZSET and LIST features to implement a delay queue: RedisDelayQueue.
2. Design goal
- Real-time performance: an error on the order of seconds is acceptable
- High availability: supports both standalone and cluster deployments
- Message deletion: the business side can delete a specified message at any time
- Message reliability: each message is guaranteed to be consumed at least once
- Message persistence: relies on the persistence of Redis itself. If Redis data is lost, the delayed messages are lost too, although master-replica and cluster setups provide some protection. A later optimization could persist messages to MongoDB
3. Design plan
The design mainly includes the following points:
- Treat the entire Redis as a message pool and store messages in KV format
- Use ZSET as the priority queue, ordered by score
- Use the LIST structure for first-in, first-out consumption
- ZSET and LIST store message addresses (corresponding to each KEY in the message pool)
- Define a routing object that stores the ZSET and LIST names and routes messages from a ZSET to the correct LIST
- Use a timer to maintain the routing
- Implement message delay according to TTL rules
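The points above can be illustrated without Redis. The following is a rough in-memory sketch, not the project's implementation: a `HashMap` stands in for the message pool, a second map holds the ZSET scores, and an `ArrayDeque` stands in for the LIST. The class and method names are assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// In-memory stand-in for the three Redis structures; all names here are illustrative.
final class InMemoryDelayQueue {
    private final Map<String, String> jobPool = new HashMap<>();  // message pool: id -> body
    private final Map<String, Long> bucket = new HashMap<>();     // ZSET stand-in: id -> score (run-at millis)
    private final Deque<String> readyQueue = new ArrayDeque<>();  // LIST stand-in: ids in FIFO order

    void add(String id, String body, long runAtMillis) {
        jobPool.put(id, body);
        bucket.put(id, runAtMillis);
    }

    /** Moves every id whose score is not later than now into the ready queue, earliest first. */
    List<String> carryDue(long nowMillis) {
        List<String> due = bucket.entrySet().stream()
                .filter(e -> e.getValue() <= nowMillis)
                .sorted(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
        for (String id : due) {
            bucket.remove(id);
            readyQueue.addLast(id);
        }
        return due;
    }

    /** Pops the next ready id, or returns null when nothing is ready. */
    String pollReadyId() {
        return readyQueue.pollFirst();
    }

    /** Resolves an id against the message pool; null if the job was deleted. */
    String body(String id) {
        return jobPool.get(id);
    }
}
```

Only ids travel through the sorted set and the list. The body stays in the pool until a consumer resolves the id, which is what "ZSET and LIST store message addresses" describes.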
3.1 Design diagram
The design is still based on Youzan's delay queue design, with some optimizations and a code implementation. See the Youzan design article in the references.
3.2 Data Structure
- ZING:DELAY_QUEUE:JOB_POOL
A hash table that stores the information of all delayed jobs. KV structure: K = prefix + projectName, field = topic + jobId, V = CONTENT. V is the data passed in by the client and is returned at consumption time.
- ZING:DELAY_QUEUE:BUCKET
The sorted set (ZSET) of the delay queue. It stores K = ID together with the required execution timestamp, sorted by timestamp.
- ZING:DELAY_QUEUE:QUEUE
A LIST structure. Each Topic has one LIST, which stores the JOBs that currently need to be consumed.
The picture is for reference only. It roughly describes the execution of the whole process and comes from the reference blog at the end of the article.
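To make the relationship between the three keys concrete, here is a small sketch of how the shared identifier could be built. The class name `QueueKeys` and the ":" separator between topic and jobId are assumptions; the article only states that the field combines topic and jobId.

```java
// Hypothetical key helper; the ":" separator for the hash field is an assumption.
final class QueueKeys {
    static final String JOB_POOL = "ZING:DELAY_QUEUE:JOB_POOL"; // hash: field -> job content
    static final String BUCKET = "ZING:DELAY_QUEUE:BUCKET";     // sorted set: member -> run-at timestamp
    static final String QUEUE = "ZING:DELAY_QUEUE:QUEUE";       // list: members ready to consume

    private QueueKeys() {}

    /** Builds the value shared by all three structures: hash field, ZSET member, and LIST element. */
    static String topicId(String topic, String jobId) {
        if (topic == null || topic.isEmpty() || jobId == null || jobId.isEmpty()) {
            throw new IllegalArgumentException("topic and jobId are required");
        }
        return topic + ":" + jobId;
    }

    public static void main(String[] args) {
        System.out.println(JOB_POOL + " field = " + topicId("pay", "1001"));
    }
}
```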
3.3 Task life cycle
- When a new JOB is added, a record is inserted into ZING:DELAY_QUEUE:JOB_POOL to record the business side and the consumer side. A record is also inserted into ZING:DELAY_QUEUE:BUCKET to record the execution timestamp.
- The transport thread looks in ZING:DELAY_QUEUE:BUCKET for entries whose execution timestamp (RunTimeMillis) is earlier than the current time and deletes all of them. At the same time it resolves the Topic of each task and pushes the task to the list of that Topic in ZING:DELAY_QUEUE:QUEUE.
- Each Topic LIST has a listening thread that fetches the data to be consumed from the LIST in batches and hands everything it fetched to the consumer thread pool of that Topic.
- The consumer thread pool looks up the job data in ZING:DELAY_QUEUE:JOB_POOL, passes it to the callback, and executes the callback method.
3.4 Design Points
3.4.1 Basic concept
- JOB: a task that needs asynchronous processing; the basic unit of the delay queue
- Topic: a collection (queue) of jobs of the same type, for consumers to subscribe to
3.4.2 Message structure
Each JOB must contain the following attributes
- jobId: The unique identifier of the Job. Used to retrieve and delete specified Job information
- topic: Job type. It can be understood as a specific business name
- delay: the time the job needs to be delayed. Unit: seconds. (The server will convert it into an absolute time)
- body: The content of the Job, for consumers to do specific business processing, stored in json format
- retry: Number of failed retries
- url: notification URL
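The `delay` attribute is relative (seconds), while the ZSET score has to be an absolute time. A minimal sketch of that conversion follows, assuming the score is an epoch timestamp in milliseconds; the class `DelayToScore` is a hypothetical name and the `Clock` parameter is there only to make the method testable.

```java
import java.time.Clock;

// Hypothetical helper showing the relative-to-absolute conversion.
final class DelayToScore {
    private DelayToScore() {}

    /** Converts a relative delay in seconds into an absolute epoch-millisecond score. */
    static long runAtMillis(long delaySeconds, Clock clock) {
        if (delaySeconds < 0) {
            throw new IllegalArgumentException("delay must not be negative");
        }
        // addExact/multiplyExact fail loudly instead of silently overflowing.
        return Math.addExact(clock.millis(), Math.multiplyExact(delaySeconds, 1000L));
    }

    public static void main(String[] args) {
        System.out.println(runAtMillis(30, Clock.systemUTC()));
    }
}
```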
3.5 Design details
3.5.1 How to consume ZING:DELAY_QUEUE:QUEUE quickly
The simplest implementation is a timer that scans every second. To keep message execution timely, it can query Redis once per second to check whether the queue holds JOBs to be consumed. The problem is that when the queue holds no consumable JOBs, frequent scanning is pointless and wastes resources. Fortunately, LIST offers the blocking primitive BLPOP: if the list has data, it returns immediately; if not, it blocks until data arrives. A blocking timeout can be set, and NULL is returned after the timeout. The specific implementation and strategy are shown in the code.
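The blocking behaviour described above has a close local analogue in the JDK. The sketch below uses `java.util.concurrent.BlockingQueue.poll(timeout, unit)` purely as an in-process stand-in for BLPOP; it does not talk to Redis, and the class name `BlockingPopSketch` is made up for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Local analogue of BLPOP: wait for an element instead of polling on a timer.
final class BlockingPopSketch {
    private BlockingPopSketch() {}

    /** Waits up to timeoutMillis for an element; returns null on timeout or interruption. */
    static String blockingPop(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
            return null;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> ready = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(200);
                ready.put("order:1");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        System.out.println(blockingPop(ready, 2_000)); // returns as soon as the producer pushes
        System.out.println(blockingPop(ready, 100));   // nothing left, so the timeout elapses
        producer.join();
    }
}
```

The consumer sleeps inside the call while the queue is empty, so an idle queue costs no repeated round trips.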
3.5.2 Avoid repeated transfer and consumption of messages caused by the timer
- Use a Redis distributed lock to control message transfer, so that the same message is not transferred more than once
- Use a distributed lock to guarantee the execution frequency of the timer
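The locking pattern behind both points can be sketched with a local lock. In the sketch below a `ReentrantLock` stands in for the distributed lock (an assumption for illustration only; a real deployment needs a lock shared across processes), and `GuardedTransfer` is a hypothetical name. It shows two details that matter: a worker that cannot obtain the lock skips the round, and only the holder releases the lock.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Local stand-in for "only one worker transfers messages at a time".
final class GuardedTransfer {
    private final ReentrantLock lock = new ReentrantLock(); // stand-in for a distributed lock
    final AtomicInteger runs = new AtomicInteger();         // counts completed transfers

    /** Runs the task only if the lock is obtained within waitMillis; returns whether it ran. */
    boolean runExclusively(Runnable task, long waitMillis) {
        boolean locked = false;
        try {
            locked = lock.tryLock(waitMillis, TimeUnit.MILLISECONDS);
            if (!locked) {
                return false; // another worker is transferring; skip this round
            }
            task.run();
            runs.incrementAndGet();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            if (locked) {
                lock.unlock(); // release only what this thread actually holds
            }
        }
    }
}
```

Calling `unlock()` without holding the lock throws `IllegalMonitorStateException` on `ReentrantLock`, which is why the release is guarded by the `locked` flag.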
4. Core code implementation
4.1 Technical Description
Technology stack: SpringBoot, Redisson, Redis, distributed lock, timer
Note: This project does not implement the multi-Queue consumption described in the design plan; only one QUEUE is used. This will be optimized in the future.
4.2 Core Entity
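4.2.1 Job object used when adding a job
import java.io.Serializable;
// Assumes the javax Bean Validation API; adjust if the project uses a different package.
import javax.validation.constraints.NotBlank;
import lombok.Data;

/**
 * Message structure.
 *
 * @author 睁眼看世界
 * @date January 15, 2020
 */
@Data
public class Job implements Serializable {
    private static final long serialVersionUID = 1L;
    /**
     * Unique identifier of the Job. Used to retrieve and delete a specific Job.
     */
    @NotBlank
    private String jobId;
    /**
     * Job type. Can be understood as a specific business name.
     */
    @NotBlank
    private String topic;
    /**
     * Time the Job needs to be delayed. Unit: seconds. (The server converts it into an absolute time.)
     */
    private Long delay;
    /**
     * Content of the Job, used by consumers for business processing; stored as JSON.
     */
    @NotBlank
    private String body;
    /**
     * Number of failed retries.
     */
    private int retry = 0;
    /**
     * Notification URL.
     */
    @NotBlank
    private String url;
}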
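4.2.2 Job object used when deleting a job
import java.io.Serializable;
// Assumes the javax Bean Validation API; adjust if the project uses a different package.
import javax.validation.constraints.NotBlank;
import lombok.Data;

/**
 * Message structure.
 *
 * @author 睁眼看世界
 * @date January 15, 2020
 */
@Data
public class JobDie implements Serializable {
    private static final long serialVersionUID = 1L;
    /**
     * Unique identifier of the Job. Used to retrieve and delete a specific Job.
     */
    @NotBlank
    private String jobId;
    /**
     * Job type. Can be understood as a specific business name.
     */
    @NotBlank
    private String topic;
}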
4.3 Transport thread
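import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RList;
import org.redisson.api.RLock;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
// Project-specific imports (RedisQueueKey constants, BusinessException, ErrorMessageEnum) are omitted.

/**
 * Transport thread.
 *
 * @author 睁眼看世界
 * @date January 17, 2020
 */
@Slf4j
@Component
public class CarryJobScheduled {
    @Autowired
    private RedissonClient redissonClient;

    /**
     * Runs on a schedule and moves due JOBs from the bucket to the ready queue.
     */
    @Scheduled(cron = "*/1 * * * * *")
    public void carryJobToQueue() {
        log.debug("carryJobToQueue --->");
        RLock lock = redissonClient.getLock(RedisQueueKey.CARRY_THREAD_LOCK);
        try {
            boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
            if (!lockFlag) {
                throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
            }
            RScoredSortedSet<Object> bucketSet = redissonClient.getScoredSortedSet(RD_ZSET_BUCKET_PRE);
            long now = System.currentTimeMillis();
            // Members whose score (execution timestamp) lies in (0, now]
            Collection<Object> jobCollection = bucketSet.valueRange(0, false, now, true);
            List<String> jobList = jobCollection.stream().map(String::valueOf).collect(Collectors.toList());
            if (jobList.isEmpty()) {
                return;
            }
            RList<String> readyQueue = redissonClient.getList(RD_LIST_TOPIC_PRE);
            readyQueue.addAll(jobList);
            // Remove synchronously so the entries are gone before the lock is released
            bucketSet.removeAll(jobList);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("carryJobToQueue error", e);
        } finally {
            // Unlock only if this thread holds the lock; otherwise unlock() throws
            if (lock != null && lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
}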
4.4 Consumer thread
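import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;

import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RLock;
import org.redisson.api.RMap;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
// Project-specific imports (RedisQueueKey constants, TaskManager, ConsumerService, RetryStrategyEnum,
// DateUtil, StringUtils) are omitted.

@Slf4j
@Component
public class ReadyQueueContext {
    @Autowired
    private RedissonClient redissonClient;
    @Autowired
    private ConsumerService consumerService;

    /**
     * TOPIC consumer thread.
     */
    @PostConstruct
    public void startTopicConsumer() {
        TaskManager.doTask(this::runTopicThreads, "start TOPIC consumer thread");
    }

    /**
     * Starts the TOPIC consumer thread.
     * Catches every possible exception so that the while (true) loop is never interrupted.
     */
    @SuppressWarnings("InfiniteLoopStatement")
    private void runTopicThreads() {
        while (true) {
            RLock lock = null;
            try {
                lock = redissonClient.getLock(CONSUMER_TOPIC_LOCK);
            } catch (Exception e) {
                log.error("runTopicThreads getLock error", e);
            }
            try {
                if (lock == null) {
                    continue;
                }
                // The distributed lock lasts 1s longer than the BLPOP blocking time, so that the lock
                // has not already expired (which would make unlock fail) when it is released
                boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
                if (!lockFlag) {
                    continue;
                }
                // 1. Fetch the data to be consumed from the ReadyQueue
                RBlockingQueue<String> queue = redissonClient.getBlockingQueue(RD_LIST_TOPIC_PRE);
                String topicId = queue.poll(60, TimeUnit.SECONDS);
                if (StringUtils.isEmpty(topicId)) {
                    continue;
                }
                // 2. Fetch the job metadata
                RMap<String, Job> jobPoolMap = redissonClient.getMap(JOB_POOL_KEY);
                Job job = jobPoolMap.get(topicId);
                if (job == null) {
                    // The job was deleted after it was moved to the ReadyQueue
                    continue;
                }
                // 3. Consume
                FutureTask<Boolean> taskResult = TaskManager.doFutureTask(() -> consumerService.consumerMessage(job.getUrl(), job.getBody()), job.getTopic() + "-->consume JobId-->" + job.getJobId());
                if (taskResult.get()) {
                    // 3.1 Consumed successfully: delete the job from the JobPool
                    // (the transport thread already removed it from the DelayBucket)
                    jobPoolMap.remove(topicId);
                } else {
                    int retrySum = job.getRetry() + 1;
                    // 3.2 Consumption failed: re-add the job to the Bucket according to the retry strategy.
                    // If the retry count exceeds 5, delete the job from the JobPool
                    // (persisting it to a DB is not implemented here)
                    if (retrySum > RetryStrategyEnum.RETRY_FIVE.getRetry()) {
                        jobPoolMap.remove(topicId);
                        continue;
                    }
                    job.setRetry(retrySum);
                    // job.getDelay() is treated here as an absolute timestamp in milliseconds
                    long nextTime = job.getDelay() + RetryStrategyEnum.getDelayTime(job.getRetry()) * 1000;
                    log.info("next retryTime is [{}]", DateUtil.long2Str(nextTime));
                    RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
                    delayBucket.add(nextTime, topicId);
                    // 3.3 Update the failure count in the metadata
                    jobPoolMap.put(topicId, job);
                }
            } catch (Exception e) {
                log.error("runTopicThreads error", e);
            } finally {
                // Unlock only if this thread holds the lock
                if (lock != null && lock.isHeldByCurrentThread()) {
                    try {
                        lock.unlock();
                    } catch (Exception e) {
                        log.error("runTopicThreads unlock error", e);
                    }
                }
            }
        }
    }
}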
4.5 Adding and deleting JOB
import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RMap;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
// Project-specific imports (RedisQueueKey constants, BusinessException, ErrorMessageEnum) are omitted.

/**
 * Operations exposed to external services.
 *
 * @author why
 * @date January 15, 2020
 */
@Slf4j
@Service
public class RedisDelayQueueServiceImpl implements RedisDelayQueueService {
    @Autowired
    private RedissonClient redissonClient;

    /**
     * Adds job metadata.
     *
     * @param job metadata
     */
    @Override
    public void addJob(Job job) {
        RLock lock = redissonClient.getLock(ADD_JOB_LOCK + job.getJobId());
        try {
            boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
            if (!lockFlag) {
                throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
            }
            String topicId = RedisQueueKey.getTopicId(job.getTopic(), job.getJobId());
            // 1. Add the job to the JobPool
            RMap<String, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
            if (jobPool.get(topicId) != null) {
                throw new BusinessException(ErrorMessageEnum.JOB_ALREADY_EXIST);
            }
            jobPool.put(topicId, job);
            // 2. Add the job to the DelayBucket.
            // The score must be an absolute timestamp in milliseconds; this assumes the caller
            // has already converted the relative delay before calling addJob
            RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
            delayBucket.add(job.getDelay(), topicId);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("addJob error", e);
        } finally {
            // Unlock only if this thread holds the lock; otherwise unlock() throws
            if (lock != null && lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }

    /**
     * Deletes job information.
     *
     * @param jobDie identifies the job to delete
     */
    @Override
    public void deleteJob(JobDie jobDie) {
        RLock lock = redissonClient.getLock(DELETE_JOB_LOCK + jobDie.getJobId());
        try {
            boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
            if (!lockFlag) {
                throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
            }
            String topicId = RedisQueueKey.getTopicId(jobDie.getTopic(), jobDie.getJobId());
            RMap<String, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
            jobPool.remove(topicId);
            RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
            delayBucket.remove(topicId);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("deleteJob error", e);
        } finally {
            if (lock != null && lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
}
5. Content to be optimized
- Currently only one Queue stores messages. When a large number of messages waiting to be consumed accumulate, the timeliness of message notifications suffers. The improvement is to open multiple Queues, route messages among them, and run multiple consumer threads to improve throughput.
- The messages are not persisted, which is risky. They will be persisted to MongoDB in the future.
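One possible shape for the multi-Queue routing mentioned in the first point is to hash the topic onto a fixed number of ready queues. This is only a sketch of the idea, not something the project implements: the class `TopicRouter` and the numeric suffix appended to the queue key are assumptions.

```java
// Hypothetical router: maps a topic to one of N ready queues.
final class TopicRouter {
    private static final String QUEUE_PREFIX = "ZING:DELAY_QUEUE:QUEUE:"; // suffix scheme is an assumption
    private final int queueCount;

    TopicRouter(int queueCount) {
        if (queueCount < 1) {
            throw new IllegalArgumentException("queueCount must be at least 1");
        }
        this.queueCount = queueCount;
    }

    /** Picks a ready-queue key for a topic; the same topic always maps to the same queue. */
    String queueKeyFor(String topic) {
        // floorMod keeps the slot non-negative even when hashCode() is negative.
        int slot = Math.floorMod(topic.hashCode(), queueCount);
        return QUEUE_PREFIX + slot;
    }

    public static void main(String[] args) {
        TopicRouter router = new TopicRouter(4);
        System.out.println(router.queueKeyFor("order-close"));
        System.out.println(router.queueKeyFor("refund-check"));
    }
}
```

Keeping all jobs of one topic on the same queue preserves their relative order, while different topics can be drained by separate consumer threads.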
6. Source code
Please get more detailed source code at the addresses below:
- RedisDelayQueue implementation: zing-delay-queue (https://gitee.com/whyCodeData/zing-project/tree/master/zing-delay-queue)
- RedissonStarter: redisson-spring-boot-starter (https://gitee.com/whyCodeData/zing-project/tree/master/zing-starter/redisson-spring-boot-starter)
- Project application: zing-pay (https://gitee.com/whyCodeData/zing-pay)
7. Reference
- https://tech.youzan.com/queuing_delay/
- https://blog.csdn.net/u010634066/article/details/98864764