Talk about multi-process consumption queue in PHP

青灯夜游
Release: 2023-04-09 11:46:01
forward
2935 people have browsed it

Talk about multi-process consumption queue in PHP

Introduction

Recently developed a small function using queue mcq to start a process consumption queue Data, later it was discovered that a process could not be processed, and another process was added. After a while, it could not be processed again...

This method requires modifying the crontab every time. If the process If it hangs, it will not start in time. It will not start until the next time crontab is executed. Kill is used when closing (restarting) the process, which may lose the data being processed. For example, in the following example, we assume that the sleep process is the processing logic. In order to clearly see the effect, the processing time is enlarged to 10s:

<?php
$i = 1;
while (1) {
    echo "开始第[{$i}]次循环\n";
    sleep(10);
    echo "结束第[{$i}]次循环\n";
    $i++;
}
Copy after login

After we run the script, wait until the loop starts, and then send kill {$pid} to the process. By default, the SIGTERM signal numbered 15 is sent. Suppose $i is obtained from the queue. When it gets 2, it is being processed. We send a kill signal to the program. Like the queue data loss, the problem is relatively big, so I have to find a way to solve these problems. question.

开始第[1]次循环
结束第[1]次循环
开始第[2]次循环


[1]    28372 terminated  php t.php
Copy after login

nginx process model

At this time I thought of nginx. As the mainstay of high-performance servers, nginx provides services to thousands of enterprises. For personal services, his process model is more classic, as shown below:

nginx 进程模型 图片来自网络

The administrator interacts with nginx through the master process, from /path/to/nginx .pidRead the pid of the nginx master process, send signals to the master process, the master performs different processing according to different signals, and then feeds back the information to the administrator. Workers are forked from the master process. The master is responsible for managing the workers and will not handle the business. The worker is the handler of the specific business. The master can control the exit and startup of the worker. When the worker exits unexpectedly, the master will receive a message that the child process has exited. message, new worker processes will be restarted to supplement them, so that business processing will not be affected. nginx can also exit smoothly without losing any data being processed. When updating the configuration, nginx can load the new configuration without affecting the online service, which is particularly useful when the request volume is large.

Process Design

After looking at nginx’s advanced model, we can develop a similar class library to meet the needs of processing mcq data. A single file can control all processes, exit smoothly, and view the status of child processes. It doesn’t need to be too complicated, because we deal with a certain delay in receiving queue data. It is troublesome, time-consuming and labor-intensive to provide uninterrupted service like nginx, and it is not very meaningful. The designed process model is similar to nginx, more like a simplified version of nginx.
Talk about multi-process consumption queue in PHP

Process semaphore design

Semaphore is a way of communication between processes. It is relatively simple and has a weak single function. It can only Send a signal to the process, and the process performs different processing according to the signal.

When the master process starts, save the pid to the file /path/to/daeminze.pid. The administrator communicates with the master process through signals. The master process installs three kinds of signals and encounters different Signals are processed differently, as shown below:

SIGINT 	=> 平滑退出,处理完正在处理的数据再退出
SIGTERM => 暴力退出,无论进程是否正在处理数据直接退出
SIGUSR1 => 查看进程状态,查看进程占用内存,运行时间等信息
Copy after login

The master process communicates with the worker process through signals. The worker process has installed 2 signals, as shown below:

SIGINT 	=> 平滑退出
SIGUSR1	=> 查看worker进程自身状态
Copy after login

Why is the worker process Only 2 signals are installed, one is missing SIGTERM, because after the master process receives the signal SIGTERM, it sends the SIGKILL signal to the worker process, and the process is forced to close by default. That’s it.

The worker process is forked out by the master process, so that the master process can wait for the child process exit event through pcntl_wait. When a child process exits, the child process pid is returned, processed and Start a new process to add it.

The master process also waits to receive signals through pcntl_wait. When a signal arrives, it will return -1. There are still some pitfalls in this place, which will be detailed below. speak.

There are two ways to trigger signals in PHP. The first way is declare(ticks = 1);. This is not efficient. Every time Zend executes a low-level statement, it will go Check whether there are unhandled signals in the process. It is rarely used now. PHP 5.3.0 and previous versions may use this.

The second method is to call unprocessed signals through pcntl_signal_dispatch. PHP 5.4.0 and later versions are applicable. This function can be cleverly placed in a loop. , there is basically no loss in performance, and it is recommended for use now.

PHP installs and repairs the signal signal

PHP installs the signal through pcntl_signal. The function declaration is as follows:

bool pcntl_signal ( int $signo , [callback $handler [, bool $restart_syscalls = true ] )
Copy after login

第三个参数restart_syscalls不太好理解,找了很多资料,也没太查明白,经过试验发现,这个参数对pcntl_wait函数接收信号有影响,当设置为缺省值true的时候,发送信号,进程用pcntl_wait收不到,必须设置为false才可以,看看下面这个例子:

<?php
$i = 0;
while ($i<5) {
    $pid = pcntl_fork();
    $random = rand(10, 50);
    if ($pid == 0) {
        sleep($random);
        exit();
    }
    echo "child {$pid} sleep {$random}\n";
    $i++;
}

pcntl_signal(SIGINT,  function($signo) {
     echo "Ctrl + C\n";
});

while (1) {
    $pid = pcntl_wait($status);
    var_dump($pid);
    pcntl_signal_dispatch();
}
Copy after login

运行之后,我们对父进程发送kill -SIGINT {$pid}信号,发现pcntl_wait没有反应,等到有子进程退出的时候,发送过的SIGINT会一个个执行,比如下面结果:

child 29643 sleep 48
child 29644 sleep 24
child 29645 sleep 37
child 29646 sleep 20
child 29647 sleep 31
int(29643)
Ctrl + C
Ctrl + C
Ctrl + C
Ctrl + C
int(29646)
Copy after login

这是运行脚本之后马上给父进程发送了四次SIGINT信号,等到一个子进程推出的时候,所有信号都会触发。

但当把安装信号的第三个参数设置为false

pcntl_signal(SIGINT,  function($signo) {
     echo "Ctrl + C\n";
}, false);
Copy after login

这时候给父进程发送SIGINT信号,pcntl_wait会马上返回-1,信号对应的事件也会触发。

所以第三个参数大概意思就是,是否重新注册此信号,如果为false只注册一次,触发之后就返回,pcntl_wait就能收到消息,如果为true,会重复注册,不会返回,pcntl_wait收不到消息。

信号量和系统调用

信号量会打断系统调用,让系统调用立刻返回,比如sleep,当进程正在sleep的时候,收到信号,sleep会马上返回剩余sleep秒数,比如:

Copy after login

运行之后,按Ctrl + C,结果如下所示:

123
^Climit sleep [1] s
Ctrl + C
123
limit sleep [0] s
123
^Climit sleep [1] s
Ctrl + C
123
^Climit sleep [2] s
Copy after login

daemon(守护)进程

这种进程一般设计为daemon进程,不受终端控制,不与终端交互,长时间运行在后台,而对于一个进程,我们可以通过下面几个步骤把他升级为一个标准的daemon进程:

protected function daemonize()
{
    $pid = pcntl_fork();
    if (-1 == $pid) {
        throw new Exception("fork进程失败");
    } elseif ($pid != 0) {
        exit(0);
    }
    if (-1 == posix_setsid()) {
        throw new Exception("新建立session会话失败");
    }

    $pid = pcntl_fork();
    if (-1 == $pid) {
        throw new Exception("fork进程失败");
    } else if($pid != 0) {
        exit(0);
    }

    umask(0);
    chdir("/");
}
Copy after login

拢共分五步:

1、fork子进程,父进程退出。

2、设置子进程为会话组长,进程组长。

3、再次fork,父进程退出,子进程继续运行。

4、恢复文件掩码为0

5、切换当前目录到根目录/

第2步是为第1步做准备,设置进程为会话组长,必要条件是进程非进程组长,因此做第一次fork,进程组长(父进程)退出,子进程通过posix_setsid()设置为会话组长,同时也为进程组长。

第3步是为了不让进程重新控制终端,因为一个进程控制一个终端的必要条件是会话组长(pid=sid)。

第4步是为了恢复默认的文件掩码,避免之前做的操作对文件掩码做了设置,带来不必要的麻烦。关于文件掩码, linux中,文件掩码在创建文件、文件夹的时候会用到,文件的默认权限为666,文件夹为777,创建文件(夹)的时候会用默认值减去掩码的值作为创建文件(夹)的最终值,比如掩码022下创建文件666 - 222 = 644,创建文件夹777 - 022 = 755

掩码新建文件权限新建文件夹权限

umask(0)

666 (-rw-rw-rw-)

777 (drwxrwxrwx)

umask(022)

644 (-rw-r--r--)

755 (drwxr-xr-x)

第5步是切换了当前目录到根目录/,网上说避免起始运行他的目录不能被正确卸载,这个不是太了解。

对应5步,每一步的各种id变化信息:

操作后pidppidpgidsid

开始

17723

31381

17723

31381

第一次fork

17723

1

17723

31381

posix_setsid()

17740

1

17740

17740

第二次fork

17840

1

17740

17740

另外,会话、进程组、进程的关系如下图所示,这张图有助于更好的理解。
Talk about multi-process consumption queue in PHP

至此,你也可以轻松地造出一个daemon进程了。

命令设计

我准备给这个类库设计6个命令,如下所示:

  1. start 启动命令

  2. restart 强制重启

  3. stop 平滑停止

  4. reload 平滑重启

  5. quit 强制停止

  6. status 查看进程状态

启动命令

启动命令就是默认的流程,按照默认流程走就是启动命令,启动命令会检测pid文件中是否已经有pid,pid对应的进程是否健康,是否需要重新启动。

强制停止命令

管理员通过入口文件结合pid给master进程发送SIGTERM信号,master进程给所有子进程发送SIGKILL信号,等待所有worker进程退出后,master进程也退出。

强制重启命令

强制停止命令 + 启动命令

平滑停止命令

平滑停止命令,管理员给master进程发送SIGINT信号,master进程给所有子进程发送SIGINT,worker进程将自身状态标记为stoping,当worker进程下次循环的时候会根据stoping决定停止,不在接收新的数据,等所有worker进程退出之后,master进程也退出。

平滑重启命令

平滑停止命令 + 启动命令

查看进程状态

查看进程状态这个借鉴了workerman的思路,管理员给master进程发送SIGUSR1信号,告诉主进程,我要看所有进程的信息,master进程,master进程将自身的进程信息写入配置好的文件路径A中,然后发送SIGUSR1,告诉worker进程把自己的信息也写入文件A中,由于这个过程是异步的,不知道worker进程啥时候写完,所以master进程在此处等待,等所有worker进程都写入文件之后,格式化所有的信息输出,最后输出的内容如下所示:

/dir /usr/local/bin/php DaemonMcn.php status
Daemon [DaemonMcn] 信息:
-------------------------------- master进程状态 --------------------------------
pid       占用内存       处理次数       开始时间                 运行时间
16343     0.75M          --             2018-05-15 09:42:45      0 天 0 时 3 分
12 slaver
-------------------------------- slaver进程状态 --------------------------------
任务task-mcq:
16345     0.75M          236            2018-05-15 09:42:45      0 天 0 时 3 分
16346     0.75M          236            2018-05-15 09:42:45      0 天 0 时 3 分
--------------------------------------------------------------------------------
任务test-mcq:
16348     0.75M          49             2018-05-15 09:42:45      0 天 0 时 3 分
16350     0.75M          49             2018-05-15 09:42:45      0 天 0 时 3 分
16358     0.75M          49             2018-05-15 09:42:45      0 天 0 时 3 分
16449     0.75M          1              2018-05-15 09:46:40      0 天 0 时 0 分
--------------------------------------------------------------------------------
Copy after login

等待worker进程将进程信息写入文件的时候,这个地方用了个比较trick的方法,每个worker进程输出一行信息,统计文件的行数,达到worker进程的行数之后表示所有worker进程都将信息写入完毕,否则,每个1s检测一次。

其他设计

另外还加了两个比较实用的功能,一个是worker进程运行时间限制,一个是worker进程循环处理次数限制,防止长时间循环进程出现内存溢出等意外情况。时间默认是1小时,运行次数默认是10w次。

除此之外,也可以支持多任务,每个任务几个进程独立开,统一由master进程管理。

代码已经放到github中,有兴趣的可以试试,不支持windows哦,有什么错误还望指出来。

相关教程推荐:《PHP教程

The above is the detailed content of Talk about multi-process consumption queue in PHP. For more information, please follow other related articles on the PHP Chinese website!

Related labels:
php
source:cnblogs.com
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template
About us Disclaimer Sitemap
php.cn:Public welfare online PHP training,Help PHP learners grow quickly!