• 技术文章 >后端开发 >php教程

    php+redis实现延迟队列

    不言不言2018-05-07 10:41:15原创9600
    这篇文章主要介绍了关于php+redis实现延迟队列,有着一定的参考价值,现在分享给大家,有需要的朋友可以参考一下

    基于redis有序集实现延迟任务执行,比如某个时间给某个用户发短信,订单过期处理,等等
    我是在tp5框架上写的,实现起来很简单,对于一些不是很复杂的应用足够了,目前在公司项目中使用,后台进程并没有实现多进程,
    不多说,贴代码,不回排版,见谅

    1、命令行脚本 执行方法:php think delay-queue queuename(这是有序集的key)

    namespace app\command;
    
    use app\common\lib\delayqueue\DelayQueue;
    use think\console\Command;
    use think\console\Input;
    use think\console\Output;
    use think\Db;
    
    class DelayQueueWorker extends Command
    {
        const COMMAND_ARGV_1 = 'queue';
    
        protected function configure()
        {
            $this->setName('delay-queue')->setDescription('延迟队列任务进程');
            $this->addArgument(self::COMMAND_ARGV_1);
        }
    
        protected function execute(Input $input, Output $output)
        {
            $queue = $input->getArgument(self::COMMAND_ARGV_1);
            //参数1 延迟队列表名,对应与redis的有序集key名
            while (true) {
                DelayQueue::getInstance($queue)->perform();
                usleep(300000);
            }
        }
    }

    库类目录结构
    clipboard.png

    config.php 里是redis连接参数配置

    RedisHandler.php只实现有序集的操作,重连机制还没有实现

    namespace app\common\lib\delayqueue;
    
    class RedisHandler
    {
        public $provider;
        private static $_instance = null;
    
        private function __construct() {
            $this->provider = new \Redis();
            //host port
            $config = require_once 'config.php';
            $this->provider->connect($config['redis_host'], $config['redis_port']);
        }
    
        final private function __clone() {}
    
        public static function getInstance() {
            if(!self::$_instance) {
                self::$_instance = new RedisHandler();
            }
            return self::$_instance;
        }
    
        /**
         * @param string $key 有序集key
         * @param number $score 排序值
         * @param string $value 格式化的数据
         * @return int
         */
        public function zAdd($key, $score, $value)
        {
            return $this->provider->zAdd($key, $score, $value);
        }
    
        /**
         * 获取有序集数据
         * @param $key
         * @param $start
         * @param $end
         * @param null $withscores
         * @return array
         */
        public function zRange($key, $start, $end, $withscores = null)
        {
            return $this->provider->zRange($key, $start, $end, $withscores);
        }
    
        /**
         * 删除有序集数据
         * @param $key
         * @param $member
         * @return int
         */
        public function zRem($key,$member)
        {
            return $this->provider->zRem($key,$member);
        }
    
    }

    延迟队列类

    namespace app\common\lib\delayqueue;
    
    class DelayQueue
    {
    
        private $prefix = 'delay_queue:';
    
        private $queue;
    
        private static $_instance = null;
    
        private function __construct($queue) {
            $this->queue = $queue;
        }
    
        final private function __clone() {}
    
        public static function getInstance($queue = '') {
            if(!self::$_instance) {
                self::$_instance = new DelayQueue($queue);
            }
            return self::$_instance;
        }
    
        /**
         * 添加任务信息到队列
         *
         * demo DelayQueue::getInstance('test')->addTask(
         *    'app\common\lib\delayqueue\job\Test',
         *    strtotime('2018-05-02 20:55:20'),
         *    ['abc'=>111]
         * );
         *
         * @param $jobClass
         * @param int $runTime 执行时间
         * @param array $args
         */
        public function addTask($jobClass, $runTime, $args = null)
        {
    
            $key = $this->prefix.$this->queue;
    
            $params = [
                'class' => $jobClass,
                'args'  => $args,
                'runtime' => $runTime,
            ];
    
            RedisHandler::getInstance()->zAdd(
                $key,
                $runTime,
                serialize($params)
            );
        }
    
        /**
         * 执行job
         * @return bool
         */
        public function perform()
        {
            $key = $this->prefix.$this->queue;
            //取出有序集第一个元素
            $result = RedisHandler::getInstance()->zRange($key, 0 ,0);
    
            if (!$result) {
                return false;
            }
    
            $jobInfo = unserialize($result[0]);
    
            print_r('job: '.$jobInfo['class'].' will run at: '. date('Y-m-d H:i:s',$jobInfo['runtime']).PHP_EOL);
    
            $jobClass = $jobInfo['class'];
    
            if(!@class_exists($jobClass)) {
                print_r($jobClass.' undefined'. PHP_EOL);
                RedisHandler::getInstance()->zRem($key, $result[0]);
                return false;
            }
    
            // 到时间执行
            if (time() >= $jobInfo['runtime']) {
                $job = new $jobClass;
                $job->setPayload($jobInfo['args']);
                $jobResult = $job->preform();
                if ($jobResult) {
                    // 将任务移除
                    RedisHandler::getInstance()->zRem($key, $result[0]);
                    return true;
                }
            }
    
            return false;
        }
    
    }

    异步任务基类:

    namespace app\common\lib\delayqueue;
    
    class DelayJob
    {
    
        protected $payload;
    
        public function preform ()
        {
            // todo
            return true;
        }
    
    
        public function setPayload($args = null)
        {
            $this->payload = $args;
        }
    
    }

    所有异步执行的任务都卸载job目录下,且要继承DelayJob,你可以实现任何你想延迟执行的任务

    如:

    namespace app\common\lib\delayqueue\job;
    
    use app\common\lib\delayqueue\DelayJob;
    
    class Test extends DelayJob
    {
    
        public function preform()
        {
            // payload 里应该有处理任务所需的参数,通过DelayQueue的addTask传入
            print_r('test job'.PHP_EOL);
            return true;
        }
    
    }

    使用方法:

    假设用户创建了一个订单,订单在10分钟后失效,那么在订单创建后加入:

    DelayQueue::getInstance('close_order')->addTask(
         'app\common\lib\delayqueue\job\CloseOrder', // 自己实现的job
         strtotime('2018-05-02 20:55:20'), // 订单失效时间
         ['order_id'=>123456] // 传递给job的参数
     );

    close_order 是有序集的key

    命令行启动进程

    php think delay-queue close_order

    相关推荐:

    PHP+redis实现session共享

    以上就是php+redis实现延迟队列的详细内容,更多请关注php中文网其它相关文章!

    声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn核实处理。
    专题推荐:php+redis php 队列
    上一篇:封装的php cUrl请求函数 下一篇:关于Nginx的架构
    VIP课程(WEB全栈开发)

    相关文章推荐

    • 【活动】充值PHP中文网VIP即送云服务器• 详细介绍PHP中时间处理类Carbon的用法• 分享一个neo4j(图形数据库)的PHP库!• 一文聊聊php5.4的特性【总结】• PHP商城那个好? 2022年十大开源PHP商城【分享】• PHP常量两种定义方法:define和const有什么区别
    1/1

    PHP中文网