• 技术文章 >后端开发 >php教程

    PHP使用Beanstalkd实例详解

    藏色散人藏色散人2019-05-05 14:44:16转载1049
    有关Beanstalkd的基本概念,编译和yum的安装方法已经在上篇文章《Beanstalkd消息/任务队列的详解》中介绍了,今天练习下PHP使用Beanstalkd的过程,我选择的是使用Pheanstalk类来连接Beanstalkd

    1.使用Composer安装Pheanstalk

    composer require pda/pheanstalk

    2.实现代码

    php查看beanstalkd状态脚本Status.php

    <?php
    /**
     * Created by PhpStorm.
     * User: jmsite.cn
     * Date: 2019/1/21
     * Time: 10:32
     */
    require "../vendor/autoload.php";
    use Pheanstalk\Pheanstalk;
    $pheanstalk = new Pheanstalk('192.168.75.135',11300);
    print_r($pheanstalk->stats());

    生产者代码Producter.php

    <?php
    /**
     * Created by PhpStorm.
     * User: jmsite.cn
     * Date: 2019/1/20
     * Time: 16:30
     */
    require "../vendor/autoload.php";
    use Pheanstalk\Pheanstalk;
    $pheanstalk = new Pheanstalk('192.168.75.135',11300);
    for ($i=0;$i<50;$i++){
        $data = array(
            'key' => 'testkey'.$i,
            'value' => 'testvalue',
            'time' => time(),
        );
        $ret = $pheanstalk->putInTube('test-tube', json_encode($data), Pheanstalk::DEFAULT_PRIORITY, Pheanstalk::DEFAULT_DELAY, Pheanstalk::DEFAULT_TTR);
        var_dump($ret);
    }

    消费者代码Consumer.php

    <?php
    /**
     * Created by PhpStorm.
     * User: jmsite.cn
     * Date: 2019/1/20
     * Time: 16:31
     */
    set_time_limit(0);
    ini_set('default_socket_timeout', 900);
    require "../vendor/autoload.php";
    use Pheanstalk\Pheanstalk;
    $pheanstalk = new Pheanstalk('192.168.75.135',11300);
    while (true){
        $job = $pheanstalk
            ->watch('test-tube')
            ->ignore('default')
            ->reserve();
        if ($job){
            sleep(2);
            echo $job->getData();
            echo "\n";
            $pheanstalk->delete($job);
        }
    }

    打开命令行/终端窗口,执行生产者,会向tube写入50条任务

    PS E:\repository\work\beanstalk> php .\Producter.php
    int(101)
    int(102)
    int(103)
    int(104)
    int(105)
    int(106)
    int(107)
    int(108)
    int(109)
    int(110)
    int(111)
    int(112)
    int(113)
    int(114)
    ......

    由此可见,$pheanstalk->putInTube成功后返回的是job的id

    查看状态

    PS E:\repository\work\beanstalk> php Status.php
    Pheanstalk\Response\ArrayResponse Object
    (
        [_name:Pheanstalk\Response\ArrayResponse:private] => OK
        [storage:ArrayObject:private] => Array
            (
                [current-jobs-urgent] => 0
                [current-jobs-ready] => 50
                [current-jobs-reserved] => 0
                [current-jobs-delayed] => 0
                [current-jobs-buried] => 0
                ......

    结果中显示处于ready待读取状态的job是50个

    打开两个或以上命令行/终端窗口,执行消费者,模拟多消费者竞争

    消费者1

    PS E:\repository\work\beanstalk> php .\Consumer.php
    {"key":"testkey0","value":"testvalue","time":1548039103}
    {"key":"testkey1","value":"testvalue","time":1548039103}
    {"key":"testkey2","value":"testvalue","time":1548039103}
    {"key":"testkey4","value":"testvalue","time":1548039103}
    {"key":"testkey6","value":"testvalue","time":1548039103}
    {"key":"testkey8","value":"testvalue","time":1548039103}
    {"key":"testkey10","value":"testvalue","time":1548039103}
    {"key":"testkey12","value":"testvalue","time":1548039103}
    {"key":"testkey14","value":"testvalue","time":1548039103}
    {"key":"testkey16","value":"testvalue","time":1548039103}
    {"key":"testkey18","value":"testvalue","time":1548039103}
    {"key":"testkey20","value":"testvalue","time":1548039103}
    {"key":"testkey22","value":"testvalue","time":1548039103}
    {"key":"testkey24","value":"testvalue","time":1548039103}
    {"key":"testkey26","value":"testvalue","time":1548039103}
    {"key":"testkey28","value":"testvalue","time":1548039103}
    {"key":"testkey30","value":"testvalue","time":1548039103}
    {"key":"testkey32","value":"testvalue","time":1548039103}
    {"key":"testkey34","value":"testvalue","time":1548039103}
    {"key":"testkey36","value":"testvalue","time":1548039103}
    {"key":"testkey38","value":"testvalue","time":1548039103}
    {"key":"testkey40","value":"testvalue","time":1548039103}
    {"key":"testkey42","value":"testvalue","time":1548039103}
    {"key":"testkey44","value":"testvalue","time":1548039103}
    {"key":"testkey46","value":"testvalue","time":1548039103}
    {"key":"testkey48","value":"testvalue","time":1548039103}

    消费者2

    PS E:\repository\work\beanstalk> php .\Consumer.php
    {"key":"testkey3","value":"testvalue","time":1548039103}
    {"key":"testkey5","value":"testvalue","time":1548039103}
    {"key":"testkey7","value":"testvalue","time":1548039103}
    {"key":"testkey9","value":"testvalue","time":1548039103}
    {"key":"testkey11","value":"testvalue","time":1548039103}
    {"key":"testkey13","value":"testvalue","time":1548039103}
    {"key":"testkey15","value":"testvalue","time":1548039103}
    {"key":"testkey17","value":"testvalue","time":1548039103}
    {"key":"testkey19","value":"testvalue","time":1548039103}
    {"key":"testkey21","value":"testvalue","time":1548039103}
    {"key":"testkey23","value":"testvalue","time":1548039103}
    {"key":"testkey25","value":"testvalue","time":1548039103}
    {"key":"testkey27","value":"testvalue","time":1548039103}
    {"key":"testkey29","value":"testvalue","time":1548039103}
    {"key":"testkey31","value":"testvalue","time":1548039103}
    {"key":"testkey33","value":"testvalue","time":1548039103}
    {"key":"testkey35","value":"testvalue","time":1548039103}
    {"key":"testkey37","value":"testvalue","time":1548039103}
    {"key":"testkey39","value":"testvalue","time":1548039103}
    {"key":"testkey41","value":"testvalue","time":1548039103}
    {"key":"testkey43","value":"testvalue","time":1548039103}
    {"key":"testkey45","value":"testvalue","time":1548039103}
    {"key":"testkey47","value":"testvalue","time":1548039103}
    {"key":"testkey49","value":"testvalue","time":1548039103}

    两个消费者竞争着完成了全部任务,由于我的beanstalkd启动时开启了binlog持久,所以beanstalkd重启后任务也不会丢失

    3.需要注意的事项

    1.创建job时,设置的超时时间Pheanstalk::DEFAULT_TTR一定要比消费者处理一个job的时间要长,否则job在超时之后会被tube更改为ready状态,被其他消费者获取,而此时当前消费者还在处理该job,这就出现了一个job被多个消费者重复执行的可怕现象

    2.Pheanstalk的维护者发生了变化,在新版的Pheanstalk中是不支持长连接的,当客户端socket连接服务器时间超过php.ini中设置的default_socket_timeout时,如果未能从服务端tube获得job,连接将会被断开,所以消费者进程需要维护,以便在退出后可以重新开启进程,推荐使用supervisord维护消费者进程。

    判断socket超时的代码

    public function getLine($length = null)
        {
            $timeout = ini_get('default_socket_timeout');
            $timer   = microtime(true);
            do {
                $data = isset($length) ?
                    $this->_wrapper()->fgets($this->_socket, $length) :
                    $this->_wrapper()->fgets($this->_socket);
                if ($this->_wrapper()->feof($this->_socket)) {
                    throw new Exception\SocketException('Socket closed by server!');
                }
                if (($data === false) && microtime(true) - $timer > $timeout) {
                    $this->disconnect();
                    throw new Exception\SocketException('Socket timed out!');
                }
            } while ($data === false);
            return rtrim($data);
        }

    以上就是PHP使用Beanstalkd实例详解的详细内容,更多请关注php中文网其它相关文章!

    声明:本文转载于:jmsite,如有侵犯,请联系admin@php.cn删除
    专题推荐:php Beanstalkd
    上一篇:PHP中的trim函数怎么用 下一篇:什么是工厂模式?
    大前端线上培训班

    相关文章推荐

    • Python使用 Beanstalkd 做异步任务处理的方法• php中beanstalkd消息队列类案例• PHP操作Beanstalkd的方法及参数注释• asp与php区别是什么?• Beanstalkd消息/任务队列的详解• PHP中的trim函数怎么用

    全部评论我要评论

  • 取消发布评论发送
  • 1/1

    PHP中文网