Take you to learn swoole_process

coldplay.xixi
2021-02-25 09:51:53

Swoole processes can communicate in two ways: through a message queue or through a pipe. Understanding swoole_process is therefore an important part of learning Swoole.

Preliminary knowledge

IO multiplexing

I/O multiplexing in Swoole is built on epoll, the event notification interface that Linux exposes through the C functions epoll_create, epoll_ctl, and epoll_wait.

An epoll instance keeps watching the socket descriptors (fds) registered with it.
When a watched event fires on any of them, epoll_wait returns the set of sockets that are ready at that moment.
epoll is still blocking I/O in the sense that the epoll_wait call blocks until something is ready. Its advantage is that it can handle a large number of socket connections at the same time.
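The "wait on many sockets, get back the ready ones" pattern described above can be tried from plain PHP. The following is a minimal sketch that uses stream_select, which relies on select rather than epoll, so it only illustrates the idea and not Swoole's internals. The socket pairs stand in for client connections, and all variable names are illustrative.

```php
<?php
// Two connected socket pairs stand in for two client connections (illustrative only).
$pairA = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
$pairB = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

// Only the second connection has data waiting.
fwrite($pairB[1], "ping");

// Watch the read ends of both connections at once.
$read = [$pairA[0], $pairB[0]];
$write = null;
$except = null;

// Blocks for up to 1 second, then rewrites $read so that it holds only the ready sockets.
$ready = stream_select($read, $write, $except, 1);

$messages = [];
foreach ($read as $sock) {
    $messages[] = fread($sock, 1024);
}

echo "ready sockets: {$ready}", PHP_EOL;
echo "received: ", implode(',', $messages), PHP_EOL;
```

Only the socket that actually has data comes back from the call, so the script never blocks on the idle connection.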

Event loop

Swoole wraps epoll in a Reactor thread model and lets you register callback functions for read events and write events (see swoole_event_add for details).

The event loop is a Reactor thread that runs an epoll instance.
swoole_event_add registers an event of a socket descriptor with that epoll instance, and the callback function is executed when the event occurs.
It cannot be used in the fpm environment, because fpm may shut down the process once the request has finished.
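A minimal sketch of registering a read callback, assuming the Swoole extension is installed and the script is run from the command line. It uses Swoole\Event::add, the namespaced form of swoole_event_add. The socket pair and the variable names are illustrative and do not come from the original article.

```php
<?php
// Requires the Swoole extension (CLI only).
// A connected socket pair: whatever is written to $writer becomes readable on $reader.
[$reader, $writer] = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

$received = null;

// Register a read callback for $reader with the event loop.
Swoole\Event::add($reader, function ($sock) use (&$received) {
    $received = fread($sock, 8192);
    // Stop watching the socket so that the loop has nothing left to wait for.
    Swoole\Event::del($sock);
});

fwrite($writer, "hello");

// Run the event loop; it returns once no events remain registered.
Swoole\Event::wait();

echo "callback received: {$received}", PHP_EOL;
```

The callback only runs inside the event loop, which is why the script has to reach Swoole\Event::wait() before anything is read.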

swoole_process

A process management module written in C and exposed to PHP
Built-in pipe and message queue interfaces for inter-process communication
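As a rough illustration of the pipe interface, the sketch below starts one child process and exchanges a single message in each direction. It assumes the Swoole extension is installed and uses the namespaced class Swoole\Process, which corresponds to swoole_process. The message contents are made up for the example.

```php
<?php
// Requires the Swoole extension (CLI only).
use Swoole\Process;

$child = new Process(function (Process $worker) {
    // Child side: block until the parent sends a message, then reply over the same pipe.
    $job = $worker->read();
    $worker->write("done: {$job}");
});

$child->start();

$child->write('job 1');   // parent -> child
$reply = $child->read();  // child -> parent, blocks until the reply arrives
echo $reply, PHP_EOL;

// Reap the child so that it does not linger as a zombie process.
Process::wait();
```

Each Process object carries its own pipe, so the parent talks to a given child through that child's object. The process pool later in this article relies on the same read and write calls.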

The php-fpm.conf configuration file defines how php-fpm manages its process pool. Two of its modes matter here:

Static mode starts a fixed number of processes. When a request arrives, one of them is selected to handle it.
Dynamic mode specifies a minimum and a maximum number of processes. When the request volume is too high for the existing processes and the process count is still below the maximum, a new process is started to handle the request.
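For reference, a dynamic pool might be configured roughly as follows. The directive names are php-fpm's own, while the values are illustrative and loosely mirror the numbers used in the Swoole example in this article. Note that the spare-server settings bound the number of idle processes, not the total.

```ini
; php-fpm pool section (values are illustrative)
pm = dynamic
; upper limit on worker processes
pm.max_children = 20
; processes created at startup
pm.start_servers = 10
; keep at least this many idle processes
pm.min_spare_servers = 5
; kill idle processes above this number
pm.max_spare_servers = 15
```

With pm = static, only pm.max_children applies and that many processes are kept running at all times.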

Next, we implement the dynamic mode with Swoole. The code is only meant to illustrate swoole_process, inter-process communication, timers, and so on. In practice it is more convenient to build a task queue pool on top of the higher-level swoole_server.

Suppose tasks are delivered to the pool on a fixed schedule:

<?php
/**
 * Dynamic process pool, similar to fpm.
 * Processes are created dynamically: the pool has an initial number of
 * processes and a minimum number of processes, creates a new process when
 * the existing ones cannot keep up, and never exceeds the maximum.
 *
 * One process delivers a task on a timer. The example uses:
 * 1. tick
 * 2. process and its pipe communication
 * 3. event loop
 */
class processPool
{
  private $pool;

  /**
   * @var swoole_process[] process objects of all workers, keyed by pid
   */
  private $workers = [];

  /**
   * @var array working state of each worker (pid => 0 idle, 1 busy)
   */
  private $used_workers = [];

  /**
   * @var int minimum number of processes
   */
  private $min_worker_num = 5;

  /**
   * @var int initial number of processes
   */
  private $start_worker_num = 10;

  /**
   * @var int maximum number of processes
   */
  private $max_worker_num = 20;

  /**
   * Seconds a process may stay idle before it is destroyed
   * @var int
   */
  private $idle_seconds = 5;

  /**
   * @var int current number of processes
   */
  private $curr_num;

  /**
   * Timestamp of each worker's last activity, keyed by pid
   * @var array
   */
  private $active_time = [];

  public function __construct()
  {
    $this->pool = new swoole_process(function () {
      // create the initial worker processes
      for ($i = 0; $i < $this->start_worker_num; $i++) {
        $this->createWorker();
      }
      echo 'Initial number of processes: ' . $this->curr_num . PHP_EOL;
      // every second, deliver a job to the pipe of an idle worker
      swoole_timer_tick(1000, function ($timer_id) {
        static $count = 0;
        $count++;
        $need_create = true;
        foreach ($this->used_workers as $pid => $used) {
          if ($used == 0) {
            $need_create = false;
            $this->workers[$pid]->write($count . ' job');
            // mark as busy
            $this->used_workers[$pid] = 1;
            $this->active_time[$pid] = time();
            break;
          }
        }
        // if no worker is idle, create one new worker to handle the job
        // (the original wrapped this check in a stray foreach, which could
        // create several workers for a single job)
        if ($need_create && $this->curr_num < $this->max_worker_num) {
          $new_pid = $this->createWorker();
          $this->workers[$new_pid]->write($count . ' job');
          $this->used_workers[$new_pid] = 1;
          $this->active_time[$new_pid] = time();
        }

        // destroy a process that has been idle for longer than the limit
        foreach ($this->active_time as $pid => $timestamp) {
          if ((time() - $timestamp) > $this->idle_seconds && $this->curr_num > $this->min_worker_num) {
            // destroy this process
            if (isset($this->workers[$pid]) && $this->workers[$pid] instanceof swoole_process) {
              $this->workers[$pid]->write('exit');
              unset($this->workers[$pid]);
              $this->curr_num = count($this->workers);
              unset($this->used_workers[$pid]);
              unset($this->active_time[$pid]);
              echo "{$pid} destroyed\n";
              break;
            }
          }
        }

        echo "job {$count}/{$this->curr_num}\n";

        if ($count == 20) {
          foreach ($this->workers as $pid => $worker) {
            $worker->write('exit');
          }
          // stop the timer
          swoole_timer_clear($timer_id);
          // exit the pool process
          $this->pool->exit(0);
          exit();
        }
      });

    });

    $master_pid = $this->pool->start();
    echo "Master $master_pid start\n";

    // reap child processes as they exit
    while ($ret = swoole_process::wait()) {
      $pid = $ret['pid'];
      echo "process {$pid} exited\n";
    }
  }

  /**
   * Create a new worker process
   * @return int pid of the new process
   */
  public function createWorker()
  {
    $worker_process = new swoole_process(function (swoole_process $worker) {
      // bind a read event to the pipe on the child side
      swoole_event_add($worker->pipe, function ($pipe) use ($worker) {
        $data = trim($worker->read());
        if ($data == 'exit') {
          $worker->exit(0);
          exit();
        }
        echo "{$worker->pid} is processing {$data}\n";
        // simulate a job that takes 5 seconds
        sleep(5);
        // report the result, which tells the parent this worker is idle again
        $worker->write("complete");
      });
    });

    $worker_pid = $worker_process->start();

    // bind a read event to the pipe on the parent side
    swoole_event_add($worker_process->pipe, function ($pipe) use ($worker_process) {
      $data = trim($worker_process->read());
      if ($data == 'complete') {
        // mark as idle
        // echo "{$worker_process->pid} is idle\n";
        $this->used_workers[$worker_process->pid] = 0;
      }
    });

    // keep the process object
    $this->workers[$worker_pid] = $worker_process;
    // mark as idle
    $this->used_workers[$worker_pid] = 0;
    $this->active_time[$worker_pid] = time();
    $this->curr_num = count($this->workers);
    return $worker_pid;
  }
}

new processPool();

I hope this helps.

Statement:
This article is reproduced from csdn.net. If there is any infringement, please contact admin@php.cn to have it removed.