Da PHP als Gesamtsystem kein Multithreading unterstützt, müssen viele Vorgänge asynchron ausgeführt werden. Um diese asynchronen Vorgänge abzuschließen, haben wir ein Redis-Warteschlangenaufgabensystem erstellt.
Wie wir alle wissen, ist ein Nachrichtenwarteschlangenverarbeitungssystem hauptsächlich in zwei Teile unterteilt: Verbraucher und Produzenten.
In unserem System fungiert das Hauptsystem als Produzent und das Tasksystem als Konsument.
Der spezifische Arbeitsablauf ist wie folgt: 1. Das Hauptsystem schiebt den Aufgabennamen + Aufgabenparameter, die verarbeitet werden müssen, in die Warteschlange. 2. Das Aufgabensystem öffnet die Aufgabenwarteschlange in Echtzeit. Wenn eine Aufgabe herausspringt, verzweigt es einen Unterprozess und der Unterprozess vervollständigt die spezifische Aufgabenlogik.
/** * 启动守护进程 */ public function runAction() { Tools::log_message('ERROR', 'daemon/run' . ' | action: restart', 'daemon-'); while (true) { $this->fork_process(); } exit; } /** * 创建子进程 */ private function fork_process() { $ppid = getmypid(); $pid = pcntl_fork(); if ($pid == 0) {//子进程 $pid = posix_getpid(); //echo "* Process {$pid} was created \n\n"; $this->mq_process(); exit; } else {//主进程 $pid = pcntl_wait($status, WUNTRACED); //取得子进程结束状态 if (pcntl_wifexited($status)) { //echo "\n\n* Sub process: {$pid} exited with {$status}"; //Tools::log_message('INFO', 'daemon/run succ' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid ); } else { Tools::log_message('ERROR', 'daemon/run fail' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid, 'daemon-'); } } } /** * 业务任务队列处理 */ private function mq_process() { $data_pop = $this->masterRedis->rPop($this->redis_list_key); $data = json_decode($data_pop, 1); if (!$data) { return FALSE; } $worker = '_task_' . $data['worker']; $class_name = isset($data['class']) ? $data['class'] : 'TaskproModel'; $params = $data['params']; $class = new $class_name(); $class->$worker($params); return TRUE; }
+VX:PHPopen888PHP中高级教程分享,专注laravel,swoole,tp,分布式高并发处理教程分享