這篇文章帶大家來了解PHP中的多進程消費隊列。有一定的參考價值,有需要的朋友可以參考一下,希望對大家有幫助。
推薦學習:《PHP影片教學》
最近開發一個小功能,用到了佇列mcq,啟動一個行程消費佇列數據,後邊發現一個進程處理不過來了,又加了一個進程,過了一段時間又處理不過來了......
這種方式每次都要修改crontab,如果進程掛掉了,不會及時的啟動,要等到下次crontab執行的時候才會啟動。關閉(重啟)進程的時候用的是kill,這可能會丟失正在處理的數據,比如下面這個例子,我們假設sleep過程就是處理邏輯,這裡為了明顯看出效果,將處理時間放大到10s:
<?php $i = 1; while (1) { echo "开始第[{$i}]次循环\n"; sleep(10); echo "结束第[{$i}]次循环\n"; $i++; }
當我們執行腳本之後,等到迴圈開始之後,給行程發送kill {$pid}
,預設發送的是編號為15的SIGTERM
訊號。假設$i
是從佇列拿到的,拿到2的時候,正在處理,我們給程式發送了kill訊號,和佇列資料遺失一樣,問題比較大,因此我要想辦法解決這些問題。
开始第[1]次循环 结束第[1]次循环 开始第[2]次循环 [1] 28372 terminated php t.php
這時候我想到了nginx,nginx作為高效能伺服器的中流砥柱,為成千上萬的企業和個人服務,他的進程模型比較經典,如下所示:
管理員透過master進程和nginx進行交互,從/path/to/nginx.pid
讀取nginx master進程的pid ,發送訊號給master進程,master根據不同的訊號做出不同的處理,然後回饋訊息給管理員。 worker是master進程fork出來的,master負責管理worker,不會去處理業務,worker才是具體業務的處理者,master可以控制worker的退出、啟動,當worker意外退出,master會收到子進程退出的訊息,也會重新啟動新的worker進程補充上來,不讓業務處理受影響。 nginx還可以平滑退出,不丟失任何一個正在處理的數據,更新配置時nginx可以做到不影響線上服務來加載新的配置,這在請求量很大的時候特別有用。
看了nginx的進模型,我們完全可以開發一個類似的類別庫來滿足處理mcq資料的需求,做到單一檔案控制所有進程、可以平滑退出、可以查看子進程狀態。不需要太複雜,因為我們處理佇列資料接收一定的延遲,做到nginx那樣不間斷服務比較麻煩,費時費力,意義不是很大。設計的流程模型跟nginx類似,比較像是nginx的簡化版本。
信號量是進程間通訊的一種方式,比較簡單,單一功能也比較弱,只能發送訊號給進程,進程根據訊號做出不同的處理。
master進程啟動的時候保存pid到檔案/path/to/daeminze.pid
,管理員透過訊號和master進程通訊,master進程安裝3種訊號,碰到不同的訊號,做出不同的處理,如下所示:
SIGINT => 平滑退出,处理完正在处理的数据再退出 SIGTERM => 暴力退出,无论进程是否正在处理数据直接退出 SIGUSR1 => 查看进程状态,查看进程占用内存,运行时间等信息
master進程透過訊號和worker進程通訊,worker進程安裝了2個訊號,如下所示:
SIGINT => 平滑退出 SIGUSR1 => 查看worker进程自身状态
為什麼worker進程只安裝2個訊號呢,少了個SIGTERM
,因為master進程收到訊號SIGTERM
之後,向worker進程發送SIGKILL
訊號,預設強制關閉進程即可。
worker進程是透過master進程fork出來的,這樣master進程可以透過pcntl_wait
來等待子進程退出事件,當有子進程退出的時候返回子進程pid,做處理並啟動新的進程補充上來。
master進程也透過pcntl_wait
來等待接收訊號,當有訊號到達的時候,會返回-1
,這個地方還有一些坑,在下文中會詳細講。
PHP中有2種訊號觸發的方式,第一種方式是declare(ticks = 1);
,這種效率不高,Zend每執行一次低階語句,都會去檢查進程中是否有未處理的訊號,現在已經很少使用了,PHP 5.3.0
及之前的版本可能會用到這個。
第二種是透過pcntl_signal_dispatch
來呼叫未處理的訊號,PHP 5.4.0
及之後的版本適用,可以巧妙的將該函數放在循環中,性能上基本沒什麼損失,現在推薦適用。
PHP透過pcntl_signal
安裝訊號,函數宣告如下所示:
bool pcntl_signal ( int $signo , [callback $handler [, bool $restart_syscalls = true ] )
第三个参数restart_syscalls
不太好理解,找了很多资料,也没太查明白,经过试验发现,这个参数对pcntl_wait
函数接收信号有影响,当设置为缺省值true
的时候,发送信号,进程用pcntl_wait
收不到,必须设置为false
才可以,看看下面这个例子:
<?php $i = 0; while ($i<5) { $pid = pcntl_fork(); $random = rand(10, 50); if ($pid == 0) { sleep($random); exit(); } echo "child {$pid} sleep {$random}\n"; $i++; } pcntl_signal(SIGINT, function($signo) { echo "Ctrl + C\n"; }); while (1) { $pid = pcntl_wait($status); var_dump($pid); pcntl_signal_dispatch(); }
运行之后,我们对父进程发送kill -SIGINT {$pid}
信号,发现pcntl_wait没有反应,等到有子进程退出的时候,发送过的SIGINT
会一个个执行,比如下面结果:
child 29643 sleep 48 child 29644 sleep 24 child 29645 sleep 37 child 29646 sleep 20 child 29647 sleep 31 int(29643) Ctrl + C Ctrl + C Ctrl + C Ctrl + C int(29646)
这是运行脚本之后马上给父进程发送了四次SIGINT
信号,等到一个子进程推出的时候,所有信号都会触发。
但当把安装信号的第三个参数设置为false
:
pcntl_signal(SIGINT, function($signo) { echo "Ctrl + C\n"; }, false);
这时候给父进程发送SIGINT
信号,pcntl_wait
会马上返回-1
,信号对应的事件也会触发。
所以第三个参数大概意思就是,是否重新注册此信号,如果为false只注册一次,触发之后就返回,pcntl_wait
就能收到消息,如果为true,会重复注册,不会返回,pcntl_wait
收不到消息。
信号量会打断系统调用,让系统调用立刻返回,比如sleep
,当进程正在sleep的时候,收到信号,sleep会马上返回剩余sleep秒数,比如:
<?php pcntl_signal(SIGINT, function($signo) { echo "Ctrl + C\n"; }, false); while (true) { pcntl_signal_dispatch(); echo "123\n"; $limit = sleep(2); echo "limit sleep [{$limit}] s\n"; }
运行之后,按Ctrl + C
,结果如下所示:
123 ^Climit sleep [1] s Ctrl + C 123 limit sleep [0] s 123 ^Climit sleep [1] s Ctrl + C 123 ^Climit sleep [2] s
这种进程一般设计为daemon进程,不受终端控制,不与终端交互,长时间运行在后台,而对于一个进程,我们可以通过下面几个步骤把他升级为一个标准的daemon进程:
protected function daemonize() { $pid = pcntl_fork(); if (-1 == $pid) { throw new Exception("fork进程失败"); } elseif ($pid != 0) { exit(0); } if (-1 == posix_setsid()) { throw new Exception("新建立session会话失败"); } $pid = pcntl_fork(); if (-1 == $pid) { throw new Exception("fork进程失败"); } else if($pid != 0) { exit(0); } umask(0); chdir("/"); }
拢共分五步:
0
。/
。第2步是为第1步做准备,设置进程为会话组长,必要条件是进程非进程组长,因此做第一次fork,进程组长(父进程)退出,子进程通过posix_setsid()
设置为会话组长,同时也为进程组长。
第3步是为了不让进程重新控制终端,因为一个进程控制一个终端的必要条件是会话组长(pid=sid)。
第4步是为了恢复默认的文件掩码,避免之前做的操作对文件掩码做了设置,带来不必要的麻烦。关于文件掩码, linux中,文件掩码在创建文件、文件夹的时候会用到,文件的默认权限为666,文件夹为777,创建文件(夹)的时候会用默认值减去掩码的值作为创建文件(夹)的最终值,比如掩码022
下创建文件666 - 222 = 644
,创建文件夹777 - 022 = 755
:
掩码 | 新建文件权限 | 新建文件夹权限 |
---|---|---|
umask(0) | 666 (-rw-rw-rw-) | 777 (drwxrwxrwx) |
umask(022) | 644 (-rw-r--r--) | 755 (drwxr-xr-x) |
第5步是切换了当前目录到根目录/
,网上说避免起始运行他的目录不能被正确卸载,这个不是太了解。
对应5步,每一步的各种id变化信息:
操作后 | pid | ppid | pgid | sid |
---|---|---|---|---|
开始 | 17723 | 31381 | 17723 | 31381 |
第一次fork | 17723 | 1 | 17723 | 31381 |
posix_setsid() | 17740 | 1 | 17740 | 17740 |
第二次fork | 17840 | 1 | 17740 | 17740 |
另外,会话、进程组、进程的关系如下图所示,这张图有助于更好的理解。
至此,你也可以轻松地造出一个daemon进程了。
我准备给这个类库设计6个命令,如下所示:
启动命令就是默认的流程,按照默认流程走就是启动命令,启动命令会检测pid文件中是否已经有pid,pid对应的进程是否健康,是否需要重新启动。
管理员通过入口文件结合pid给master进程发送SIGTERM
信号,master进程给所有子进程发送SIGKILL
信号,等待所有worker进程退出后,master进程也退出。
强制停止命令
+ 启动命令
平滑停止命令,管理员给master进程发送SIGINT
信号,master进程给所有子进程发送SIGINT
,worker进程将自身状态标记为stoping
,当worker进程下次循环的时候会根据stoping
决定停止,不在接收新的数据,等所有worker进程退出之后,master进程也退出。
平滑停止命令
+ 启动命令
查看进程状态这个借鉴了workerman的思路,管理员给master进程发送SIGUSR1
信号,告诉主进程,我要看所有进程的信息,master进程,master进程将自身的进程信息写入配置好的文件路径A中,然后发送SIGUSR1
,告诉worker进程把自己的信息也写入文件A中,由于这个过程是异步的,不知道worker进程啥时候写完,所以master进程在此处等待,等所有worker进程都写入文件之后,格式化所有的信息输出,最后输出的内容如下所示:
➜/dir /usr/local/bin/php DaemonMcn.php status Daemon [DaemonMcn] 信息: -------------------------------- master进程状态 -------------------------------- pid 占用内存 处理次数 开始时间 运行时间 16343 0.75M -- 2018-05-15 09:42:45 0 天 0 时 3 分 12 slaver -------------------------------- slaver进程状态 -------------------------------- 任务task-mcq: 16345 0.75M 236 2018-05-15 09:42:45 0 天 0 时 3 分 16346 0.75M 236 2018-05-15 09:42:45 0 天 0 时 3 分 -------------------------------------------------------------------------------- 任务test-mcq: 16348 0.75M 49 2018-05-15 09:42:45 0 天 0 时 3 分 16350 0.75M 49 2018-05-15 09:42:45 0 天 0 时 3 分 16358 0.75M 49 2018-05-15 09:42:45 0 天 0 时 3 分 16449 0.75M 1 2018-05-15 09:46:40 0 天 0 时 0 分 --------------------------------------------------------------------------------
等待worker进程将进程信息写入文件的时候,这个地方用了个比较trick的方法,每个worker进程输出一行信息,统计文件的行数,达到worker进程的行数之后表示所有worker进程都将信息写入完毕,否则,每个1s检测一次。
另外还加了两个比较实用的功能,一个是worker进程运行时间限制,一个是worker进程循环处理次数限制,防止长时间循环进程出现内存溢出等意外情况。时间默认是1小时,运行次数默认是10w次。
除此之外,也可以支持多任务,每个任务几个进程独立开,统一由master进程管理。
代码已经放到github中,有兴趣的可以试试,不支持windows哦,有什么错误还望指出来。
更多编程相关知识,请访问:编程入门!!
以上是淺談PHP中的多進程消費隊列的詳細內容。更多資訊請關注PHP中文網其他相關文章!