Home  >  Article  >  Backend Development  >  How is PHP delay queue implemented?

How is PHP delay queue implemented?

爱喝马黛茶的安东尼
爱喝马黛茶的安东尼Original
2019-09-26 09:21:162872browse

How is PHP delay queue implemented?

Delay queue, as the name suggests, is a message queue with delay function. So, under what circumstances do I need such a queue?

1. Background

Let’s take a look at the business scenario:

1. Send a recall notice 3 days before the membership expires

2. After the order payment is successful, check whether the downstream links are normal after 5 minutes. For example, after the user purchases a membership, whether the various membership statuses are set successfully

3. How to regularly check whether the order in refund status has been Refund successfully?

4. If the notification fails, the notification will be repeated in 1, 3, 5, and 7 minutes until the other party replies?

Usually the simplest and most direct way to solve the above problems is to scan the meter regularly.

The problems with table scanning are:

1. The table scanning is connected to the database for a long time. In the case of large quantities, the connection is prone to abnormal interruption, which requires more exception handling and the program High robustness requirements

2. When the amount of data is large, the delay is high and the processing cannot be completed within the regulations, which affects the business. Although multiple processes can be started for processing, this will bring additional maintenance costs. , cannot be fundamentally solved.

3. Each business must maintain its own table scanning logic. When the business increases, it is found that the logic of the table scanning part will be developed repeatedly, but it is very similar

The delay queue can solve the above needs very well

2. Research

We investigated some open source solutions on the market, as follows:

·Youzan Technology: Only principles, no open source code

·Github personal: https://github.com/ouqiang/delay-queue

1. Based on redis implementation, only one redis can be configured. If redis hangs, the entire service will be unavailable. The usability is poor

2. The consumer side implements the pull model, and the access cost is high. Each project has to implement the access code

3. There are not many people using star. There are risks when placed in a production environment. In addition, if you don’t understand the Go language, it will be difficult to maintain if something goes wrong

·SchedulerX-Alibaba open source: Very powerful, but complex operation and maintenance, dependent on components Too many, not lightweight enough

·RabbitMQ-delayed task: It does not have a delay function itself, it needs to be implemented by itself with the help of a feature, and the company has not deployed this queue, so deploy this one separately. The cost of making a delayed queue is a bit high, and it also requires special operation and maintenance. Currently, the team does not support it. Basically, for the above reasons, I plan to write one myself. I usually use PHP mostly, and the project basically uses the zset structure of redis as storage. Implemented in PHP language, the implementation principle refers to the Youzan team: https://tech.youzan.com/queuing_delay/

Related recommendations: "

php tutorial

"

3. Goal

·

Lightweight: It can be run directly with less PHP extensions, without the need to introduce network frameworks, such as swoole, workman, etc.

·

Stability: Using the master-work architecture, the master does not do business processing, but is only responsible for managing the child process. When the child process exits abnormally, it will automatically start up

·

Availability: 1. Supports multi-instance deployment, each instance is stateless, and the failure of one instance will not affect the service

2. Supports the configuration of multiple redis, one redis Hanging up will only affect some messages

3. The business side has easy access. In the background, you only need to fill in the relevant message type and return the interface

·

Extensibility: when consuming When there is a bottleneck in the process, you can configure to increase the number of consuming processes. When there is a bottleneck in writing, you can increase the number of instances. The writing performance can be improved linearly

·

Real-time: a certain amount of time is allowed time error.

·

Support message deletion: Business users can delete specified messages at any time.

·

Message transmission reliability: After the message enters the delay queue, it is guaranteed to be consumed at least once.

·

Write performance: qps>1000

4. Architecture design and description

Overall architecture

Insert picture description here

Adopts the master-work architecture model, which mainly includes 6 modules:

1.dq-mster: main process, responsible for managing the creation and destruction of child processes. Recycling and signal notification

2.dq-server: Responsible for message writing, reading, deletion functions and maintaining the redis connection pool

3.dq-timer-N: Responsible for zset from redis Scans the expired messages in the structure and is responsible for writing to the ready queue. The number is configurable, usually 2 is enough, because the messages in the zset structure are ordered by time

4.dq-consume-N : Responsible for reading messages from the ready queue and notifying the corresponding callback interface. The number can be configured

5.dq-redis-checker: Responsible for checking the service status of redis. If redis is down, send an alarm email

6.dq-http-server: Provides a web background interface for registering topics

5. Deployment

Environment dependencies: PHP 5.4 installation sockets, redis, pcntl, pdo_mysql extension

step1:安装数据库用于存储一些topic以及告警信息

create database dq;
#存放告警信息
CREATE TABLE `dq_alert` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `host` varchar(255) NOT NULL DEFAULT '',
  `port` int(11) NOT NULL DEFAULT '0',
  `user` varchar(255) NOT NULL DEFAULT '',
  `pwd` varchar(255) NOT NULL DEFAULT '',
  `ext` varchar(2048) NOT NULL DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
#存放redis信息
CREATE TABLE `dq_redis` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `t_name` varchar(200) NOT NULL DEFAULT '',
  `t_content` varchar(2048) NOT NULL DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8;
#存储注册信息
CREATE TABLE `dq_topic` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `t_name` varchar(1024) NOT NULL DEFAULT '',
  `delay` int(11) NOT NULL DEFAULT '0',
  `callback` varchar(1024) NOT NULL DEFAULT '',
  `timeout` int(11) NOT NULL DEFAULT '3000',
  `email` varchar(1024) NOT NULL DEFAULT '',
  `topic` varchar(255) NOT NULL DEFAULT '',
  `createor` varchar(1024) NOT NULL DEFAULT '',
  `status` tinyint(4) NOT NULL DEFAULT '1',
  `method` varchar(32) NOT NULL DEFAULT 'GET',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;

step2:在DqConfg.文件中配置数据库信息: DqConf::$db

step3:启动http服务

在DqConf.php文件中修改php了路径 $logPath

命令:

php DqHttpServer.php --port 8088

访问:http://127.0.0.1:8088,出现配置界面

在这里插入图片描述

redis信息格式:host:post:auth 比如 127.0.0.1:6379:12345

stop4:启动服务进程

php DqInit.php --port 6789

看到如下信息说明启动成功

在这里插入图片描述

stop5:配置告警信息(比如redis宕机)

在这里插入图片描述

stop6:注册topic

在这里插入图片描述

在这里插入图片描述

step7:写入数据,在项目根目录下新建test.php文件写入

<?php
include_once &#39;DqLoader.php&#39;;
date_default_timezone_set("PRC");
//可配置多个
$server=array(
    &#39;127.0.0.1:6789&#39;,
);
$dqClient = new DqClient();
$dqClient->addServer($server);
 
$topic =&#39;order_openvip_checker&#39;; //topic在后台注册
$id = uniqid();
$data=array(
    &#39;id&#39;=>$id,
    &#39;body&#39;=>array(
        &#39;a&#39;=>1,
        &#39;b&#39;=>2,
        &#39;c&#39;=>3,
        &#39;ext&#39;=>str_repeat(&#39;a&#39;,64),
    ),
    //可选,设置后以这个通知时间为准,默认延时时间在注册topic的时候指定
    &#39;fix_time&#39;=>date(&#39;Y-m-d 23:50:50&#39;),
);
 
//添加
$boolRet = $dqClient->add($topic, $data);
echo &#39;add耗时:&#39;.(msectime() - $time)."ms\n";
//查询
$time = msectime();
$result = $dqClient->get($topic, $id);
echo &#39;get耗时:&#39;.(msectime() - $time)."ms\n";
 
//删除
$time = msectime();
$boolRet = $dqClient->del($topic,$id);
echo &#39;del耗时:&#39;.(msectime() - $time)."ms\n";

执行php test.php

step8:查看日志

默认日志目录在项目目录的logs目录下,在DqConf.php修改$logPath

1.请求日志:request_ymd.txt

2.通知日志:notify_ymd.txt

3.错误日志:err_ymd.txt

step9:如果配置文件有改动

1.系统会自动检测配置文件新,如果有改动,会自动退出(没有找到较好的热更新的方案),需要重启,可以在crontab里面建个任务,1分钟执行一次,程序有check_self的判断

2.优雅退出命令: master检测侦听了USR2信号,收到信号后会通知所有子进程,子进程完成当前任务后会自动退出

ps -ef | grep dq-master| grep -v grep | head -n 1 | awk &#39;{print $2}&#39; | xargs kill -USR2

六、性能测试

需要安装pthreads拓展:

测试原理:使用多线程模拟并发,在1s内能成功返回请求成功的个数

php DqBench  concurrency  requests
concurrency:并发数
requests: 每个并发产生的请求数
测试环境:内存 8G ,8核cpu,2个redis和1个dq-server 部署在一个机器上,数据包64字节
qps:2400

七、值得一提的性能优化点:

1.redis multi命令:将多个对redis的操作打包成一个减少网络开销。

2.计数的操作异步处理,在异步逻辑里面用函数的static变量来保存,当写入redis成功后释放static变量,可以在redis出现异常时计数仍能保持一致,除非进程退出。

3.内存泄露检测有必要: 所有的内存分配在底层都是调用了brk或者mmap,只要程序只有大量brk或者mmap的系统调用,内存泄露可能性非常高 ,检测命令: strace -c -p pid | grep 'mmap| brk'。

4.检测程序的系统调用情况:strace -c -p pid ,发现某个系统函数调用是其他的数倍,可能大概率程序存在问题。

八、异常处理

如果调用通知接口在超时时间内,没有收到回复认为通知失败,系统会重新把数据放入队列,重新通知,系统默认最大通知10次(可以在Dqconf.php文件中修改$notify_exp_nums)通知间隔为2n+1,比如第一次1分钟,通知失败,第二次3分钟后,直到收到回复,超出最大通知次数后系统自动丢弃,同时发邮件通知。

ps:网络抖动在所难免,通知接口如果涉及到核心的服务,一定要保证幂等!!

九、线上情况

线上部署了两个实例每个机房部一个,4个redis作存储,服务稳定运行数月,各项指标均符合预期。

主要接入业务:

    ·订单10分钟召回通知

    ·接口超时或者失败补偿

项目地址: https://github.com/chenlinzhong/php-delayqueue

The above is the detailed content of How is PHP delay queue implemented?. For more information, please follow other related articles on the PHP Chinese website!

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Previous article:How to create php fileNext article:How to create php file