• 技术文章 >后端开发 >php教程

    PHP模拟supervisor的进程管理

    藏色散人藏色散人2021-01-11 09:01:17转载2487

    推荐:《PHP视频教程

    前言

    模拟supervisor进程管理DEMO(简易实现)

    没错,是造轮子!目的在于学习!

    截图:
    模拟supervisor的进程管理

    php入门到就业线上直播课:进入学习

    在图中自己实现了一个Copy子进程的功能。如果用在AMQP增减消费者时,我觉得应该会很有用。

    实现

    1、在主进程循环内启动子进程执行命令
    2、在web输入 127.0.0.1:7865 获取子进程状态
    3、socket接收请求消息,并且执行相应操作,返回web页面
    4、回收子进程,防止称为僵尸进程

    不足:无法持续监听错误页面。由于socket得到的响应是通过include函数加载的,所以在加载的页面内不能出现tail -f命令,否则stream就会掉入了死循环了~。我想应该有方案解决(写了socket+多进程模式,模仿fpm在接收到请求之后就启动一个子进程去处理的模式,但是执行有问题。因此将代码贴出来希望得到大家的指点)。
    延伸:由于对进程可以很好的管理(期望如此),那么就可以定制化自己的一些需求,比如:(1)定制AMQP的消费者进程管理服务。(2)模拟crontab定时服务。

    知识点

    代码实现的过程中,有很多的细节是值得学习的。
    1、在while()循环中,启用了stream的非阻塞模式。所以不能在循环中使用sleep(1),而是用stream_select($read, $write, $except, 1)让stream内部阻塞。
    关于阻塞非阻塞模式,可以参阅这里
    2、能够执行外部程序的函数很多,但是都稍有不同。这里采用的是proc_open,是一个很强大的函数。在这之前我曾用pcntl_exec执行过外部程序,但是需要先pcntl_fork。而用其他的如exec,shell_exec无法对子进程进行管理。
    3、重启或停止等操作子进程时,只是先更改主进程中该子进程在内存中的的状态,并不是真正的对子进程操作。在统一处init()处理子进程。如此才能防止因为子进程启动时的上下文导致的一些怪异的现象。

    代码

    由于代码过多,所以如果你对我的方案有更好的建议可以在github这里看。

    主进程代码:Process.php

    <?php
    require_once __DIR__ . '/Consumer.php';require_once __DIR__ . '/StreamConnection.php';require_once __DIR__ . '/Http.php';class Process{
        /** 
         * 待启动的消费者数组
         */
        protected $consumers = array();
        protected $childPids = array();
    
        const PPID_FILE = __DIR__ . '/process';
        protected $serializerConsumer;
    
        public function __construct()
        {
            $this->consumers = $this->getConsumers();
        }
    
        // 这里是个DEMO,实际可以用读取配置文件的方式。
        public function getConsumers()
        {
            $consumer = new Consumer([
                'program' => 'test',
                'command' => '/usr/bin/php test.php',
                'directory' => __DIR__,
                'logfile' => __DIR__ . '/test.log',
                'uniqid' => uniqid(),
                'auto_restart' => false,
            ]);
            return [
                $consumer->uniqid => $consumer,
            ];
        }
    
        public function run()
        {
            if (empty($this->consumers)) {
                // consumer empty
                return;
            }
            if ($this->_notifyMaster()) {
                // master alive
                return;
            }
    
            $pid = pcntl_fork();
            if ($pid < 0) {
                exit;
            } elseif ($pid > 0) {
                exit;
            }
            if (!posix_setsid()) {
                exit;
            }
    
            $stream = new StreamConnection('tcp://0.0.0.0:7865');
            @cli_set_process_title('AMQP Master Process');
            // 将主进程ID写入文件
            file_put_contents(self::PPID_FILE, getmypid());
            // master进程继续
            while (true) {
                $this->init();
                pcntl_signal_dispatch();
                $this->waitpid();
                // 如果子进程被全部回收,则主进程退出
                // if (empty($this->childPids)) {
                //     $stream->close($stream->getSocket());
                //     break;
                // }
                $stream->accept(function ($uniqid, $action) {
                    $this->handle($uniqid, $action);
                    return $this->display();
                });
            }
        }
    
        protected function init()
        {
            foreach ($this->consumers as &$c) {
                switch ($c->state) {
                    case Consumer::RUNNING:
                    case Consumer::STOP:
                        break;
                    case Consumer::NOMINAL:
                    case Consumer::STARTING:
                        $this->fork($c);
                        break;
                    case Consumer::STOPING:
                        if ($c->pid && posix_kill($c->pid, SIGTERM)) {
                            $this->reset($c, Consumer::STOP);
                        }
                        break;
                    case Consumer::RESTART:
                        if (empty($c->pid)) {
                            $this->fork($c);
                            break;
                        }
                        if (posix_kill($c->pid, SIGTERM)) {
                            $this->reset($c, Consumer::STOP);
                            $this->fork($c);
                        }
                        break;
                    default:
                        break;
                }
            }
        }
    
        protected function reset(Consumer $c, $state)
        {
            $c->pid = '';
            $c->uptime = '';
            $c->state = $state;
            $c->process = null;
        }
    
        protected function waitpid()
        {
            foreach ($this->childPids as $uniqid => $pid) {
                $result = pcntl_waitpid($pid, $status, WNOHANG);
                if ($result == $pid || $result == -1) {
                    unset($this->childPids[$uniqid]);
                    $c = &$this->consumers[$uniqid];
                    $state = pcntl_wifexited($status) ? Consumer::EXITED : Consumer::STOP;
                    $this->reset($c, $state);
                }
            }
        }
    
    
        /**
         * 父进程存活情况下,只会通知父进程信息,否则可能产生多个守护进程
         */
        private function _notifyMaster()
        {
            $ppid = file_get_contents(self::PPID_FILE );
            $isAlive = $this->checkProcessAlive($ppid);
            if (!$isAlive) return false;
            return true;
        }
    
        public function checkProcessAlive($pid)
        {
            if (empty($pid)) return false;
            $pidinfo = `ps co pid {$pid} | xargs`;
            $pidinfo = trim($pidinfo);
            $pattern = "/.*?PID.*?(\d+).*?/";
            preg_match($pattern, $pidinfo, $matches);
            return empty($matches) ? false : ($matches[1] == $pid ? true : false);
        }
    
        /**
         * fork一个新的子进程
         */
        protected function fork(Consumer $c)
        {
            $descriptorspec = [2 => ['file', $c->logfile, 'a'],];
            $process = proc_open('exec ' . $c->command, $descriptorspec, $pipes, $c->directory);
            if ($process) {
                $ret = proc_get_status($process);
                if ($ret['running']) {
                    $c->state = Consumer::RUNNING;
                    $c->pid = $ret['pid'];
                    $c->process = $process;
                    $c->uptime = date('m-d H:i');
                    $this->childPids[$c->uniqid] = $ret['pid'];
                } else {
                    $c->state = Consumer::EXITED;
                    proc_close($process);
                }
            } else {
                $c->state = Consumer::ERROR;
            }
            return $c;
        }
    
        public function display()
        {
            $location = 'http://127.0.0.1:7865';
            $basePath = Http::$basePath;
            $scriptName = isset($_SERVER['SCRIPT_NAME']) &&
                !empty($_SERVER['SCRIPT_NAME']) &&
                $_SERVER['SCRIPT_NAME'] != '/' ? $_SERVER['SCRIPT_NAME'] : '/index.php';
            if ($scriptName == '/index.html') {
                return Http::status_301($location);
            }
    
            $sourcePath = $basePath . $scriptName;
            if (!is_file($sourcePath)) {
                return Http::status_404();
            }
    
            ob_start();
            include $sourcePath;
            $response = ob_get_contents();
            ob_clean();
    
            return Http::status_200($response);
        }
    
        public function handle($uniqid, $action)
        {
            if (!empty($uniqid) && !isset($this->consumers[$uniqid])) {
                return;
            }
            switch ($action) {
                case 'refresh':
                    break;
                case 'restartall':
                    $this->killall(true);
                    break;
                case 'stopall':
                    $this->killall();
                    break;
                case 'stop':
                    $c = &$this->consumers[$uniqid];
                    if ($c->state != Consumer::RUNNING) break;
                    $c->state = Consumer::STOPING;
                    break;
                case 'start':
                    $c = &$this->consumers[$uniqid];
                    if ($c->state == Consumer::RUNNING) break;
                    $c->state = Consumer::STARTING;
                    break;
                case 'restart':
                    $c = &$this->consumers[$uniqid];
                    $c->state = Consumer::RESTART;
                    break;
                case 'copy':
                    $c = $this->consumers[$uniqid];
                    $newC = clone $c;
                    $newC->uniqid = uniqid('C');
                    $newC->state = Consumer::NOMINAL;
                    $newC->pid = '';
                    $this->consumers[$newC->uniqid] = $newC;
                    break;
                default:
                    break;
            }
        }
    
        protected function killall($restart = false)
        {
            foreach ($this->consumers as &$c) {
                $c->state = $restart ? Consumer::RESTART : Consumer::STOPING;
            }
        }}$cli = new Process();$cli->run();

    Consumer消费者对象

    <?php
    require_once __DIR__ . '/BaseObject.php';class Consumer extends BaseObject{
        /** 开启多少个消费者 */
        public $numprocs = 1;
        /** 当前配置的唯一标志 */
        public $program;
        /** 执行的命令 */
        public $command;
        /** 当前工作的目录 */
        public $directory;
    
        /** 通过 $qos $queueName $duplicate 生成的 $queue */
        public $queue;
        /** 程序执行日志记录 */
        public $logfile = '';
        /** 消费进程的唯一ID */
        public $uniqid;
        /** 进程IDpid */
        public $pid;
        /** 进程状态 */
        public $state = self::NOMINAL;
        /** 自启动 */
        public $auto_restart = false;
    
        public $process;
        /** 启动时间 */
        public $uptime;
    
        const RUNNING = 'running';
        const STOP = 'stoped';
        const NOMINAL = 'nominal';
        const RESTART = 'restart';
        const STOPING = 'stoping';
        const STARTING = 'stating';
        const ERROR = 'error';
        const BLOCKED = 'blocked';
        const EXITED = 'exited';
        const FATEL = 'fatel';}

    stream相关代码:StreamConnection.php

    <?php
    class StreamConnection{
        protected $socket;
        protected $timeout = 2; //s
        protected $client;
    
        public function __construct($host)
        {
            $this->socket = $this->connect($host);
        }
    
        public function connect($host)
        {
            $socket = stream_socket_server($host, $errno, $errstr);
            if (!$socket) {
                exit('stream error');
            }
            stream_set_timeout($socket, $this->timeout);
            stream_set_chunk_size($socket, 1024);
            stream_set_blocking($socket, false);
            $this->client = [$socket];
            return $socket;
        }
    
        public function accept(Closure $callback)
        {
            $read = $this->client;
            if (stream_select($read, $write, $except, 1) < 1) return;
            if (in_array($this->socket, $read)) {
                $cs = stream_socket_accept($this->socket);
                $this->client[] = $cs;
            }
            foreach ($read as $s) {
                if ($s == $this->socket) continue;
                $header = fread($s, 1024);
                if (empty($header)) {
                    $index = array_search($s, $this->client);
                    if ($index)
                        unset($this->client[$index]);
                    $this->close($s);
                    continue;
                }
                Http::parse_http($header);
                $uniqid = isset($_GET['uniqid']) ? $_GET['uniqid'] : '';
                $action = isset($_GET['action']) ? $_GET['action'] : '';
                $response = $callback($uniqid, $action);
                $this->write($s, $response);
                $index = array_search($s, $this->client);
                if ($index)
                    unset($this->client[$index]);
                $this->close($s);
            }
        }
    
        public function write($socket, $response)
        {
            $ret = fwrite($socket, $response, strlen($response));
        }
    
        public function close($socket)
        {
            $flag = fclose($socket);
        }
    
        public function getSocket()
        {
            return $this->socket;
        }}

    Http响应代码:Http.php

    <?php
    class Http{
    
        public static $basePath = __DIR__ . '/views';
        public static $max_age = 120; //秒
    
        /*
        *  函数:     parse_http
        *  描述:     解析http协议
        */
        public static function parse_http($http)
        {
            // 初始化
            $_POST = $_GET = $_COOKIE = $_REQUEST = $_SESSION = $_FILES =  array();
            $GLOBALS['HTTP_RAW_POST_DATA'] = '';
            // 需要设置的变量名
            $_SERVER = array(
                'QUERY_STRING' => '',
                'REQUEST_METHOD' => '',
                'REQUEST_URI' => '',
                'SERVER_PROTOCOL' => '',
                'SERVER_SOFTWARE' => '',
                'SERVER_NAME' => '',
                'HTTP_HOST' => '',
                'HTTP_USER_AGENT' => '',
                'HTTP_ACCEPT' => '',
                'HTTP_ACCEPT_LANGUAGE' => '',
                'HTTP_ACCEPT_ENCODING' => '',
                'HTTP_COOKIE' => '',
                'HTTP_CONNECTION' => '',
                'REMOTE_ADDR' => '',
                'REMOTE_PORT' => '0',
                'SCRIPT_NAME' => '',
                'HTTP_REFERER' => '',
                'CONTENT_TYPE' => '',
                'HTTP_IF_NONE_MATCH' => '',
            );
    
            // 将header分割成数组
            list($http_header, $http_body) = explode("\r\n\r\n", $http, 2);
            $header_data = explode("\r\n", $http_header);
    
            list($_SERVER['REQUEST_METHOD'], $_SERVER['REQUEST_URI'], $_SERVER['SERVER_PROTOCOL']) = explode(' ', $header_data[0]);
    
            unset($header_data[0]);
            foreach ($header_data as $content) {
                // \r\n\r\n
                if (empty($content)) {
                    continue;
                }
                list($key, $value) = explode(':', $content, 2);
                $key = strtolower($key);
                $value = trim($value);
                switch ($key) {
                    case 'host':
                        $_SERVER['HTTP_HOST'] = $value;
                        $tmp = explode(':', $value);
                        $_SERVER['SERVER_NAME'] = $tmp[0];
                        if (isset($tmp[1])) {
                            $_SERVER['SERVER_PORT'] = $tmp[1];
                        }
                        break;
                    case 'cookie':
                        $_SERVER['HTTP_COOKIE'] = $value;
                        parse_str(str_replace('; ', '&', $_SERVER['HTTP_COOKIE']), $_COOKIE);
                        break;
                    case 'user-agent':
                        $_SERVER['HTTP_USER_AGENT'] = $value;
                        break;
                    case 'accept':
                        $_SERVER['HTTP_ACCEPT'] = $value;
                        break;
                    case 'accept-language':
                        $_SERVER['HTTP_ACCEPT_LANGUAGE'] = $value;
                        break;
                    case 'accept-encoding':
                        $_SERVER['HTTP_ACCEPT_ENCODING'] = $value;
                        break;
                    case 'connection':
                        $_SERVER['HTTP_CONNECTION'] = $value;
                        break;
                    case 'referer':
                        $_SERVER['HTTP_REFERER'] = $value;
                        break;
                    case 'if-modified-since':
                        $_SERVER['HTTP_IF_MODIFIED_SINCE'] = $value;
                        break;
                    case 'if-none-match':
                        $_SERVER['HTTP_IF_NONE_MATCH'] = $value;
                        break;
                    case 'content-type':
                        if (!preg_match('/boundary="?(\S+)"?/', $value, $match)) {
                            $_SERVER['CONTENT_TYPE'] = $value;
                        } else {
                            $_SERVER['CONTENT_TYPE'] = 'multipart/form-data';
                            $http_post_boundary = '--' . $match[1];
                        }
                        break;
                }
            }
    
            // script_name
            $_SERVER['SCRIPT_NAME'] = parse_url($_SERVER['REQUEST_URI'], PHP_URL_PATH);
    
            // QUERY_STRING
            $_SERVER['QUERY_STRING'] = parse_url($_SERVER['REQUEST_URI'], PHP_URL_QUERY);
            if ($_SERVER['QUERY_STRING']) {
                // $GET
                parse_str($_SERVER['QUERY_STRING'], $_GET);
            } else {
                $_SERVER['QUERY_STRING'] = '';
            }
    
            // REQUEST
            $_REQUEST = array_merge($_GET, $_POST);
    
            return array('get' => $_GET, 'post' => $_POST, 'cookie' => $_COOKIE, 'server' => $_SERVER, 'files' => $_FILES);
        }
    
        public static function status_404()
        {
            return <<<EOFHTTP/1.1 404 OK
    content-type: text/htmlEOF;
        }
    
        public static function status_301($location)
        {
            return <<<EOFHTTP/1.1 301 Moved Permanently
    Content-Length: 0
    Content-Type: text/plain
    Location: $locationCache-Control: no-cacheEOF;
        }
    
        public static function status_304()
        {
            return <<<EOFHTTP/1.1 304 Not Modified
    Content-Length: 0EOF;
        }
    
        public static function status_200($response)
        {
            $contentType = $_SERVER['CONTENT_TYPE'];
            $length = strlen($response);
            $header = '';
            if ($contentType)
                $header = 'Cache-Control: max-age=180';
            return <<<EOFHTTP/1.1 200 OK
    Content-Type: $contentTypeContent-Length: $length$header$responseEOF;
        }}

    待执行的脚本:test.php

    <?php
    while(true) {
        file_put_contents(__DIR__  .  '/test.log', date('Y-m-d H:i:s'));
        sleep(1);}

    在当前目录下的视图页面:
    |- Process.php
    |- Http.php
    |- StreamConnection.php
    |- Consumer.php
    |- BaseObject.php
    |- views/

    更多编程相关知识,请访问:编程教学!!

    以上就是PHP模拟supervisor的进程管理的详细内容,更多请关注php中文网其它相关文章!

    声明:本文转载于:learnku,如有侵犯,请联系admin@php.cn删除

    前端(VUE)零基础到就业课程:点击学习

    清晰的学习路线+老师随时辅导答疑

    自己动手写 PHP MVC 框架:点击学习

    快速了解MVC架构、了解框架底层运行原理

    专题推荐:php
    上一篇:5种PHP定义数组的方法 下一篇:自己动手写 PHP MVC 框架(40节精讲/巨细/新人进阶必看)

    相关文章推荐

    • ❤️‍🔥共22门课程,总价3725元,会员免费学• ❤️‍🔥接口自动化测试不想写代码?• linux怎么查看进程?• runtime broker是什么进程• macos系统查看端口和杀死进程的命令是什么• 如何查看linux端口被哪个进程占用
    1/1

    PHP中文网