Swoole and Swoft: Swoft source code analysis (Task delivery and scheduled tasks)

Release: 2023-04-03 10:42:01
This article introduces Swoole and Swoft through an analysis of the Swoft source code, covering Task delivery and scheduled tasks. It may be a useful reference for readers who need it.


Swoft's task feature is built on Swoole's Task mechanism; in other words, Swoft's Task mechanism is essentially an encapsulation and enhancement of Swoole's Task mechanism.

Task delivery

class Task
{
    /**
     * Deliver coroutine or async task
     *
     * @param string $taskName
     * @param string $methodName
     * @param array  $params
     * @param string $type
     * @param int    $timeout
     * @return bool|array
     * @throws TaskException
     */
    public static function deliver(string $taskName, string $methodName, array $params = [], string $type = self::TYPE_CO, $timeout = 3)
    {
        $data   = TaskHelper::pack($taskName, $methodName, $params, $type);

        if(!App::isWorkerStatus() && !App::isCoContext()){
            return self::deliverByQueue($data); // see the Command section below
        }

        if(!App::isWorkerStatus() && App::isCoContext()){
            throw new TaskException('Please deliver task by http!');
        }

        $server = App::$server->getServer();
        // Deliver coroutine task
        if ($type == self::TYPE_CO) {
            $tasks[0]  = $data;
            $prifleKey = 'task' . '.' . $taskName . '.' . $methodName;

            $result = $server->taskCo($tasks, $timeout);

            return $result;
        }

        // Deliver async task
        return $server->task($data);
    }
}
Task::deliver() packs the call parameters and, depending on the $type parameter, delivers them to the Task process through Swoole's $server->taskCo() or $server->task() interface.
A Task itself is always executed synchronously; $type only affects the behavior of the delivery operation. Task::TYPE_ASYNC corresponds to $server->task(), an asynchronous delivery: Task::deliver() returns immediately after the call. Task::TYPE_CO corresponds to $server->taskCo(), a coroutine delivery: after delivery the coroutine yields control, and Task::deliver() does not return until the task completes or times out.
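The branching in Task::deliver() can be summarized as a small decision function. The following is only a minimal sketch in plain PHP with no Swoole or Swoft dependency: the function name chooseDeliveryRoute(), the returned route labels, and the string values of the two type constants are assumptions made for illustration, not part of Swoft.

```php
<?php
// Hypothetical stand-ins for Task::TYPE_CO and Task::TYPE_ASYNC; the values are assumed.
const TYPE_CO    = 'co';
const TYPE_ASYNC = 'async';

/**
 * Mirror the decision order of Task::deliver():
 * 1. outside a Worker and outside a coroutine context: go through the message queue
 * 2. outside a Worker but inside a coroutine context: refuse
 * 3. inside a Worker: choose taskCo() or task() according to $type
 */
function chooseDeliveryRoute(bool $isWorker, bool $isCoContext, string $type): string
{
    if (!$isWorker && !$isCoContext) {
        return 'queue';   // corresponds to self::deliverByQueue($data)
    }
    if (!$isWorker && $isCoContext) {
        throw new RuntimeException('Please deliver task by http!');
    }
    return $type === TYPE_CO ? 'taskCo' : 'task';
}

// Print the route chosen in three typical environments.
echo chooseDeliveryRoute(true, true, TYPE_CO), PHP_EOL;
echo chooseDeliveryRoute(true, true, TYPE_ASYNC), PHP_EOL;
echo chooseDeliveryRoute(false, false, TYPE_ASYNC), PHP_EOL;
```

Note that $type is only consulted in the last branch, which matches the point above: it changes how the task is delivered, not how it is executed.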

Task Execution

/**
 * The listener of swoole task
 * @SwooleListener({
 *     SwooleEvent::ON_TASK,
 *     SwooleEvent::ON_FINISH,
 * })
 */
class TaskEventListener implements TaskInterface, FinishInterface
{
    /**
     * @param \Swoole\Server $server
     * @param int            $taskId
     * @param int            $workerId
     * @param mixed          $data
     * @return mixed
     * @throws \InvalidArgumentException
     */
    public function onTask(Server $server, int $taskId, int $workerId, $data)
    {
        try {
            /* @var TaskExecutor $taskExecutor*/
            $taskExecutor = App::getBean(TaskExecutor::class);
            $result = $taskExecutor->run($data);
        } catch (\Throwable $throwable) {
            App::error(sprintf('TaskExecutor->run %s file=%s line=%d ', $throwable->getMessage(), $throwable->getFile(), $throwable->getLine()));
            $result = false;

            // Release system resources
        }

        return $result;
    }

    // ... (onFinish() is not shown in this excerpt)
}
This is the event callback for swoole.onTask. Its sole responsibility is to forward the packed data delivered by the Worker process to TaskExecutor.

The essence of Swoole's Task mechanism is that the Worker process hands time-consuming tasks to synchronous Task processes (also known as TaskWorker) for processing, so the swoole.onTask event callback runs in the Task process. As mentioned above, the Worker process is where most of your HTTP service code runs, but from TaskEventListener::onTask() onward all code runs in the Task process. That is, TaskExecutor and the concrete TaskBean both execute in the Task process.

/**
 * The task executor
 * @Bean()
 */
class TaskExecutor
{
    /**
     * @param string $data
     * @return mixed
     */
    public function run(string $data)
    {
        $data = TaskHelper::unpack($data);

        $name   = $data['name'];
        $type   = $data['type'];
        $method = $data['method'];
        $params = $data['params'];
        $logid  = $data['logid'] ?? uniqid('', true);
        $spanid = $data['spanid'] ?? 0;

        $collector = TaskCollector::getCollector();
        if (!isset($collector['task'][$name])) {
            return false;
        }

        list(, $coroutine) = $collector['task'][$name];
        $task = App::getBean($name);
        if ($coroutine) {
            $result = $this->runCoTask($task, $method, $params, $logid, $spanid, $name, $type);
        } else {
            $result = $this->runSyncTask($task, $method, $params, $logid, $spanid, $name, $type);
        }

        return $result;
    }
}
The task execution logic is simple: unpack the data sent by the Worker process, restore the original call parameters, find the corresponding TaskBean by the $name parameter, and call the corresponding task method on it. A TaskBean is declared with the class-level annotation @Task(name="TaskName") or @Task("TaskName").

It is worth mentioning that, besides the name attribute, the @Task annotation also has a coroutine attribute. The code above uses this attribute to choose between the coroutine runCoTask() and the synchronous runSyncTask() to execute the Task. However, because Swoole's Task process runs fully synchronously and does not support coroutines, do not set this attribute to true in the current version. Likewise, the task code in a TaskBean must be synchronous and blocking, or must be able to downgrade asynchronous non-blocking and coroutine code to synchronous blocking code according to the environment.

Deliver the task from Process

We mentioned earlier that the essence of Swoole's Task mechanism is that the Worker process hands time-consuming tasks to synchronous Task processes (aka TaskWorker) for processing. In other words, Swoole's $server->taskCo() and $server->task() can only be used in a Worker process. This restriction greatly limits the usage scenarios. How, then, can tasks be delivered from a Process? To bypass this limitation, Swoft provides the Task::deliverByProcess() method. The implementation is simple: the call information is delivered from the Process to a Worker process through Swoole's $server->sendMessage() method, and the Worker process then delivers it to the Task process. The relevant code is as follows:

//Swoft\Task\Task.php
/**
 * Deliver task by process
 *
 * @param string $taskName
 * @param string $methodName
 * @param array  $params
 * @param string $type
 * @param int    $timeout
 * @param int    $workId
 *
 * @return bool
 */
public static function deliverByProcess(string $taskName, string $methodName, array $params = [], int $timeout = 3, int $workId = 0, string $type = self::TYPE_ASYNC): bool
{
    /* @var PipeMessageInterface $pipeMessage */
    $server      = App::$server->getServer();
    $pipeMessage = App::getBean(PipeMessage::class);
    $data = [
        'name'    => $taskName,
        'method'  => $methodName,
        'params'  => $params,
        'timeout' => $timeout,
        'type'    => $type,
    ];
    $message = $pipeMessage->pack(PipeMessage::MESSAGE_TYPE_TASK, $data);
    return $server->sendMessage($message, $workId);
}

After the data is packed, it is delivered to a Worker process with $server->sendMessage(). When the Worker process receives the data, it triggers a swoole.pipeMessage event callback, which Swoft converts into its own swoft.pipeMessage event and triggers:

//Swoft\Bootstrap\Server\ServerTrait.php
/**
 * onPipeMessage event callback
 *
 * @param \Swoole\Server $server
 * @param int            $srcWorkerId
 * @param string         $message
 * @return void
 * @throws \InvalidArgumentException
 */
public function onPipeMessage(Server $server, int $srcWorkerId, string $message)
{
    /* @var PipeMessageInterface $pipeMessage */
    $pipeMessage = App::getBean(PipeMessage::class);
    list($type, $data) = $pipeMessage->unpack($message);
    App::trigger(AppEvent::PIPE_MESSAGE, null, $type, $data, $srcWorkerId);
}
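The round trip from packing in the Process to unpacking in the Worker can be illustrated without Swoole. The sketch below is only an illustration under stated assumptions: packPipeMessage() and unpackPipeMessage() are hypothetical helpers, and the JSON envelope is an assumed wire format, since the article does not show how PipeMessage::pack() actually encodes the message.

```php
<?php
// Hypothetical stand-in for PipeMessage::MESSAGE_TYPE_TASK; the value is assumed.
const MESSAGE_TYPE_TASK = 'task';

/** Wrap a message type and payload into one string (assumed JSON envelope). */
function packPipeMessage(string $type, array $data): string
{
    return json_encode(['type' => $type, 'data' => $data], JSON_THROW_ON_ERROR);
}

/** Reverse of packPipeMessage(): returns [type, data]. */
function unpackPipeMessage(string $message): array
{
    $decoded = json_decode($message, true, 512, JSON_THROW_ON_ERROR);
    return [$decoded['type'], $decoded['data']];
}

// Process side: build the same payload shape that deliverByProcess() builds.
$message = packPipeMessage(MESSAGE_TYPE_TASK, [
    'name'    => 'demoTask',   // hypothetical task name
    'method'  => 'run',        // hypothetical method name
    'params'  => [1, 2],
    'timeout' => 3,
    'type'    => 'async',
]);

// Worker side: unpack, then react only to task messages.
[$type, $data] = unpackPipeMessage($message);
if ($type === MESSAGE_TYPE_TASK) {
    // In a real Worker this is the point where Task::deliver() would be called.
    printf("deliver %s::%s\n", $data['name'], $data['method']);
}
```

Carrying the message type inside the envelope is what lets a single pipe-message channel serve several purposes: the receiver inspects the type first and ignores messages it is not responsible for.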

/**
 * The pipe message listener
 * @Listener(event=AppEvent::PIPE_MESSAGE)
 */
class PipeMessageListener implements EventHandlerInterface
{
    /**
     * @param \Swoft\Event\EventInterface $event
     */
    public function handle(EventInterface $event)
    {
        $params = $event->getParams();
        // The listing is truncated in the source from `if (count($params) <` onward.
    }
}

The swoft.pipeMessage event is ultimately handled by PipeMessageListener. In this listener, if the swoft.pipeMessage event turns out to have been produced by Task::deliverByProcess(), the Worker process executes Task::deliver() once, which finally delivers the task data to the TaskWorker process.

A simple review exercise: from Task::deliverByProcess() to the final execution of a given TaskBean, which processes are involved, and which parts of the call chain run in which process?

Deliver tasks from the Command process or its child processes

//Swoft\Task\QueueTask.php
/**
 * @param string $data
 * @param int    $taskWorkerId
 * @param int    $srcWorkerId
 * @return bool
 */
public function deliver(string $data, int $taskWorkerId = null, $srcWorkerId = null)
{
    if ($taskWorkerId === null) {
        $taskWorkerId = mt_rand($this->workerNum + 1, $this->workerNum + $this->taskNum);
    }

    if ($srcWorkerId === null) {
        $srcWorkerId = mt_rand(0, $this->workerNum - 1);
    }

    $data   = $this->pack($data, $srcWorkerId);
    $result = \msg_send($this->queueId, $taskWorkerId, $data, false);
    if (!$result) {
        return false;
    }

    return true;
}
But in the Swoft ecosystem there is one more, rather inconspicuous, role: Command.


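Within the same project, Command and Http\RpcServer obtain the same message queue in the system kernel by agreeing on a message_queue_key. The Command process can then deliver tasks to the Task process through that message queue.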




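/**
 * Task table: records the task information configured by the user.
 * Each row contains the fields below; `rule`, `taskClass` and `taskMethod` generate the key that uniquely identifies a row.
 *
 * @var array $originStruct
 */
private $originStruct = [
    'rule'       => [\Swoole\Table::TYPE_STRING, 100], // execution rule of the scheduled task, i.e. the cron attribute of the @Scheduled annotation
    'taskClass'  => [\Swoole\Table::TYPE_STRING, 255], // task name, i.e. the name attribute of @Task (defaults to the class name)
    'taskMethod' => [\Swoole\Table::TYPE_STRING, 255], // task method, i.e. the method carrying the @Scheduled annotation
    'add_time'   => [\Swoole\Table::TYPE_STRING, 11],  // 10-digit timestamp taken when the table content is initialized
];

/**
 * Execution table: records the tasks to be executed in the near future and their execution status.
 * Each row contains the fields below; `taskClass`, `taskMethod`, `minute` and `sec` generate the key that uniquely identifies a row.
 *
 * @var array $runTimeStruct
 */
private $runTimeStruct = [
    'taskClass'  => [\Swoole\Table::TYPE_STRING, 255], // same as above
    'taskMethod' => [\Swoole\Table::TYPE_STRING, 255], // same as above
    'minute'     => [\Swoole\Table::TYPE_STRING, 20],  // time at which the task should run, to the minute, format date('YmdHi')
    'sec'        => [\Swoole\Table::TYPE_STRING, 20],  // time at which the task should run, to the second, 10-digit timestamp
    'runStatus'  => [\Swoole\Table::TYPE_INT, 4],      // task status: 0 (not executed), 1 (executed), 2 (executing)
    // Note: "executed" is easy to misread here. It does not refer to the execution of the task itself but to the
    // execution of the task delivery operation. "Not delivered", "delivered" and "delivering" would be more accurate.
];
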
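Swoft's scheduled-task management is handled by two processes: the task planning process and the task execution process. The two processes manage scheduled tasks together. If they used process-local structures such as array(), they would inevitably need frequent inter-process communication. Sharing data directly through a cross-process Table (unless otherwise stated, Table in this article means Swoole's Swoole\Table structure) not only performs well and is simple to operate, it also decouples the two processes.

For the Table to be usable by both processes, it must be created and have its memory allocated before the Swoole Server starts. The code is in Swoft\Task\Bootstrap\Listeners->onBeforeStart(). It is fairly simple, and interested readers can read it themselves.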


/**
 * Crontab timer process
 * @Process(name="cronTimer", boot=true)
 */
class CronTimerProcess implements ProcessInterface
{
    /**
     * @param \Swoft\Process\Process $process
     */
    public function run(SwoftProcess $process)
    {
        /* @var \Swoft\Task\Crontab\Crontab $cron*/
        $cron = App::getBean('crontab');

        // Swoole/HttpServer
        $server = App::$server->getServer();

        $time = (60 - date('s')) * 1000;
        $server->after($time, function () use ($server, $cron) {
            // Every minute check all tasks, and prepare the tasks that next execution point needs
            $server->tick(60 * 1000, function () use ($cron) {
                // ... (the body of this callback is truncated in the source)
            });
        });
    }
}

/**
 * @param array $task        the task
 * @param array $parseResult result of parsing the crontab rule, i.e. the seconds within the current minute at which the Task must run
 * @return bool
 */
private function initRunTimeTableData(array $task, array $parseResult): bool
{
    $runTimeTableTasks = $this->getRunTimeTable()->table;

    $min = date('YmdHi');
    $sec = strtotime(date('Y-m-d H:i'));
    foreach ($parseResult as $time) {
        $key = $this->getKey($task['rule'], $task['taskClass'], $task['taskMethod'], $min, $time + $sec);
        $runTimeTableTasks->set($key, [
            'taskClass'  => $task['taskClass'],
            'taskMethod' => $task['taskMethod'],
            'minute'     => $min,
            'sec'        => $time + $sec,
            'runStatus'  => self::NORMAL
        ]);
    }

    return true;
}

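This process uses Swoole's timer facility: a Swoole\Timer callback fires at the first second of every minute. Each time CronTimerProcess is woken up, it traverses the task table, works out which tasks need to run at each of the 60 seconds of the current minute, writes them to the execution table and marks them as not executed.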
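The two calculations involved here, the delay until the next minute boundary and the per-second rows written to the execution table, can be tried out in plain PHP. This is a minimal sketch under assumptions: msUntilNextMinute() and buildRunTimeRows() are hypothetical helper names, a plain array stands in for Swoole\Table, the md5-based key is an assumed scheme (the article does not show getKey()), and the cron rule string is only sample data.

```php
<?php
// Hypothetical stand-in for the NORMAL status constant (0 = not executed).
const RUN_STATUS_NORMAL = 0;

/** Milliseconds from $now to the next minute boundary, as in (60 - date('s')) * 1000. */
function msUntilNextMinute(int $now): int
{
    return (60 - (int) date('s', $now)) * 1000;
}

/**
 * Build execution-table rows for one task.
 * $secondOffsets lists the seconds (0-59) of the current minute at which the task must run.
 */
function buildRunTimeRows(array $task, array $secondOffsets, int $now): array
{
    $minute      = date('YmdHi', $now);                  // minute label, e.g. 202304031030
    $minuteStart = strtotime(date('Y-m-d H:i', $now));   // timestamp of second 0 of this minute

    $rows = [];
    foreach ($secondOffsets as $offset) {
        $sec = $minuteStart + $offset;
        // Assumed key scheme: a hash over the fields that identify the row.
        $key = md5(implode('|', [$task['rule'], $task['taskClass'], $task['taskMethod'], $minute, $sec]));
        $rows[$key] = [
            'taskClass'  => $task['taskClass'],
            'taskMethod' => $task['taskMethod'],
            'minute'     => $minute,
            'sec'        => $sec,
            'runStatus'  => RUN_STATUS_NORMAL,
        ];
    }
    return $rows;
}

// Sample data: a task that should run at second 0 and second 30 of the minute.
$now  = mktime(10, 30, 15, 4, 3, 2023);
$task = ['rule' => '0,30 * * * * *', 'taskClass' => 'demoTask', 'taskMethod' => 'run'];
$rows = buildRunTimeRows($task, [0, 30], $now);

printf("next check in %d ms, %d rows prepared\n", msUntilNextMinute($now), count($rows));
```

Storing an absolute timestamp in `sec` means the executing side only has to compare each row against the current time and never needs to parse the cron rule itself.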

/**
 * Crontab process
 * @Process(name="cronExec", boot=true)
 */
class CronExecProcess implements ProcessInterface
{
    /**
     * @param \Swoft\Process\Process $process
     */
    public function run(SwoftProcess $process)
    {
        $pname = App::$server->getPname();
        $process->name(sprintf('%s cronexec process', $pname));

        /** @var \Swoft\Task\Crontab\Crontab $cron */
        $cron = App::getBean('crontab');

        // Swoole/HttpServer
        $server = App::$server->getServer();

        $server->tick(0.5 * 1000, function () use ($cron) {
            $tasks = $cron->getExecTasks();
            if (!empty($tasks)) {
                foreach ($tasks as $task) {
                    // Deliver task
                    Task::deliverByProcess($task['taskClass'], $task['taskMethod']);
                }
            }
        });
    }
}

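As the executor of scheduled tasks, CronExecProcess wakes itself up every 0.5s through Swoole\Timer, traverses the execution table once, picks the tasks that need to run at that moment, delivers them through sendMessage(), and updates their status in the execution table.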
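The select-and-mark step can be sketched with a plain array in place of the execution table. This is an illustration under assumptions, not Swoft's implementation: takeDueTasks() is a hypothetical helper, the selection rule "sec has been reached and status is still 0" is assumed (the article does not show getExecTasks()), and string concatenation stands in for Task::deliverByProcess().

```php
<?php
// Status values as described for the runStatus field; the constant names are hypothetical.
const STATUS_NORMAL = 0;   // not yet delivered
const STATUS_FINISH = 1;   // delivered
const STATUS_START  = 2;   // delivery in progress

/**
 * Return the rows that are due at $now and still undelivered,
 * marking them as in progress so a later tick does not pick them again.
 */
function takeDueTasks(array &$table, int $now): array
{
    $due = [];
    foreach ($table as $key => $row) {
        if ($row['runStatus'] === STATUS_NORMAL && $row['sec'] <= $now) {
            $table[$key]['runStatus'] = STATUS_START;
            $due[$key] = $row;
        }
    }
    return $due;
}

// Sample execution table: one due row, one future row, one row already delivered.
$table = [
    'a' => ['taskClass' => 'demoTask', 'taskMethod' => 'run',  'sec' => 100, 'runStatus' => STATUS_NORMAL],
    'b' => ['taskClass' => 'demoTask', 'taskMethod' => 'run',  'sec' => 200, 'runStatus' => STATUS_NORMAL],
    'c' => ['taskClass' => 'demoTask', 'taskMethod' => 'sync', 'sec' => 90,  'runStatus' => STATUS_FINISH],
];

$delivered = [];
foreach (takeDueTasks($table, 100) as $key => $task) {
    // Stand-in for Task::deliverByProcess($task['taskClass'], $task['taskMethod']).
    $delivered[] = $task['taskClass'] . '::' . $task['taskMethod'];
    $table[$key]['runStatus'] = STATUS_FINISH;
}

echo implode(', ', $delivered), PHP_EOL;
```

Marking a row before delivering it is what keeps a task from being delivered twice when the timer fires more often than once per second.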

The overall execution flow of scheduled tasks is as follows: [figure: overall execution flow of scheduled tasks; image not preserved]




