• 技术文章 >后端开发 >php教程

    php pthreads多线程的安装与使用_PHP

    2016-05-28 11:49:07原创464

    安装Pthreads 基本上需要重新编译PHP,加上 --enable-maintainer-zts 参数,但是用这个文档很少;bug会很多很有很多意想不到的问题,生成环境上只能呵呵了,所以这个东西玩玩就算了,真正多线程还是用Python、C等等

    一、安装

    这里使用的是 php-7.0.2

    ./configure \
    --prefix=/usr/local/php7 \
    --with-config-file-path=/etc \
    --with-config-file-scan-dir=/etc/php.d \
    --enable-debug \
    --enable-maintainer-zts \
    --enable-pcntl \
    --enable-fpm \
    --enable-opcache \
    --enable-embed=shared \
    --enable-json=shared \
    --enable-phpdbg \
    --with-curl=shared \
    --with-mysql=/usr/local/mysql \
    --with-mysqli=/usr/local/mysql/bin/mysql_config \
    --with-pdo-mysql

    make && make install

    安装pthreads

    pecl install pthreads

    二、Thread

    <?php
    #1
    $thread = new class extends Thread {
    public function run() {
    echo "Hello World {$this->getThreadId()}\n"; 
    } 
    };
    $thread->start() && $thread->join();
    #2
    class workerThread extends Thread { 
    public function __construct($i){
    $this->i=$i;
    }
    public function run(){
    while(true){
    echo $this->i."\n";
    sleep(1);
    } 
    } 
    }
    for($i=0;$i<50;$i++){
    $workers[$i]=new workerThread($i);
    $workers[$i]->start();
    }
    ?>

    三、 Worker 与 Stackable

    Stackables are tasks that are executed by Worker threads. You can synchronize with, read, and write Stackable objects before, after and during their execution.

    <?php
    class SQLQuery extends Stackable {
    public function __construct($sql) {
    $this->sql = $sql;
    }
    public function run() {
    $dbh = $this->worker->getConnection();
    $row = $dbh->query($this->sql);
    while($member = $row->fetch(PDO::FETCH_ASSOC)){
    print_r($member);
    }
    }
    }
    class ExampleWorker extends Worker {
    public static $dbh;
    public function __construct($name) {
    }
    public function run(){
    self::$dbh = new PDO('mysql:host=10.0.0.30;dbname=testdb','root','123456');
    }
    public function getConnection(){
    return self::$dbh;
    }
    }
    $worker = new ExampleWorker("My Worker Thread");
    $sql1 = new SQLQuery('select * from test order by id desc limit 1,5');
    $worker->stack($sql1);
    $sql2 = new SQLQuery('select * from test order by id desc limit 5,5');
    $worker->stack($sql2);
    $worker->start();
    $worker->shutdown();
    ?>

    四、 互斥锁

    什么情况下会用到互斥锁?在你需要控制多个线程同一时刻只能有一个线程工作的情况下可以使用。一个简单的计数器程序,说明有无互斥锁情况下的不同

    <?php
    $counter = 0;
    $handle=fopen("/tmp/counter.txt", "w");
    fwrite($handle, $counter );
    fclose($handle);
    class CounterThread extends Thread {
    public function __construct($mutex = null){
    $this->mutex = $mutex;
    $this->handle = fopen("/tmp/counter.txt", "w+");
    }
    public function __destruct(){
    fclose($this->handle);
    }
    public function run() {
    if($this->mutex)
    $locked=Mutex::lock($this->mutex);
    $counter = intval(fgets($this->handle));
    $counter++;
    rewind($this->handle);
    fputs($this->handle, $counter );
    printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);
    if($this->mutex)
    Mutex::unlock($this->mutex);
    }
    }
    //没有互斥锁
    for ($i=0;$i<50;$i++){
    $threads[$i] = new CounterThread();
    $threads[$i]->start();
    }
    //加入互斥锁
    $mutex = Mutex::create(true);
    for ($i=0;$i<50;$i++){
    $threads[$i] = new CounterThread($mutex);
    $threads[$i]->start();
    }
    Mutex::unlock($mutex);
    for ($i=0;$i<50;$i++){
    $threads[$i]->join();
    }
    Mutex::destroy($mutex);
    ?>

    多线程与共享内存

    在共享内存的例子中,没有使用任何锁,仍然可能正常工作,可能工作内存操作本身具备锁的功能

    <?php
    $tmp = tempnam(__FILE__, 'PHP');
    $key = ftok($tmp, 'a');
    $shmid = shm_attach($key);
    $counter = 0;
    shm_put_var( $shmid, 1, $counter );
    class CounterThread extends Thread {
    public function __construct($shmid){
    $this->shmid = $shmid;
    }
    public function run() {
    $counter = shm_get_var( $this->shmid, 1 );
    $counter++;
    shm_put_var( $this->shmid, 1, $counter );
    printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);
    }
    }
    for ($i=0;$i<100;$i++){
    $threads[] = new CounterThread($shmid);
    }
    for ($i=0;$i<100;$i++){
    $threads[$i]->start();
    }
    for ($i=0;$i<100;$i++){
    $threads[$i]->join();
    }
    shm_remove( $shmid );
    shm_detach( $shmid );
    ?>

    五、 线程同步

    有些场景我们不希望 thread->start() 就开始运行程序,而是希望线程等待我们的命令。thread−>wait();测作用是thread−>start()后线程并不会立即运行,只有收到 thread->notify(); 发出的信号后才运行

    <?php
    $tmp = tempnam(__FILE__, 'PHP');
    $key = ftok($tmp, 'a');
    $shmid = shm_attach($key);
    $counter = 0;
    shm_put_var( $shmid, 1, $counter );
    class CounterThread extends Thread {
    public function __construct($shmid){
    $this->shmid = $shmid;
    }
    public function run() {
    $this->synchronized(function($thread){
    $thread->wait();
    }, $this);
    $counter = shm_get_var( $this->shmid, 1 );
    $counter++;
    shm_put_var( $this->shmid, 1, $counter );
    printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);
    }
    }
    for ($i=0;$i<100;$i++){
    $threads[] = new CounterThread($shmid);
    }
    for ($i=0;$i<100;$i++){
    $threads[$i]->start();
    }
    for ($i=0;$i<100;$i++){
    $threads[$i]->synchronized(function($thread){
    $thread->notify();
    }, $threads[$i]);
    }
    for ($i=0;$i<100;$i++){
    $threads[$i]->join();
    }
    shm_remove( $shmid );
    shm_detach( $shmid );
    ?> 

    六、线程池

    一个Pool类

    <?php
    class Update extends Thread {
    public $running = false;
    public $row = array();
    public function __construct($row) {
    $this->row = $row;
    $this->sql = null;
    }
    public function run() {
    if(strlen($this->row['bankno']) > 100 ){
    $bankno = safenet_decrypt($this->row['bankno']);
    }else{
    $error = sprintf("%s, %s\r\n",$this->row['id'], $this->row['bankno']);
    file_put_contents("bankno_error.log", $error, FILE_APPEND);
    }
    if( strlen($bankno) > 7 ){
    $sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);
    $this->sql = $sql;
    }
    printf("%s\n",$this->sql);
    }
    }
    class Pool {
    public $pool = array();
    public function __construct($count) {
    $this->count = $count;
    }
    public function push($row){
    if(count($this->pool) < $this->count){
    $this->pool[] = new Update($row);
    return true;
    }else{
    return false;
    }
    }
    public function start(){
    foreach ( $this->pool as $id => $worker){
    $this->pool[$id]->start();
    }
    }
    public function join(){
    foreach ( $this->pool as $id => $worker){
    $this->pool[$id]->join();
    }
    }
    public function clean(){
    foreach ( $this->pool as $id => $worker){
    if(! $worker->isRunning()){
    unset($this->pool[$id]);
    }
    }
    }
    }
    try {
    $dbh = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(
    PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
    PDO::MYSQL_ATTR_COMPRESS => true
    )
    );
    $sql = "select id,bankno from members order by id desc";
    $row = $dbh->query($sql);
    $pool = new Pool(5);
    while($member = $row->fetch(PDO::FETCH_ASSOC))
    {
    while(true){
    if($pool->push($member)){ //压入任务到池中
    break;
    }else{ //如果池已经满,就开始启动线程
    $pool->start();
    $pool->join();
    $pool->clean();
    }
    }
    }
    $pool->start();
    $pool->join();
    $dbh = null;
    } catch (Exception $e) {
    echo '[' , date('H:i:s') , ']', '系统错误', $e->getMessage(), "\n";
    }
    ?>

    动态队列线程池

    上面的例子是当线程池满后执行start统一启动,下面的例子是只要线程池中有空闲便立即创建新线程。

    <?php
    class Update extends Thread {
    public $running = false;
    public $row = array();
    public function __construct($row) {
    $this->row = $row;
    $this->sql = null;
    //print_r($this->row);
    }
    public function run() {
    if(strlen($this->row['bankno']) > 100 ){
    $bankno = safenet_decrypt($this->row['bankno']);
    }else{
    $error = sprintf("%s, %s\r\n",$this->row['id'], $this->row['bankno']);
    file_put_contents("bankno_error.log", $error, FILE_APPEND);
    }
    if( strlen($bankno) > 7 ){
    $sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);
    $this->sql = $sql;
    }
    printf("%s\n",$this->sql);
    }
    }
    try {
    $dbh = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(
    PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
    PDO::MYSQL_ATTR_COMPRESS => true
    )
    );
    $sql = "select id,bankno from members order by id desc limit 50";
    $row = $dbh->query($sql);
    $pool = array();
    while($member = $row->fetch(PDO::FETCH_ASSOC))
    {
    $id = $member['id'];
    while (true){
    if(count($pool) < 5){
    $pool[$id] = new Update($member);
    $pool[$id]->start();
    break;
    }else{
    foreach ( $pool as $name => $worker){
    if(! $worker->isRunning()){
    unset($pool[$name]);
    }
    }
    }
    }
    }
    $dbh = null;
    } catch (Exception $e) {
    echo '【' , date('H:i:s') , '】', '【系统错误】', $e->getMessage(), "\n";
    }
    ?>

    pthreads Pool类

    <?php
    class WebWorker extends Worker {
    public function __construct(SafeLog $logger) {
    $this->logger = $logger;
    }
    protected $loger;
    }
    class WebWork extends Stackable {
    public function isComplete() {
    return $this->complete;
    }
    public function run() {
    $this->worker
    ->logger
    ->log("%s executing in Thread #%lu",
    __CLASS__, $this->worker->getThreadId());
    $this->complete = true;
    }
    protected $complete;
    }
    class SafeLog extends Stackable {
    protected function log($message, $args = []) {
    $args = func_get_args();
    if (($message = array_shift($args))) {
    echo vsprintf(
    "{$message}\n", $args);
    }
    }
    }
    $pool = new Pool(8, \WebWorker::class, [new SafeLog()]);
    $pool->submit($w=new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->shutdown();
    $pool->collect(function($work){
    return $work->isComplete();
    });
    var_dump($pool); 

    七、多线程文件安全读写

    LOCK_SH 取得共享锁定(读取的程序)

    LOCK_EX 取得独占锁定(写入的程序

    LOCK_UN 释放锁定(无论共享或独占)

    LOCK_NB 如果不希望 flock() 在锁定时堵塞

    <?php
    $fp = fopen("/tmp/lock.txt", "r+");
    if (flock($fp, LOCK_EX)) { // 进行排它型锁定
    ftruncate($fp, 0); // truncate file
    fwrite($fp, "Write something here\n");
    fflush($fp); // flush output before releasing the lock
    flock($fp, LOCK_UN); // 释放锁定
    } else {
    echo "Couldn't get the lock!";
    }
    fclose($fp);
    $fp = fopen('/tmp/lock.txt', 'r+');
    if(!flock($fp, LOCK_EX | LOCK_NB)) {
    echo 'Unable to obtain lock';
    exit(-1);
    }
    fclose($fp);
    ?>

    八、多线程与数据连接

    pthreads 与 pdo 同时使用是,需要注意一点,需要静态声明public static $dbh;并且通过单例模式访问数据库连接。

    Worker 与 PDO

    <?php
    class Work extends Stackable {
    public function __construct() {
    }
    public function run() {
    $dbh = $this->worker->getConnection();
    $sql = "select id,name from members order by id desc limit ";
    $row = $dbh->query($sql);
    while($member = $row->fetch(PDO::FETCH_ASSOC)){
    print_r($member);
    }
    }
    }
    class ExampleWorker extends Worker {
    public static $dbh;
    public function __construct($name) {
    }
    /*
    * The run method should just prepare the environment for the work that is coming ...
    */
    public function run(){
    self::$dbh = new PDO('mysql:host=...;dbname=example','www','');
    }
    public function getConnection(){
    return self::$dbh;
    }
    }
    $worker = new ExampleWorker("My Worker Thread");
    $work=new Work();
    $worker->stack($work);
    $worker->start();
    $worker->shutdown();
    ?> 

    Pool 与 PDO

    在线程池中链接数据库

    # cat pool.php
    <?php
    class ExampleWorker extends Worker {
    public function __construct(Logging $logger) {
    $this->logger = $logger;
    }
    protected $logger;
    }
    /* the collectable class implements machinery for Pool::collect */
    class Work extends Stackable {
    public function __construct($number) {
    $this->number = $number;
    }
    public function run() {
    $dbhost = 'db.example.com'; // 数据库服务器
    $dbuser = 'example.com'; // 数据库用户名
    $dbpw = 'password'; // 数据库密码
    $dbname = 'example_real';
    $dbh = new PDO("mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array(
    PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF\'',
    PDO::MYSQL_ATTR_COMPRESS => true,
    PDO::ATTR_PERSISTENT => true
    )
    );
    $sql = "select OPEN_TIME, `COMMENT` from MT_TRADES where LOGIN='".$this->number['name']."' and CMD='' and `COMMENT` = '".$this->number['order'].":DEPOSIT'";
    #echo $sql;
    $row = $dbh->query($sql);
    $mt_trades = $row->fetch(PDO::FETCH_ASSOC);
    if($mt_trades){
    $row = null;
    $sql = "UPDATE db_example.accounts SET paystatus='成功', deposit_time='".$mt_trades['OPEN_TIME']."' where `order` = '".$this->number['order']."';";
    $dbh->query($sql);
    #printf("%s\n",$sql);
    }
    $dbh = null;
    printf("runtime: %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$this->number['order']);
    }
    }
    class Logging extends Stackable {
    protected static $dbh;
    public function __construct() {
    $dbhost = 'db.example.com'; // 数据库服务器
    $dbuser = 'example.com'; // 数据库用户名
    $dbpw = 'password'; // 数据库密码
    $dbname = 'example_real'; // 数据库名
    self::$dbh = new PDO("mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array(
    PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF\'',
    PDO::MYSQL_ATTR_COMPRESS => true
    )
    );
    }
    protected function log($message, $args = []) {
    $args = func_get_args();
    if (($message = array_shift($args))) {
    echo vsprintf("{$message}\n", $args);
    }
    }
    protected function getConnection(){
    return self::$dbh;
    }
    }
    $pool = new Pool(, \ExampleWorker::class, [new Logging()]);
    $dbhost = 'db.example.com'; // 数据库服务器
    $dbuser = 'example.com'; // 数据库用户名
    $dbpw = 'password'; // 数据库密码
    $dbname = 'db_example';
    $dbh = new PDO("mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array(
    PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF\'',
    PDO::MYSQL_ATTR_COMPRESS => true
    )
    );
    $sql = "select `order`,name from accounts where deposit_time is null order by id desc";
    $row = $dbh->query($sql);
    while($account = $row->fetch(PDO::FETCH_ASSOC))
    {
    $pool->submit(new Work($account));
    }
    $pool->shutdown();
    ?> 

    进一步改进上面程序,我们使用单例模式 $this->worker->getInstance(); 全局仅仅做一次数据库连接,线程使用共享的数据库连接

    <?php
    class ExampleWorker extends Worker {
    #public function __construct(Logging $logger) {
    # $this->logger = $logger;
    #}
    #protected $logger;
    protected static $dbh;
    public function __construct() {
    }
    public function run(){
    $dbhost = 'db.example.com'; // 数据库服务器
    $dbuser = 'example.com'; // 数据库用户名
    $dbpw = 'password'; // 数据库密码
    $dbname = 'example'; // 数据库名
    self::$dbh = new PDO("mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array(
    PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF\'',
    PDO::MYSQL_ATTR_COMPRESS => true,
    PDO::ATTR_PERSISTENT => true
    )
    );
    }
    protected function getInstance(){
    return self::$dbh;
    }
    }
    /* the collectable class implements machinery for Pool::collect */
    class Work extends Stackable {
    public function __construct($data) {
    $this->data = $data;
    #print_r($data);
    }
    public function run() {
    #$this->worker->logger->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId() );
    try {
    $dbh = $this->worker->getInstance();
    #print_r($dbh);
    $id = $this->data['id'];
    $mobile = safenet_decrypt($this->data['mobile']);
    #printf("%d, %s \n", $id, $mobile);
    if(strlen($mobile) > ){
    $mobile = substr($mobile, -);
    }
    if($mobile == 'null'){
    # $sql = "UPDATE members_digest SET mobile = '".$mobile."' where id = '".$id."'";
    # printf("%s\n",$sql);
    # $dbh->query($sql);
    $mobile = '';
    $sql = "UPDATE members_digest SET mobile = :mobile where id = :id";
    }else{
    $sql = "UPDATE members_digest SET mobile = md(:mobile) where id = :id";
    }
    $sth = $dbh->prepare($sql);
    $sth->bindValue(':mobile', $mobile);
    $sth->bindValue(':id', $id);
    $sth->execute();
    #echo $sth->debugDumpParams();
    }
    catch(PDOException $e) {
    $error = sprintf("%s,%s\n", $mobile, $id );
    file_put_contents("mobile_error.log", $error, FILE_APPEND);
    }
    #$dbh = null;
    printf("runtime: %s, %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$mobile, $id);
    #printf("runtime: %s, %s\n", date('Y-m-d H:i:s'), $this->number);
    }
    }
    $pool = new Pool(, \ExampleWorker::class, []);
    #foreach (range(, ) as $number) {
    # $pool->submit(new Work($number));
    #}
    $dbhost = 'db.example.com'; // 数据库服务器
    $dbuser = 'example.com'; // 数据库用户名
    $dbpw = 'password'; // 数据库密码
    $dbname = 'example';
    $dbh = new PDO("mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array(
    PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF\'',
    PDO::MYSQL_ATTR_COMPRESS => true
    )
    );
    #print_r($dbh);
    #$sql = "select id, mobile from members where id < :id";
    #$sth = $dbh->prepare($sql);
    #$sth->bindValue(':id',);
    #$sth->execute();
    #$result = $sth->fetchAll();
    #print_r($result);
    #
    #$sql = "UPDATE members_digest SET mobile = :mobile where id = :id";
    #$sth = $dbh->prepare($sql);
    #$sth->bindValue(':mobile', 'aa');
    #$sth->bindValue(':id','');
    #echo $sth->execute();
    #echo $sth->queryString;
    #echo $sth->debugDumpParams();
    $sql = "select id, mobile from members order by id asc"; // limit ";
    $row = $dbh->query($sql);
    while($members = $row->fetch(PDO::FETCH_ASSOC))
    {
    #$order = $account['order'];
    #printf("%s\n",$order);
    //print_r($members);
    $pool->submit(new Work($members));
    #unset($account['order']);
    }
    $pool->shutdown();
    ?> 

    多线程中操作数据库总结

    总的来说 pthreads 仍然处在发展中,仍有一些不足的地方,我们也可以看到pthreads的git在不断改进这个项目

    数据库持久链接很重要,否则每个线程都会开启一次数据库连接,然后关闭,会导致很多链接超时。

    <?php
    $dbh = new PDO('mysql:host=localhost;dbname=test', $user, $pass, array(
    PDO::ATTR_PERSISTENT => true
    ));
    ?>

    关于php pthreads多线程的安装与使用的相关知识,就先给大家介绍到这里,后续还会持续更新。

    声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn核实处理。
    上一篇:CodeIgniter配置之database.php用法实例分析_PHP 下一篇:自己动手写 PHP MVC 框架(40节精讲/巨细/新人进阶必看)

    相关文章推荐

    • 分享PHP函数使用小工具(附代码示例)• PHP安全编码总结(经验分享)• 非常全面!PHP常见漏洞代码总结!• 一文详解PHP实现职责链设计模式(附代码示例)• php实现通过JSON RPC与go通讯(附代码)
    1/1

    PHP中文网