This article introduces the method of using task process in swoole to handle time-consuming tasks. It has certain reference value. I hope it will be helpful to students who are learning the swoole framework!
#How to use task process in swoole to handle time-consuming tasks?
We know that there are two major processes in swoole, namely master main process and manager management process.
There will be a main reactor thread and multiple reactor threads in the master main process. The main function is to maintain TCP connections, process network IO, and send and receive data.
The manager manages processes and its role is to fork and manage worker and task processes.
The function of the worker process is to receive the data passed by the reactor thread, process the data, and return the processing results to the reactor thread.
The role of the task process is to handle some relatively time-consuming tasks. The task process is independent of the worker process and will not affect the worker process' processing of client requests.
1. Application scenarios of task process:
1. Relatively time-consuming mass mailing, such as a certain event, which requires sending event emails to 1 million users.
2. Push the updates of certain big Vs. For example, if a big V posts a new message, fans need to get the updates in time.
Recommended learning: swoole tutorial
2. The relationship between worker and task:
1. What can be done in the worker process The task is delivered by calling task(), and the task process responds to the delivered task through the onTask event.
2. In the task process, you can tell the worker process that the task is completed by returning directly or calling finish(). In the worker process, you can respond to the task completion through the onFinish event.
3. Prerequisites for using task:
1. Configure the number of task_worker_num in Server.
2. Set the Server's onTask and onFinish event callback functions.
4. A simple example of using task to calculate the cumulative sum
<?php $server = new swoole_server('0.0.0.0', 6666); $server->set([ 'worker_num' => 2, 'task_worker_num' => 16, ]); $server->on('WorkerStart', function ($server, $worker_id) { //注意这里,我们通过taskworker来判断是task进程还是worker进程 //需要在worker进程中调用task(),不然会报出警告 //这里会执行两遍,因为我们设置了worker_num数为2 if (!$server->taskworker) { echo '投递任务开始...', PHP_EOL; //投递32个累加计算任务给16个task进程 for ($ix = 0; $ix < 32; $ix++) { //注意这里的投递是异步的 $server->task([mt_rand(1, 100), mt_rand(1000, 9999)]); } echo '投递任务结束...', PHP_EOL; } }); //server服务必须要有onReceive回调 $server->on('Receive', function ($server, $fd, $reactor_id, $data) { }); //注意,task进程完全是同步阻塞模式的 $server->on('Task', function ($server, $task_id, $src_worker_id, $data) { echo "task {$task_id} 进程正在工作...", PHP_EOL; $start = $data[0]; $end = $data[1]; $total = 0; for (; $start <= $end; $start++) { $total += $start; } echo "task {$task_id} 进程完成工作...", PHP_EOL; return $total; }); $server->on('Finish', function ($server, $task_id, $data) { echo "task {$task_id} 进程处理完成, 结果为 {$data}", PHP_EOL; }); $server->start();
Note that we deliver tasks to the task pool by calling task(), and the swoole bottom layer will take turns Query delivery tasks to each task process.
When the number of tasks you deliver exceeds the processing speed of onTask, this will cause the task pool to be filled up, which will cause the worker process to be blocked, so the relationship between the number of task_worker_num and the processing speed needs to be set appropriately.
Of course, we can also manually deliver the task to the specified task process. The second parameter of the task() function can specify the task process ID to be delivered, and the ID range is 0 to (task_worker_num - 1).
5. Segment the task and manually control delivery to the task process
<?php $server = new swoole_server('0.0.0.0', 6666); $server->set([ 'worker_num' => 1, 'task_worker_num' => 10, ]); $server->on('WorkerStart', function ($server, $worker_id) { //为了方便演示,把worker_num设置为1,这里只会执行一次 if (!$server->taskworker) { //通过swoole_table共享内存,在不同进程中共享数据 $server->result = new swoole_table(10240); //用于保存task进程完成数量 $server->result->column('finish_nums', swoole_table::TYPE_INT); //用于保存最终计算结果 $server->result->column('result', swoole_table::TYPE_INT); $server->result->create(); //计算1000的累加和,并把计算任务分配到10个task进程上 $num = 1000; $step = $num / $server->setting['task_worker_num']; for ($ix = 0; $ix < $server->setting['task_worker_num']; $ix++) { $start = $ix * $step; $server->task([$start, $start + $step], $ix); } } }); $server->on('Receive', function ($server, $fd, $reactor_id, $data) { }); //注意,task进程完全是同步阻塞模式的 $server->on('Task', function ($server, $task_id, $src_worker_id, $data) { echo "task {$task_id} 进程正在工作... 计算 {$data[0]} - {$data[1]} ", PHP_EOL; $start = ++$data[0]; $end = $data[1]; $total = 0; for (; $start <= $end; $start++) { $total += $start; } echo "task {$task_id} 进程完成工作...", PHP_EOL; return $total; }); $server->on('Finish', function ($server, $task_id, $data) { echo "task {$task_id} 进程处理完成, 结果为 {$data}", PHP_EOL; $server->result->incr('finish_nums', 'finish_nums'); $server->result->set('result', ['result' => $data + $server->result->get('result', 'result')]); if ($server->result->get('finish_nums', 'finish_nums') == $server->setting['task_worker_num']) { echo "最终计算结果:{$server->result->get('result', 'result')}", PHP_EOL; } }); $server->s tart();
The above is the detailed content of How to use task process in swoole to handle time-consuming tasks?. For more information, please follow other related articles on the PHP Chinese website!