php实现多线程

原创
2016-06-13 11:23:01 560浏览

php实现多线程
与向服务器发送多个请求来模拟并发相比,用这种方式实现多进程要方便很多。它只能在 CLI 模式下使用,适用于一些特殊场合,例如邮件发送任务等。
资源的共享访问使用了文件锁,并不是很可靠,主要是为了能够在 Windows 下使用;如果确实有必要,可以考虑自己改用相应的信号量机制(这个扩展只能用于 *NIX 系统)。

实例
[php]
// Demo bootstrap: configure how child processes are launched, then load the
// process-manager library. These constants must be defined before the library
// is included because it only supplies defaults for the ones still undefined.
define('DIR_PHP_EXEC', 'php');            // command used to start the PHP CLI
define('DIR_MAIN_EXEC', __FILE__);        // children re-execute this very script
// Lock files are created in this directory. Use the platform temp directory
// instead of a hard-coded '/tmp', which does not exist on Windows.
define('DIR_TMP', sys_get_temp_dir());
require_once('my_process.php');

/**
 * Demo worker: prints its parameter once per second.
 *
 * NOTE(review): the loop header of the published listing was truncated right
 * after "$i" and the "\n" escape lost its backslash. The bound of 4 iterations
 * is a reconstruction, not the original value - adjust as needed.
 */
class pp extends my_process_base {
    /**
     * Task body executed inside the child process.
     *
     * @param mixed $param value given as second argument to run_task()
     */
    public function run($param = null) {
        for ($i = 0; $i < 4; $i++) {
            echo "111 $param\n";
            sleep(1);
        }
    }
}

// Build the global manager. When this script was started as a child, the call
// below already runs the requested task, so only the parent enters the branch.
init_my_process();
$obj = $GLOBALS['gal_obj_process_m'];
if ($obj->is_main()) {
    // Queue one 'pp' worker per parameter value.
    foreach (array('a', 'b', 'c', 'd') as $task_param) {
        $obj->run_task('pp', $task_param);
    }
    // Cap the number of polling rounds, then start collecting child output.
    $obj->set_max_run(10);
    $obj->run();
}

[/php]

进程管理类
[php]
/**
* @copyright 2007 movivi
* @author 徐智
*
* $Id: getPage.php 11 2007-09-21 02:15:01Z fred $
*/
// Command used to launch the PHP CLI; the including script may override it by
// defining the constant first.
if (!defined('DIR_PHP_EXEC')) define('DIR_PHP_EXEC', 'php');
// DIR_MAIN_EXEC (the script that child processes re-execute) has no usable
// default and must be defined by the including script.
// Directory in which lock files are created. The previous default of '' made
// the lock path start with the directory separator, i.e. the filesystem root.
if (!defined('DIR_TMP')) define('DIR_TMP', sys_get_temp_dir());
/*****************************************************************************/
/* Initialisation: names of the "name=value" command-line arguments that the
   parent passes to each child process. */
define('CMD_MAIN_PROCESS_KEY', 'main_process_key');
define('CMD_CHILD_PROCESS_NAME', 'child_process_name');
define('CMD_CHILD_PROCESS_PARAM', 'child_process_param');

/**
 * Bootstraps the process framework for the current PHP invocation.
 *
 * Stores the parsed command line in $GLOBALS['gal_obj_cmd'] and the process
 * manager in $GLOBALS['gal_obj_process_m']. If the command line carries a
 * non-empty main-process key the manager is not the main process, and it is
 * run immediately.
 */
function init_my_process() {
    $cmd = new my_cmd_argv();
    $GLOBALS['gal_obj_cmd'] = $cmd;

    // A missing key is normalised to '' which the manager treats as "main".
    $key = $cmd->get_value(CMD_MAIN_PROCESS_KEY);
    if ($key === false) {
        $key = '';
    }

    $manager = new my_process_m($key);
    $GLOBALS['gal_obj_process_m'] = $manager;

    if ($manager->is_main()) {
        return;
    }
    $manager->run();
}

/**
 * Base class for a task that runs in its own PHP process.
 *
 * Extend this class and implement run() with your own processing. When the
 * task object is destroyed it prints an "@end" line.
 */
abstract class my_process_base {
    /**
     * Both parameters are accepted but not used by this base class.
     *
     * @param bool   $auto_run
     * @param string $name
     */
    public function __construct($auto_run=true, $name='') {
    }

    /**
     * Emits the end-of-task marker. The newline escape had lost its backslash
     * in the published listing ("@endn"); it is restored here.
     */
    public function __destruct() {
        echo "@end\n";
    }

    /**
     * Task body, implemented by subclasses.
     *
     * @param mixed $param task parameter, null when none was supplied
     */
    abstract public function run($param = null);
}


/**
 * Parses "name=value" command-line arguments into a lookup table.
 *
 * Arguments are read from $_SERVER['argv']; element 0 (the script name) is
 * skipped. An argument without "=" is stored with an empty string as value.
 */
class my_cmd_argv {
    /** @var array argument name => argument value */
    private $cmd_argv = array();

    public function __construct() {
        // $_SERVER['argv'] may be absent outside the CLI; treat that as "no arguments".
        $argv = (isset($_SERVER['argv']) && is_array($_SERVER['argv'])) ? $_SERVER['argv'] : array();
        $count = count($argv);
        for ($i = 1; $i < $count; $i++) {
            // Split on the first "=" only, so values that themselves contain
            // "=" (e.g. base64 padding) are kept intact.
            $cmd = explode('=', $argv[$i], 2);
            $this->cmd_argv[$cmd[0]] = isset($cmd[1]) ? $cmd[1] : '';
        }
    }

    /**
     * @param string $key argument name
     * @return bool true when the argument was present on the command line
     */
    public function get_key($key) {
        return isset($this->cmd_argv[$key]);
    }

    /**
     * @param string $key argument name
     * @return string|false the argument value, or false when it was not given
     */
    public function get_value($key) {
        return isset($this->cmd_argv[$key]) ? $this->cmd_argv[$key] : false;
    }
}

/**
 * Multi-process manager for PHP.
 *
 * Lets a PHP script run tasks in separate processes; console (CLI) use only.
 * The main instance starts children with popen() and reads their output; a
 * child instance instantiates the requested task class and runs it.
 * The signal mechanism is currently implemented with files and flock().
 */
class my_process_m {
    /**
     * @var array $task_list
     * Process list: popen() handles in the main process, the task object in a child.
     */
    private $task_list = array();
    /** @var array md5(lock file path) => array(file handle, lock file path) */
    private $lock_list = array();
    /** @var array|null array(main process key, handle returned by get() or 0) */
    private $lock = null;
    /** @var bool true when this instance was created without a main-process key */
    private $is_main = false;
    /** @var int upper bound on the number of polling rounds performed by run() */
    private $max_run = 3600000;

    /**
     * Closes the handle of one lock_list entry and removes its lock file.
     *
     * @param array $entry array(file handle, lock file path)
     */
    private function close_lock_entry($entry) {
        if (is_resource($entry[0])) fclose($entry[0]);
        // Guarded so that an already-removed file does not raise a warning.
        if (is_file($entry[1])) unlink($entry[1]);
    }

    /**
     * Releases one signal (when $key is given) or every signal held by this instance.
     *
     * @param string|null $key signal key, or null for all
     * @return bool always true
     */
    private function release_lock($key = null) {
        $lock = &$this->lock_list;
        if (!is_null($key)) {
            $key = md5($this->build_lock_id($key));
            if (isset($lock[$key])) {
                $this->close_lock_entry($lock[$key]);
                unset($lock[$key]);
            }
            return true;
        }

        // Entries are array(handle, path) pairs, not bare resources: the former
        // is_resource() test on the pair never matched, so nothing was closed
        // or removed. Handle them exactly like the single-key case.
        foreach ($lock as $k => $entry) {
            $this->close_lock_entry($entry);
            unset($lock[$k]);
        }
        return true;
    }

    /**
     * Releases one task (when $key is given) or every task.
     *
     * @param int|string|null $key index in the task list, or null for all
     * @return bool always true
     */
    private function release_task($key = null) {
        $task = &$this->task_list;
        if (!is_null($key)) {
            // An unknown key is a no-op; it must not fall through to "release all".
            if (isset($task[$key])) {
                if (is_resource($task[$key])) pclose($task[$key]);
                unset($task[$key]);
            }
            return true;
        }

        foreach ($task as $k => $h) {
            if (is_resource($h)) pclose($h);
            unset($task[$k]);
        }
        return true;
    }

    /**
     * @param string $key signal key
     * @return string path of the lock file that backs the signal
     */
    private function build_lock_id($key) {
        return DIR_TMP . DIRECTORY_SEPARATOR . $key . '_sem.lock';
    }

    /**
     * Child side: instantiates the class named on the command line and runs it
     * with the decoded parameter. The task object is kept in the task list.
     */
    protected function run_child_process() {
        $class = $GLOBALS['gal_obj_cmd']->get_value(CMD_CHILD_PROCESS_NAME);
        $param = $GLOBALS['gal_obj_cmd']->get_value(CMD_CHILD_PROCESS_PARAM);
        if (!is_string($class) || $class === '' || !class_exists($class)) {
            // Report instead of dying with a fatal error on "new $class()".
            trigger_error('my_process_m: unknown child process class', E_USER_WARNING);
            return;
        }
        // NOTE(review): unserialize() is applied to command-line data. This is
        // only acceptable while the command line is produced by run_task();
        // never feed externally supplied arguments into this path.
        $param = ($param === false || $param === '') ? null : unserialize(base64_decode(trim($param)));
        $obj = new $class();
        $obj->run($param);
        $this->task_list[] = $obj;
    }

    /**
     * @param string $lock main-process key; '' makes this instance the main
     *                     process, which then creates its own key and lock file
     */
    public function __construct($lock='') {
        if ($lock === '') {
            $this->is_main = true;
            $key = md5(uniqid()) . '_main.my_process';
            $lock = array($key, $this->get($key));
        } else {
            $this->is_main = false;
            $lock = array($lock, 0);
        }
        $this->lock = $lock;
    }

    public function __destruct() {
        $this->release_lock();
        $this->release_task();
    }

    /**
     * Stop all processes.
     *
     * Not implemented: the method body is empty.
     */
    public function stop_all() {
    }

    /**
     * Whether this instance is the main process.
     *
     * @return bool
     */
    public function is_main() {
        return $this->is_main;
    }

    /**
     * Whether a signal already exists, i.e. whether its lock file exists.
     *
     * @param string $key
     * @return bool
     */
    public function exist($key) {
        return file_exists($this->build_lock_id($key));
    }

    /**
     * Obtains a signal by opening (creating if necessary) its lock file.
     *
     * @param string $key
     * @param int $max_acquire maximum number of blocked requests (currently unused)
     * @return mixed the file handle used as signal ID; false when this instance
     *               already holds the signal or the file cannot be opened
     */
    public function get($key, $max_acquire=5) {
        $fn = $this->build_lock_id($key);
        $slot = md5($fn);
        if (isset($this->lock_list[$slot])) return false;
        $id = fopen($fn, 'a+');
        if ($id) $this->lock_list[$slot] = array($id, $fn);
        return $id;
    }

    /**
     * Removes a signal: closes its handle and deletes its lock file.
     *
     * @param string $key
     * @return bool always true
     */
    public function remove($key) {
        return $this->release_lock($key);
    }

    /**
     * Acquires a signal with an exclusive flock().
     *
     * @param resource $id signal ID as returned by get()
     * @param bool $block whether to block until the lock is available
     * @return bool result of flock()
     */
    public function acquire($id, $block=false) {
        return flock($id, $block ? LOCK_EX : (LOCK_EX | LOCK_NB));
    }

    /**
     * Releases a signal previously taken with acquire().
     *
     * @param resource $id signal ID as returned by get()
     * @return bool result of flock()
     */
    public function release($id) {
        return flock($id, LOCK_UN);
    }

    /**
     * Starts a child process that runs the given task class.
     *
     * @param string $process_name name of the task class
     * @param mixed $param parameter handed to the task's run(); it is passed
     *                     serialized and base64-encoded on the command line
     * @return bool true when the child was started, false when popen() failed
     */
    public function run_task($process_name, $param=null) {
        // Every argument and the script path are shell-escaped, so paths with
        // spaces and unusual class names cannot break the command line.
        // DIR_PHP_EXEC is left as is because it is a command, not one argument.
        $command = DIR_PHP_EXEC . ' -f ' . escapeshellarg(DIR_MAIN_EXEC) . ' -- '
            . escapeshellarg(CMD_CHILD_PROCESS_NAME . '=' . $process_name) . ' '
            . escapeshellarg(CMD_CHILD_PROCESS_PARAM . '=' . base64_encode(serialize($param))) . ' '
            . escapeshellarg(CMD_MAIN_PROCESS_KEY . '=' . $this->lock[0]);
        $handle = popen($command, 'r');
        if (!is_resource($handle)) {
            // Do not enqueue a failed handle; run() would try to read from it.
            return false;
        }
        $this->task_list[] = $handle;
        return true;
    }

    /**
     * Main process: polls every child once per second, echoes what it wrote
     * and releases children that have finished. Child process: runs the task.
     *
     * NOTE(review): the loop condition of the published listing was truncated
     * ("$id++ 0"). It is reconstructed here as "$id++ < max_run && $c > 0",
     * based on the otherwise unused max_run and task counter - confirm against
     * the original source if available.
     *
     * @param bool $auto_run when false only a single polling round is done
     */
    public function run($auto_run = true) {
        if (!$this->is_main) {
            $this->run_child_process();
            return;
        }

        $id = 0;
        do {
            $c = 0;
            // Iterate over a snapshot because finished tasks are removed inside the loop.
            $tasks = $this->task_list;
            foreach ($tasks as $k => $h) {
                $c++;
                if (!is_resource($h)) {
                    $this->release_task($k);
                    continue;
                }
                $msg = fread($h, 8000);
                if ($msg === false) $msg = '';
                // A child is finished when its output ends with the "@end" line,
                // or when its pipe reached EOF (it exited without the marker).
                if (substr($msg, -5, 4) === '@end' || feof($h)) {
                    echo "end process:[$k][$id] echo \n{$msg} \n";
                    $this->release_task($k);
                } else {
                    echo "process:[$k][$id] echo \n{$msg} \n";
                }
            }
            sleep(1);
        } while ($auto_run && $id++ < $this->max_run && $c > 0);
    }

    /**
     * @param int $max maximum number of polling rounds for run()
     */
    public function set_max_run($max=1000) {
        $this->max_run = $max;
    }
}


声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn核实处理。