
PHP implements lightweight delay queue (multi-threading) based on Redis

爱喝马黛茶的安东尼
2020-01-17 17:19:41


A delay queue, as the name suggests, is a message queue with a delay function. So when is such a queue needed?

1. Background

Let's look at some business scenarios:

1. Send a recall notice 3 days before a membership expires.

2. Five minutes after an order is paid successfully, check whether the downstream steps completed normally, for example whether the various membership statuses were set after a user purchased a membership.

3. Periodically check whether orders in refund status have actually been refunded.

4. If a notification fails, repeat it after 1, 3, 5, and 7 minutes until the other party replies.

Usually the simplest and most direct way to solve these problems is to scan a database table periodically.

The problems with table scanning are:

1. Table scanning holds a database connection for a long time. With large volumes the connection is prone to being interrupted abnormally, which demands more exception handling and a highly robust program.

2. When the amount of data is large, latency is high and processing cannot finish within the required time, which affects the business. Starting multiple processes helps, but it adds maintenance cost and does not solve the problem fundamentally.

3. Each business has to maintain its own table scanning logic. As businesses are added, very similar scanning logic ends up being developed over and over.

A delay queue handles the needs above well.

2. Research

I looked at several open source solutions on the market:

1. Youzan Technology: only the principles are published, with no open source code.

2. A personal project on GitHub: https://github.com/ouqiang/delay-queue

(1) It is based on Redis, but only one Redis instance can be configured. If that Redis goes down, the entire service becomes unavailable, so availability is poor.

(2) The consumer side uses a pull model, so the integration cost is high: each project must implement its own access code.

(3) It has few stars, so putting it into production is risky. In addition, with limited knowledge of Go it would be hard to maintain if something went wrong.

3. SchedulerX (Alibaba open source): very powerful, but complex to operate, dependent on many components, and not lightweight enough.

4. RabbitMQ delayed tasks: RabbitMQ has no delay function of its own, so one has to be built with the help of one of its features. Moreover, the company has not deployed RabbitMQ; deploying it only to get a delay queue is costly and requires dedicated operations staff, which the team cannot currently support.

For these reasons I decided to write one myself. I usually work in PHP, so the project is implemented in PHP and uses the Redis zset structure as storage. For the implementation principle, see the Youzan team's article: https://tech.youzan.com/queuing_delay/

The entire delay queue consists of four main parts:

JobPool stores the meta information of all Jobs.

DelayBucket is a set of ordered queues keyed by time, used to store all Jobs that are waiting out their delay (only Job IDs are stored here).

Timer scans each Bucket in real time and moves Jobs whose delay time has been reached (delay time less than or equal to the current time) into the corresponding ReadyQueue.

ReadyQueue stores Jobs in the Ready state (only Job IDs are stored here) for consumer programs to consume.
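How the four parts fit together can be sketched in memory. This is only an illustration: plain PHP arrays stand in for the Redis structures, and the class and method names (DelayQueueSketch, push, tick, pop) are hypothetical, not taken from the project.

```php
<?php
// In-memory sketch of JobPool, DelayBucket, Timer, and ReadyQueue.
// Arrays stand in for Redis; all names here are made up for illustration.
class DelayQueueSketch
{
    private $jobPool = [];      // JobPool: job id => job meta information
    private $delayBucket = [];  // DelayBucket: job id => due time in ms (like a zset score)
    private $readyQueue = [];   // ReadyQueue: topic => list of ready job ids

    public function push(array $job)
    {
        $this->jobPool[$job['id']] = $job;
        $this->delayBucket[$job['id']] = $job['delayTime'];
    }

    // Timer: move every job whose due time has been reached into its ready queue.
    public function tick($nowMs)
    {
        asort($this->delayBucket); // earliest due time first
        $moved = 0;
        foreach ($this->delayBucket as $id => $dueMs) {
            if ($dueMs > $nowMs) {
                break; // everything after this one is not due yet
            }
            $topic = $this->jobPool[$id]['topic'];
            $this->readyQueue[$topic][] = $id;
            unset($this->delayBucket[$id]);
            $moved++;
        }
        return $moved;
    }

    // Consumer: take the next ready job of a topic, or null if none is ready.
    public function pop($topic)
    {
        if (empty($this->readyQueue[$topic])) {
            return null;
        }
        $id = array_shift($this->readyQueue[$topic]);
        $job = $this->jobPool[$id];
        unset($this->jobPool[$id]);
        return $job;
    }
}

$q = new DelayQueueSketch();
$q->push(['topic' => 'order_check', 'id' => 'a1', 'delayTime' => 1000, 'body' => '{}']);
$q->push(['topic' => 'order_check', 'id' => 'a2', 'delayTime' => 5000, 'body' => '{}']);
$moved = $q->tick(2000);        // only a1 is due at t = 2000 ms
$job = $q->pop('order_check');  // the consumer receives a1
```

Keeping only Job IDs in the bucket and the ready queue, as described above, means the full Job body is stored once in the pool and looked up only at consumption time.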


Message structure

Each Job must contain the following attributes:

topic: Job type. It can be understood as a specific business name.

id: The unique identifier of the Job. Used to retrieve and delete a specific Job.

delayTime: The delayed execution time of the Job, a 13-digit (millisecond) timestamp.

ttr (time-to-run): Job execution timeout.

body: Job content for consumers to do specific business processing, stored in JSON format.

For a given topic, delayTime and ttr are generally fixed, so the Job attributes can be reduced to:

1. topic: Job type. It can be understood as a specific business name.

2. id: The unique identifier of the Job. Used to retrieve and delete a specific Job.

3. body: Job content for consumers to do specific business processing, stored in JSON format.

delayTime and ttr are configured per topic in the admin backend.
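As a rough illustration of the streamlined message, the sketch below builds a Job with only topic, id, and body, and shows how a 13-digit timestamp relates to a delay in seconds. The helper names buildJob and dueTimeMs and the sample topic and id are assumptions for the example; the project's real write interface is in demo.php and is not reproduced here.

```php
<?php
// Build the streamlined job message: topic, id, and a JSON body.
// buildJob() is a hypothetical helper; delayTime and ttr are assumed to come
// from the topic configuration, so they are not part of the message.
function buildJob($topic, $id, array $payload)
{
    return [
        'topic' => $topic,
        'id'    => $id,
        'body'  => json_encode($payload),
    ];
}

// A 13-digit timestamp is the Unix time in milliseconds.
function dueTimeMs($delaySeconds, $nowMs = null)
{
    if ($nowMs === null) {
        $nowMs = (int) round(microtime(true) * 1000);
    }
    return $nowMs + $delaySeconds * 1000;
}

$job = buildJob('order_check', 'order-10001', ['orderId' => 10001]);
$due = dueTimeMs(300, 1579252781000); // 5 minutes after a fixed "now"
echo json_encode($job), "\n", $due, "\n";
```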

3. Goals

Lightweight: It runs with only a few PHP extensions. There is no need to introduce a network framework such as Swoole or Workerman.

Stability: It uses a master-worker architecture. The master does no business processing and is only responsible for managing child processes; when a child process exits abnormally, the master restarts it automatically.

Availability:

1. Supports multi-instance deployment. Each instance is stateless, and the failure of one instance does not affect the service.

2. Supports configuring multiple Redis instances. If one Redis fails, only some of the messages are affected.

3. Easy access for the business side: it only needs to fill in the message type and callback interface in the admin backend.

Extensibility: When consumption is the bottleneck, increase the configured number of consumer processes. When writing is the bottleneck, increase the number of instances; write performance scales linearly.

Real-time performance: A certain amount of timing error is allowed.

Message deletion: Business users can delete a specified message at any time.

Message delivery reliability: Once a message enters the delay queue, it is guaranteed to be consumed at least once.

Write performance: QPS > 1000
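The article does not say how messages are spread across multiple Redis instances. One common way to get the "a single Redis failure only affects some messages" property, shown here purely as an assumption, is to hash the Job id and pick an instance by modulo. The function name pickRedis is hypothetical.

```php
<?php
// Pick a Redis instance for a job by hashing its id.
// pickRedis() and the modulo scheme are assumptions for illustration only.
function pickRedis($jobId, array $servers)
{
    // crc32() is deterministic, so the same id always maps to the same server,
    // which lets a later delete request find the job again.
    // The mask keeps the value non-negative on 32-bit builds.
    $index = (crc32((string) $jobId) & 0x7fffffff) % count($servers);
    return $servers[$index];
}

$servers = ['127.0.0.1:6379:12345', '127.0.0.1:6380:12345'];
$first  = pickRedis('order-10001', $servers);
$second = pickRedis('order-10001', $servers); // same id, same server
```

With a scheme like this, the jobs whose ids hash to a failed instance are the only ones affected, while writes for other ids continue.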

4. Architecture design and description

Overall architecture

[Figure: overall architecture diagram]

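It uses a master-worker architecture and consists mainly of 6 modules:

1. dq-master: the main process, responsible for creating, destroying, and reaping child processes and for signal notification.

2. dq-server: responsible for writing, reading, and deleting messages and for maintaining the Redis connection pool.

3. dq-timer-N: responsible for scanning expired messages from the Redis zset structure and writing them to the ready queue. The number of timers is configurable; 2 is usually enough, because messages in the zset are ordered by time.

4. dq-consume-N: responsible for reading messages from the ready queue and notifying the corresponding callback interface. The number of consumers is configurable.

5. dq-redis-checker: responsible for checking the status of the Redis service; if Redis goes down, it sends an alert email.

6. dq-http-server: provides the web admin interface used to register topics.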

5. Module flowcharts

Message write:

[Figure: message write flowchart]

Timer finding expired messages:

[Figure: timer flowchart]

Consumer consumption flow:

[Figure: consumer flowchart]

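6. Deployment

Requirements: PHP 5.4+ with the sockets, redis, pcntl, and pdo_mysql extensions installed.

PS: If you are familiar with Docker, you can use the image shareclz/php7.2.14 directly; it includes the required extensions.

step1: Set up the database used to store topics and alert information

Run:

mysql> source dq.sql

step2: Configure the database information in the DqConf.php file: DqConf::$db

step3: Start the HTTP service

Set the path to the php binary in the DqConf.php file.

Command:

php DqHttpServer.php --port 8088

Visit http://127.0.0.1:8088 and the configuration page appears.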


Redis info format: host:port:auth, for example 127.0.0.1:6379:12345
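For readers who want to see what the host:port:auth format implies, here is a minimal parsing sketch. The helper parseRedisInfo is hypothetical, and treating the auth part as optional is an assumption, not something the article states.

```php
<?php
// Parse "host:port:auth" into its parts. parseRedisInfo() is a hypothetical helper.
function parseRedisInfo($info)
{
    // Limit to 3 parts so a password containing ":" stays intact.
    $parts = explode(':', $info, 3);
    if (count($parts) < 2 || !ctype_digit($parts[1])) {
        throw new InvalidArgumentException("Expected host:port[:auth], got: $info");
    }
    return [
        'host' => $parts[0],
        'port' => (int) $parts[1],
        'auth' => isset($parts[2]) ? $parts[2] : null, // assumption: auth may be omitted
    ];
}

$conf = parseRedisInfo('127.0.0.1:6379:12345');
```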

step4: Configure alert information (for example, for Redis downtime)

step5: Register a topic

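Retry flag description:

1. If the interface returns an empty response, the notification is retried by default.
2. If the response matches the specified expression, the notification is retried. res denotes the returned JSON array. For example,
if the callback interface returns the JSON string {"code":200,"data":{"status":2,"msg":"返回失败"}} (msg means "return failed"), the retry condition can be written as any of:
    {res.code}!=200 
    {res.code}!=200 && {res.data.status}!=2 
    {res.code}==200 && {res.data.status}==2 || {res.data.msg}=='返回失败'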
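To make the semantics of these retry conditions concrete, the sketch below evaluates such an expression against a response body. It is not the project's parser: the function names are hypothetical, only == and != against integers or single-quoted strings are supported, quoted strings are assumed not to contain && or ||, and retrying on a non-JSON response is an assumption.

```php
<?php
// Look up a dotted path such as "data.status" in the decoded response.
function lookup(array $res, $path)
{
    $value = $res;
    foreach (explode('.', $path) as $key) {
        if (!is_array($value) || !array_key_exists($key, $value)) {
            return null;
        }
        $value = $value[$key];
    }
    return $value;
}

// Evaluate one comparison like "{res.code}!=200" or "{res.data.msg}=='x'".
function evalComparison(array $res, $expr)
{
    $pattern = '/^\s*\{res\.([\w.]+)\}\s*(==|!=)\s*(\'[^\']*\'|-?\d+)\s*$/u';
    if (!preg_match($pattern, $expr, $m)) {
        throw new InvalidArgumentException("Unsupported expression: $expr");
    }
    $actual = lookup($res, $m[1]);
    $expected = $m[3][0] === "'" ? substr($m[3], 1, -1) : (int) $m[3];
    // Compare as strings so that 200 and "200" are treated as equal.
    $equal = is_scalar($actual) && (string) $actual === (string) $expected;
    return $m[2] === '==' ? $equal : !$equal;
}

// "&&" binds tighter than "||", so split on "||" first.
function shouldRetry($responseBody, $condition)
{
    if (trim($responseBody) === '') {
        return true; // rule 1: an empty response retries by default
    }
    $res = json_decode($responseBody, true);
    if (!is_array($res)) {
        return true; // assumption: a non-JSON response is treated as a failure
    }
    foreach (explode('||', $condition) as $orPart) {
        $allTrue = true;
        foreach (explode('&&', $orPart) as $andPart) {
            if (!evalComparison($res, $andPart)) {
                $allTrue = false;
                break;
            }
        }
        if ($allTrue) {
            return true;
        }
    }
    return false;
}

$body = '{"code":200,"data":{"status":2,"msg":"返回失败"}}';
$a = shouldRetry($body, '{res.code}!=200');
$b = shouldRetry($body, '{res.code}!=200 && {res.data.status}!=2');
$c = shouldRetry($body, '{res.code}==200 && {res.data.status}==2 || {res.data.msg}==\'返回失败\'');
```

For the sample response, the first two conditions do not trigger a retry and the third one does.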


step6: Start the service process:

php DqInit.php --port 6789 &

Run ps -ef | grep dq. If the dq processes (dq-master and its child processes) are listed, the service started successfully.

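step7: Write data; see demo.php

step8: View the logs

The default log directory is the logs directory under the project directory; change $logPath in DqConf.php to use a different one.

1. Request log: request_ymd.txt

2. Notification log: notify_ymd.txt

3. Error log: err_ymd.txt

step9: If the configuration file changes

1. The system automatically detects changes to the configuration file. If it has changed, the service exits automatically (I have not found a good hot-reload solution) and needs to be restarted. You can add a crontab task that runs once a minute; the program has a check_self check.

2. Graceful exit command: the master listens for the USR2 signal. After receiving it, the master notifies all child processes, and each child exits automatically once it has finished its current task.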

ps -ef | grep dq-master| grep -v grep | head -n 1 | awk '{print $2}' | xargs kill -USR2
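The "finish the current task, then exit" behaviour can be sketched on the worker side as follows. This is a simplified illustration, not the project's code: it assumes the child is also notified with USR2, it requires the pcntl and posix extensions, and the task list and the self-sent signal exist only to make the example runnable on its own.

```php
<?php
// Worker-side sketch: finish the current task, then exit once USR2 arrives.
// The task list and the self-sent signal are made up for illustration.
$shouldStop = false;
pcntl_signal(SIGUSR2, function ($signo) use (&$shouldStop) {
    $shouldStop = true; // only set a flag; never abort work inside the handler
});

$tasks = ['job-1', 'job-2', 'job-3'];
$done = [];
foreach ($tasks as $i => $task) {
    $done[] = $task;                     // "process" the task to completion
    if ($i === 0) {
        posix_kill(getmypid(), SIGUSR2); // simulate the master's notification
    }
    pcntl_signal_dispatch();             // run pending signal handlers between tasks
    if ($shouldStop) {
        break;                           // graceful exit: nothing is left half-done
    }
}
echo 'processed: ', implode(',', $done), "\n";
```

Checking the flag only between tasks is what makes the exit graceful: a task that has started always runs to the end before the process leaves the loop.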

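7. Performance testing

The pthreads extension needs to be installed.

Test method: use multiple threads to simulate concurrency and count the number of requests that return successfully within 1 s.

8. Performance optimizations worth mentioning:

1. Redis MULTI command: pack multiple Redis operations into one to reduce network overhead.

2. Counting is handled asynchronously. The asynchronous logic keeps the counts in a function's static variable and releases the static variable once the write to Redis succeeds, so the counts stay consistent even when Redis has errors, unless the process exits.

3. Memory leak detection is necessary: all memory allocation ultimately calls brk or mmap, so if the program makes a large number of brk or mmap system calls, a memory leak is very likely. Detection command: strace -c -p pid | grep -P 'mmap| brk'

4. Check the program's system calls: strace -c -p pid. If one system call is invoked several times more often than the others, the program very likely has a problem.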
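As an illustration of the first optimization, the sketch below groups two operations in one MULTI/EXEC block using the phpredis calls multi(), zRem(), rPush(), and exec(). The key names and the function moveToReady are hypothetical, and the project's actual commands may differ. phpredis also accepts Redis::PIPELINE as the argument to multi(), which batches commands without transactional guarantees.

```php
<?php
// Move one due job from a bucket (zset) to a ready queue (list) inside a single
// MULTI/EXEC block. Key names and moveToReady() are hypothetical;
// $redis is expected to be a connected phpredis \Redis instance.
function moveToReady($redis, $bucketKey, $readyKey, $jobId)
{
    $replies = $redis->multi()        // start queuing commands
        ->zRem($bucketKey, $jobId)    // remove the id from the delay bucket
        ->rPush($readyKey, $jobId)    // append the id to the ready queue
        ->exec();                     // execute the queued commands together
    // exec() returns one reply per queued command, or false on failure.
    return is_array($replies) && (int) $replies[0] === 1;
}

// Usage, assuming a local Redis server:
// $redis = new Redis();
// $redis->connect('127.0.0.1', 6379);
// moveToReady($redis, 'dq:bucket:0', 'dq:ready:order_check', 'order-10001');
```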

9. Exception handling

1. If the notification interface is called and no reply is received within the timeout period, the notification is considered failed. The system puts the data back into the queue and notifies again. By default the system notifies at most 10 times (configurable via $notify_exp_nums in the DqConf.php file). The notification interval is 2n-1 minutes: if the first notification, sent after 1 minute, fails, the second is sent after 3 minutes, and so on until a reply is received. Once the maximum number of notifications is exceeded, the system discards the message and sends an email notification.

2. Redis in production is persisted once per second, so up to 1 s of data may be lost. In that case, you can compare the request_ymd.txt and notify_ymd.txt logs to restore the data manually.

3. Redis downtime notification: when a Redis instance goes down, an alert email is sent.
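The retry schedule from the first item can be written as a small function. This sketch reads the interval as 2n-1 minutes for the n-th notification, which matches the 1 minute and 3 minute example; the function name retryDelayMinutes is hypothetical and the default of 10 mirrors the maximum mentioned above.

```php
<?php
// Delay before the n-th notification attempt, in minutes: 1, 3, 5, ...
// Returns null once the maximum number of attempts is exceeded.
// retryDelayMinutes() is a hypothetical name used only for this sketch.
function retryDelayMinutes($attempt, $maxAttempts = 10)
{
    if ($attempt < 1 || $attempt > $maxAttempts) {
        return null; // give up: the message is discarded and an email is sent
    }
    return 2 * $attempt - 1;
}

$schedule = [];
for ($n = 1; $n <= 4; $n++) {
    $schedule[] = retryDelayMinutes($n);
}
echo implode(', ', $schedule), "\n"; // prints "1, 3, 5, 7"
```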

PS: Network jitter is inevitable. If the notification interface involves core services, it must be idempotent!
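Because a message is consumed at least once and may be delivered again after a timeout, the callback on the business side should tolerate duplicates. A minimal sketch of such an idempotent handler follows; the class name is hypothetical, and the in-memory array stands in for a durable store such as a unique key in a database.

```php
<?php
// Business-side callback sketch: handling the same job id twice has no extra effect.
// The in-memory array stands in for a durable store; all names are hypothetical.
class MembershipNotifier
{
    private $handled = [];
    public $sent = 0;

    public function handle($jobId)
    {
        if (isset($this->handled[$jobId])) {
            return 'duplicate'; // already processed: acknowledge without redoing the work
        }
        $this->sent++;          // the real side effect would go here
        $this->handled[$jobId] = true;
        return 'ok';
    }
}

$notifier = new MembershipNotifier();
$first  = $notifier->handle('order-10001');
$second = $notifier->handle('order-10001'); // redelivery after a timeout
```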

10. Production status

Two instances are deployed in production, one in each data center, with 4 Redis instances totaling 16 GB of memory for storage. The service has been running stably for several months, and all indicators are in line with expectations.

Main businesses using it:

· Order 10-minute recall notice

· Compensation when an interface call times out or fails

· Recall notification 3 days before membership expiration

11. Shortcomings and prospects

1. Because the image used by the team lacks the libevent extension, dq-server is based on the select model, which is a performance bottleneck in high-concurrency scenarios. It can be switched to the libevent event model in the future to improve concurrency.

2. Timer and consumer are currently implemented with multiple processes, which is rather coarse-grained. A multi-threaded mode with a dynamically adjustable number of threads could improve consumer performance and keep consumption as timely as possible.

3. dq-server calls Redis synchronously, which is also a performance bottleneck. We plan to make it asynchronous based on swoole_redis.


This article is reproduced from: https://www.jianshu.com/p/58f10ac42162
