php - popen如何实现多进程并发执行,循环里的pclose会等待进程完毕再进行下一次循环
ringa_lee
ringa_lee 2017-04-10 17:22:18
0
4
1583

1.PHP popen如何实现多进程并发执行,循环里的pclose会等待进程完毕再进行下一次循环

2.假设有17个进程要开启,如何实现每次启动5个进程,并且每完成一个进程就关闭一个进程,同时开启下一个进程,也就是说最多只有5个进程同时执行

//启动2个进程
for($i = 0;$i < 2;$i++){
    $command = "$phpPath $destPHPFile >> $logFile$i";
    echo "进程开启时间".date('Y-m-d H:i:s')."\n";
    $resource = popen($command,'r');
    if(is_resource($resource)){
        $success++;
        pclose($resource);//下一次循环会等待上一个进程执行完毕,pclose才会释放资源
        echo date('Y-m-d H:i:s')." 进程:".$i."启动完毕,执行完毕并关闭,开启下一个进程\n";
    }else{
        $failure++;
    }
}

这样的做法相当于每次启动一个进程,循环执行,相当于单进程处理任务,如何做到多进程

ringa_lee
ringa_lee

ringa_lee

répondre à tous(4)
刘奇

谢邀.

如果你们有RabbitMQ的构架和经验, 实现这个很方便
rabbitmq 消息的消费端 channel 可以设置 prefetch=5, 即最多同时处理5条消息
rabbitmq 还有完善的ack机制, 即消息回执(该消息已正确处理完毕, 给我下条消息吧)
而且, 不会堵塞你当前PHP的进程, 后台(消费worker)会以5个并发的情形, 慢慢处理完这些任务

简单的可从Redis的List入手, 堵塞时判断队列的长度, 小于5时才开始popen并追加到队列

test.php

$redis = new \Redis();
$redis->connect("127.0.0.1", 6379);

$queue_name = "myqueue";
$queue_prefetch = 5;
$redis->delete($queue_name); // delete fro reseting.
for($i=0; $i<17; $i++){
    while(1){
        $count = count($redis->lRange($queue_name, 0, -1));
        if($count <= $queue_prefetch) break;
        usleep(20); // 堵塞20ms, 视情况而定, 太低redis I/O 会比较频繁
    }
    $redis->rPush($queue_name, $i);
    $pid = pcntl_fork(); // 开启子进程, 需要pcntl模块
    if($pid){
        pclose(popen("/usr/bin/php /diandao/script.php $i $pid $queue_name", "w"));
        pcntl_wait($status);
    }
}

/diandao/script.php

<?php
sleep(1); // 模拟处理超过1秒

$index = $argv[1];
$pid = $argv[2];
$queue = $argv[3];
$redis = new Redis;
$redis->connect("127.0.0.1", 6379);

$f = fopen("/diandao/script.log", "a+");
$date = date("Y-m-d H:i:s");
fwrite($f, "[$date] ".json_encode($argv).PHP_EOL );
fclose($f);
$redis->lRem($queue, $index);

/diandao/script.log 结果

数量判断那多写了个=号, 所以并发变成了6个

迷茫

看到个帖子说到有个扩展可以实现子进程:Thread

大概描述了一下我的思路,这部分接触的不多,请多指教。

<?php
class Threads {
  protected $threads;
  protected $jobs;
  /**
   * Init Threads.
   *
   */
  public function __constructor($threadnum,$jobs)
  {
    $this->jobs = $jobs;
    for($i = 0;$i < $threadnum;$i++){
      $this->threads[$i] = new childThread();
    }
    $this->doJob();
  }
  /**
   * scan the thread.
   *
   * @return bool|@thread
   */
  public function scan()
  {
    if(count($this->jobs) <=0)
      return 'no jobs!';
    foreach($this->threads as $key => $obj)
    {
      if($obj->busy === false)
      {
        $idleThreads[] = $obj;
      }
    }
    if(count($idleThreads)>0)
    {
      $this->doJob($idleThreads);
      return false;
    }
    else
    { 
      return false;
    }
  }
  protected function doJob($idleThreads){
    foreach($idleThreads as $id => $thread){
      $thread->job = $this->getJob();
      $thread->start();
    }
  }
  protected function getJob(){
    $r = $this->jobs[0];
    unset($this->jobs[0]);
    return $r;
  }
  
}
class childThread extends Thread{
  public $job;
  public $busy=false;
  public function run(){
    //set busy=true
    $this->busy = true;
    //do job!

    //set busy=false
    $this->busy = false;
  }
}
//script:

$pool = new Threads(5,array(1,2,3...17));
while(1){
  if($pool->scan() == 'no jobs!'){
    exit();
  }else{
    sleep(1000);
  }
}
小葫芦

谢邀,这个感觉是进程池的概念吧。

Peter_Zhu

谢邀。

popen是异步的,不会出现阻塞啊。理论上不会等一个线程结束才开始另一个线程的。

Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal