PHP 通过 popen 实现多进程(跨平台)的一例代码

原创
2016-07-25 08:58:40 793浏览
  // Command used to start the PHP CLI for each child process.
  define('DIR_PHP_EXEC', 'php');

  // Script that every child process re-executes: this file itself.
  define('DIR_MAIN_EXEC', __FILE__);
  // Directory for the manager's lock files. sys_get_temp_dir() is used instead
  // of a hard-coded '/tmp' so the example also works where '/tmp' does not
  // exist (e.g. Windows).
  define('DIR_TMP', sys_get_temp_dir());
  require_once('my_process.php');

  /**
   * Demo task: prints "111 <param>" four times, pausing one second after
   * each line.
   */
  class pp extends my_process_base {

      /**
       * @param mixed $param value interpolated into every printed line
       */
      public function run($param = null) {
          $remaining = 4;
          while ($remaining-- > 0) {
              echo "111 $param\n";
              sleep(1);
          }
      }
  }

  // Build the argument parser and the process manager (stored in $GLOBALS).
  init_my_process();

  $manager = $GLOBALS['gal_obj_process_m'];
  if ($manager->is_main()) {
      // Start one 'pp' task per parameter.
      foreach (array('a', 'b', 'c', 'd') as $task_param) {
          $manager->run_task('pp', $task_param);
      }
      $manager->set_max_run(10);
      $manager->run();
  }
  ?>

复制代码

2,进程管理类(保存为 my_process.php,供上面的调用示例 require;注意:本文两段代码都省略了开头的 <?php 标记,使用时需自行补上)

  /**
   * Default configuration and command-line argument names for the process manager.
   *
   * @copyright 2007 movivi
   * @author 徐智
   * $Id: getPage.php 11 2007-09-21 02:15:01Z fred $
   * modify by bbs.it-home.org
   */
  // Command used to start the PHP CLI for child processes.
  if (!defined('DIR_PHP_EXEC')) define('DIR_PHP_EXEC', 'php');
  // DIR_MAIN_EXEC (the script each child re-executes) has no default here;
  // the entry script is expected to define it.
  //if (!defined('DIR_MAIN_EXEC')) define('DIR_MAIN_EXEC', '');
  // Directory for lock files. An empty default would make lock paths resolve
  // to the filesystem root ("/<key>_sem.lock"), so fall back to the system
  // temporary directory instead.
  if (!defined('DIR_TMP')) define('DIR_TMP', sys_get_temp_dir());
  /********/
  /* Initialization: names of the "name=value" arguments passed to child processes */
  define('CMD_MAIN_PROCESS_KEY', 'main_process_key');
  define('CMD_CHILD_PROCESS_NAME', 'child_process_name');
  define('CMD_CHILD_PROCESS_PARAM', 'child_process_param');

  /**
   * Creates the command-line parser and the process manager and publishes
   * both through $GLOBALS ('gal_obj_cmd' and 'gal_obj_process_m').
   *
   * When the main-process key argument is present the manager is not the
   * main process, and its run() method is invoked immediately.
   */
  function init_my_process() {
      $argv_parser = new my_cmd_argv();
      $GLOBALS['gal_obj_cmd'] = $argv_parser;

      // get_value() yields false when the argument is absent; the manager
      // expects an empty string in that case.
      $main_key = $argv_parser->get_value(CMD_MAIN_PROCESS_KEY);
      if ($main_key === false) {
          $main_key = '';
      }

      $manager = new my_process_m($main_key);
      $GLOBALS['gal_obj_process_m'] = $manager;

      if ($manager->is_main()) {
          return;
      }
      $manager->run();
  }

  /**
   * Base class for tasks.
   *
   * Extend this class and put the task's work in your own run()
   * implementation.
   */
  abstract class my_process_base {
      /**
       * Task body, supplied by the subclass.
       *
       * @param mixed $param value handed to the task (null by default)
       */
      abstract public function run($param = null);

      /**
       * Accepts two optional arguments; neither is used.
       */
      public function __construct($auto_run = true, $name = '') {
      }

      /**
       * Prints the "@end" line when the object is destroyed.
       */
      public function __destruct() {
          echo "@end\n";
      }
  }

  /**
   * Parses "name=value" command-line arguments into a lookup table.
   *
   * The first argv entry (the script name) is skipped. An argument without
   * "=" is stored with an empty-string value.
   */
  class my_cmd_argv {
      /** @var array argument name => value */
      private $cmd_argv = array();

      /**
       * Reads the arguments from $_SERVER['argv'].
       */
      public function __construct() {
          $argv = (isset($_SERVER['argv']) && is_array($_SERVER['argv']))
              ? $_SERVER['argv']
              : array();
          foreach (array_slice($argv, 1) as $arg) {
              // Split on the first '=' only, so values that themselves contain
              // '=' (such as base64 padding) are kept intact.
              $cmd = explode('=', $arg, 2);
              $this->cmd_argv[$cmd[0]] = isset($cmd[1]) ? $cmd[1] : '';
          }
      }

      /**
       * Tells whether an argument with the given name was passed.
       *
       * @param string $key argument name
       * @return bool
       */
      public function get_key($key) {
          return isset($this->cmd_argv[$key]);
      }

      /**
       * Returns the value of an argument.
       *
       * @param string $key argument name
       * @return string|false the value, or false when the argument is absent
       */
      public function get_value($key) {
          return isset($this->cmd_argv[$key]) ? $this->cmd_argv[$key] : false;
      }
  }

  /**
   * Multi-process manager for PHP scripts run from the console.
   *
   * The main process starts children by re-running DIR_MAIN_EXEC through
   * popen() with "name=value" arguments. An instance constructed with a
   * non-empty key acts as a child. The "signal" (lock) mechanism is
   * implemented with files under DIR_TMP.
   */
  class my_process_m {
      /**
       * @var array Task list. In the main process: popen() handles of the
       *            children; in a child: the task object that was run.
       */
      private $task_list = array();
      /** @var array Open lock files keyed by md5(lock path); each entry is array(handle, path). */
      private $lock_list = array();
      /** @var array|null array(main process key, lock handle); the second element is 0 in a child. */
      private $lock = null;
      /** @var bool True when this instance is the main process. */
      private $is_main = false;
      /** @var int Upper bound on the number of polling iterations in run(). */
      private $max_run = 3600000;

      /**
       * Closes the handle and deletes the file of one lock_list entry.
       *
       * @param mixed $entry expected to be array(handle, path); anything else is ignored
       */
      private function close_lock_entry($entry) {
          if (!is_array($entry)) {
              return;
          }
          if (isset($entry[0]) && is_resource($entry[0])) {
              fclose($entry[0]);
          }
          if (isset($entry[1]) && is_file($entry[1])) {
              unlink($entry[1]);
          }
      }

      /**
       * Releases one lock (when $key is given) or every lock held by this object.
       *
       * Entries are array(handle, path) pairs, so both paths close the handle
       * and remove the lock file.
       *
       * @param string|null $key lock name, or null for all locks
       * @return bool always true
       */
      private function release_lock($key = null) {
          if (!is_null($key)) {
              $id = md5($this->build_lock_id($key));
              if (isset($this->lock_list[$id])) {
                  $this->close_lock_entry($this->lock_list[$id]);
                  unset($this->lock_list[$id]);
              }
              return true;
          }

          foreach (array_keys($this->lock_list) as $id) {
              $this->close_lock_entry($this->lock_list[$id]);
              unset($this->lock_list[$id]);
          }
          return true;
      }

      /**
       * Releases one task (when $key is given) or every task.
       *
       * A given key that is not in the list is a no-op; it no longer falls
       * through to releasing all tasks.
       *
       * @param int|string|null $key task_list index, or null for all tasks
       * @return bool always true
       */
      private function release_task($key = null) {
          if (!is_null($key)) {
              $keys = isset($this->task_list[$key]) ? array($key) : array();
          } else {
              $keys = array_keys($this->task_list);
          }

          foreach ($keys as $k) {
              if (is_resource($this->task_list[$k])) {
                  pclose($this->task_list[$k]);
              }
              unset($this->task_list[$k]);
          }
          return true;
      }

      /**
       * Builds the lock file path for a lock name.
       *
       * @param string $key lock name
       * @return string
       */
      private function build_lock_id($key) {
          return DIR_TMP . DIRECTORY_SEPARATOR . $key . '_sem.lock';
      }

      /**
       * Child side: instantiates the class named on the command line, calls
       * its run() with the decoded parameter and keeps the object in the
       * task list.
       *
       * @throws RuntimeException when the requested class does not exist
       */
      protected function run_child_process() {
          $class = $GLOBALS['gal_obj_cmd']->get_value(CMD_CHILD_PROCESS_NAME);
          $param = $GLOBALS['gal_obj_cmd']->get_value(CMD_CHILD_PROCESS_PARAM);

          if (!is_string($class) || $class === '' || !class_exists($class)) {
              throw new RuntimeException('Unknown child process class: ' . var_export($class, true));
          }

          // NOTE(review): unserialize() runs on data taken from the command
          // line. run_task() produces it, but unserialize() is unsafe if an
          // untrusted caller can start this script with its own arguments.
          $param = ($param === false || $param === '')
              ? null
              : unserialize(base64_decode(trim($param)));

          $obj = new $class();
          $obj->run($param);
          $this->task_list[] = $obj;
      }

      /**
       * @param string $lock main-process key; an empty string makes this
       *                     instance the main process, which generates its
       *                     own key and opens a lock file for it
       */
      public function __construct($lock='') {
          if ($lock === '') {
              $this->is_main = true;
              $key = md5(uniqid()) . '_main.my_process';
              $lock = array($key, $this->get($key));
          } else {
              $this->is_main = false;
              $lock = array($lock, 0);
          }
          $this->lock = $lock;
      }

      /**
       * Releases every lock, then every task.
       */
      public function __destruct() {
          $this->release_lock();
          $this->release_task();
      }

      /**
       * Stop all processes.
       *
       * Not implemented: the method body is empty.
       */
      public function stop_all() {
      }

      /**
       * Whether this instance is the main process.
       *
       * @return bool
       */
      public function is_main() {
          return $this->is_main;
      }

      /**
       * Whether a lock file for the given name already exists.
       *
       * @param string $key
       * @return bool
       */
      public function exist($key) {
          return file_exists($this->build_lock_id($key));
      }

      /**
       * Opens (creating it if needed) the lock file for a name.
       *
       * @param string $key
       * @param int $max_acquire maximum number of blocked requests (currently unused)
       * @return resource|false the file handle; false when this object
       *                        already holds the lock or the file cannot be opened
       */
      public function get($key, $max_acquire=5) {
          $fn = $this->build_lock_id($key);
          $id = md5($fn);
          if (isset($this->lock_list[$id])) {
              return false;
          }
          $handle = fopen($fn, 'a+');
          if ($handle) {
              $this->lock_list[$id] = array($handle, $fn);
          }
          return $handle;
      }

      /**
       * Releases the lock for a name: closes its handle and deletes its file.
       *
       * @param string $key
       * @return bool always true
       */
      public function remove($key) {
          return $this->release_lock($key);
      }

      /**
       * Takes an exclusive flock() on a lock handle.
       *
       * @param resource $id handle returned by get()
       * @param bool $block whether to wait for the lock
       * @return bool true on success; false on failure or when $id is not a resource
       */
      public function acquire($id, $block=false) {
          if (!is_resource($id)) {
              return false;
          }
          return flock($id, $block ? LOCK_EX : (LOCK_EX | LOCK_NB));
      }

      /**
       * Drops the flock() held on a lock handle.
       *
       * @param resource $id handle returned by get()
       * @return bool true on success; false on failure or when $id is not a resource
       */
      public function release($id) {
          if (!is_resource($id)) {
              return false;
          }
          return flock($id, LOCK_UN);
      }

      /**
       * Starts a child process that runs the named task class.
       *
       * @param string $process_name class name the child instantiates
       * @param mixed $param value passed to the task's run(); sent serialized and base64-encoded
       * @throws RuntimeException when the child process cannot be started
       */
      public function run_task($process_name, $param=null) {
          // Every argument is shell-escaped so paths, class names and keys
          // with spaces or shell metacharacters reach the child unchanged.
          // DIR_PHP_EXEC is left as-is; it is a command, not a single argument.
          $command = DIR_PHP_EXEC . ' -f ' . escapeshellarg(DIR_MAIN_EXEC) . ' -- '
              . escapeshellarg(CMD_CHILD_PROCESS_NAME . '=' . $process_name) . ' '
              . escapeshellarg(CMD_CHILD_PROCESS_PARAM . '=' . base64_encode(serialize($param))) . ' '
              . escapeshellarg(CMD_MAIN_PROCESS_KEY . '=' . $this->lock[0]);

          $handle = popen($command, 'r');
          if (!is_resource($handle)) {
              throw new RuntimeException('Unable to start child process: ' . $process_name);
          }
          $this->task_list[] = $handle;
      }

      /**
       * Main process: polls the children's output once per second and echoes
       * it, releasing a child when its output ends with "@end" or its pipe
       * reaches end-of-file. Child process: runs the requested task.
       *
       * @param bool $auto_run when false, only a single polling pass is made
       */
      public function run($auto_run = true) {
          if (!$this->is_main) {
              $this->run_child_process();
              return;
          }

          $id = 0;
          do {
              $c = 0;
              // Iterate over a snapshot of the keys: release_task() removes
              // entries while the loop is running.
              foreach (array_keys($this->task_list) as $k) {
                  $c++;
                  $h = $this->task_list[$k];
                  $msg = is_resource($h) ? fread($h, 8000) : false;
                  if ($msg === false) {
                      $msg = '';
                  }
                  // The feof() check covers a child that exits without ever
                  // printing the "@end" marker (e.g. after a fatal error).
                  $finished = !is_resource($h)
                      || substr(rtrim($msg), -4) === '@end'
                      || feof($h);
                  if ($finished) {
                      echo "end process:[$k][$id] echo \n{$msg} \n";
                      $this->release_task($k);
                  } else {
                      echo "process:[$k][$id] echo \n{$msg} \n";
                  }
              }
              sleep(1);
          } while ($auto_run && $id++ < $this->max_run && $c > 0);
      }

      /**
       * Sets the maximum number of polling iterations for run().
       *
       * @param int $max
       */
      public function set_max_run($max=1000) {
          $this->max_run = $max;
      }
  }
  225. ?>

复制代码


声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn核实处理。
上一条:计算php时间相差多少天,多少小时的代码 下一条:php遍历数组的几种方法(for foreach list each while)

相关文章

查看更多