• 技术文章 >php框架 >ThinkPHP

    解析think-queue(围绕redis做分析)

    藏色散人藏色散人2021-07-27 08:57:16转载424

    前言

    分析之前请大家务必了解消息队列的实现

    tp5的消息队列是基于database redis 和tp官方自己实现的 Topthink
    本章是围绕redis来做分析

    存储key:

    key类型描述
    queues:queueNamelist要执行的任务
    think:queue:restartstring重启队列时间戳
    queues:queueName:delayedzSet延迟任务
    queues:queueName:reservedzSet执行失败,等待重新执行

    执行命令

    work和listen的区别在下面会解释
    命令描述
    php think queue:work监听队列
    php think queue:listen监听队列
    php think queue:restart重启队列
    php think queue:subscribe暂无,可能是保留的 官方有什么其他想法但是还没实现

    行为标签

    标签描述
    worker_daemon_start守护进程开启
    worker_memory_exceeded内存超出
    worker_queue_restart重启守护进程
    worker_before_process任务开始执行之前
    worker_before_sleep任务延迟执行
    queue_failed任务执行失败

    命令参数

    参数默认值可以使用的模式描述
    queuenullwork,listen要执行的任务名称
    daemonnullwork以守护进程执行任务
    delay0work,listen失败后重新执行的时间
    forcenullwork失败后重新执行的时间
    memory128Mwork,listen限制最大内存
    sleep3work,listen没有任务的时候等待的时间
    tries0work,listen任务失败后最大尝试次数

    模式区别

    1: 执行原理不同
    work: 单进程的处理模式;
    无 daemon 参数 work进程在处理完下一个消息后直接结束当前进程。当不存在新消息时,会sleep一段时间然后退出;
    有 daemon 参数 work进程会循环地处理队列中的消息,直到内存超出参数配置才结束进程。当不存在新消息时,会在每次循环中sleep一段时间;

    listen: 父进程 + 子进程 的处理模式;
    会在所在的父进程会创建一个单次执行模式的work子进程,并通过该work子进程来处理队列中的下一个消息,当这个work子进程退出之后;
    所在的父进程会监听到该子进程的退出信号,并重新创建一个新的单次执行的work子进程;

    2: 退出时机不同
    work: 看上面
    listen: 所在的父进程正常情况会一直运行,除非遇到下面两种情况
    01: 创建的某个work子进程的执行时间超过了 listen命令行中的--timeout 参数配置;此时work子进程会被强制结束,listen所在的父进程也会抛出一个 ProcessTimeoutException 异常并退出;

    开发者可以选择捕获该异常,让父进程继续执行;
    02: 所在的父进程因某种原因存在内存泄露,则当父进程本身占用的内存超过了命令行中的 --memory 参数配置时,父子进程均会退出。正常情况下,listen进程本身占用的内存是稳定不变的。

    3: 性能不同
    work: 是在脚本内部做循环,框架脚本在命令执行的初期就已加载完毕;

    listen: 是处理完一个任务之后新开一个work进程,此时会重新加载框架脚本;

    因此 work 模式的性能会比listen模式高。
    注意: 当代码有更新时,work 模式下需要手动去执行 php think queue:restart 命令重启队列来使改动生效;而listen 模式会自动生效,无需其他操作。

    4: 超时控制能力
    work: 本质上既不能控制进程自身的运行时间,也无法限制执行中的任务的执行时间;
    listen: 可以限制其创建的work子进程的超时时间;

    可通过 timeout 参数限制work子进程允许运行的最长时间,超过该时间限制仍未结束的子进程会被强制结束;
    expire 和time的区别

    expire 在配置文件中设置,指任务的过期时间 这个时间是全局的,影响到所有的work进程
    timeout 在命令行参数中设置,指work子进程的超时时间,这个时间只对当前执行的listen 命令有效,timeout 针对的对象是 work 子进程;

    5: 使用场景不同

    work 适用场景是:
    01: 任务数量较多
    02: 性能要求较高
    03: 任务的执行时间较短
    04: 消费者类中不存在死循环,sleep() ,exit() ,die() 等容易导致bug的逻辑

    listen 适用场景是:

    01: 任务数量较少
    02: 任务的执行时间较长
    03: 任务的执行时间需要有严格限制

    公有操作

    由于我们是根据redis来做分析 所以只需要分析src/queue/connector/redis.php
    01: 首先调用 src/Queue.php中的魔术方法 __callStatic
    02: 在__callStatic方法中调用了 buildConnector
    03: buildConnector 中首先加载配置文件 如果无将是同步执行
    04: 根据配置文件去创建连接并且传入配置

    在redis.php类的构造方法中的操作:
    01: 检测redis扩展是否安装
    02: 合并配置
    03: 检测是redis扩展还是 pRedis
    04: 创建连接对象

    发布过程

    发布参数

    参数名默认值描述可以使用的方法
    $job要执行任务的类push,later
    $data任务数据push,later
    $queuedefault任务名称push,later
    $delaynull延迟时间later

    立即执行

        push($job, $data, $queue)
        Queue::push(Test::class, ['id' => 1], 'test');

    一顿骚操作后返回一个数组 并且序列化后 rPush到redis中 key为 queue:queueName
    数组结构:

    [
        'job' => $job, // 要执行任务的类
        'data' => $data, // 任务数据
        'id'=>'xxxxx' //任务id
    ]

    写入 redis并且返回队列id
    至于中间的那顿骚操作太长了就没写

    延迟发布

        later($delay, $job, $data, $queue)
        Queue::later(100, Test::class, ['id' => 1], 'test');

    跟上面的差不多
    一顿骚操作后返回一个数组 并且序列化后 zAdd 到redis中 key为 queue:queueName:delayed score为当前的时间戳+$delay

    执行过程

    执行过程有work模式和listen模式 两种 区别上面已经说了 代码逻辑由于太多等下回分解;
    最后讲一下标签的使用

        //守护进程开启
        'worker_daemon_start' => [
            \app\index\behavior\WorkerDaemonStart::class
        ],
        //内存超出
        'worker_memory_exceeded' => [
            \app\index\behavior\WorkerMemoryExceeded::class
        ],
        //重启守护进程
        'worker_queue_restart' => [
            \app\index\behavior\WorkerQueueRestart::class
        ],
        //任务开始执行之前
        'worker_before_process' => [
            \app\index\behavior\WorkerBeforeProcess::class
        ],
        //任务延迟执行
        'worker_before_sleep' => [
            \app\index\behavior\WorkerBeforeSleep::class
        ],
        //任务执行失败
        'queue_failed' => [
            \app\index\behavior\QueueFailed::class
        ]

    图片描述

    public function run(Output $output)
        {
            $output->write('<info>任务执行失败</info>', true);
        }

    控制台执行 php think queue:work --queue test --daemon
    会在控制台一次输出

    守护进程开启
    任务延迟执行

    失败的处理 如果有任务执行失败或者执行次数达到最大值
    会触发 queue_failed

    app\index\behavior@run方法里面写失败的逻辑 比如邮件通知 写入日志等

    最后我们来说一下如何在其他框架或者项目中给tp的项目推送消息队列,例如两个项目是分开的 另一个使用的却不是tp5的框架

    在其他项目中推任务

    php版本

    <?php
    
    class Index
    {
        private $redis = null;
    
        public function __construct()
        {
            $this->redis = new Redis();
            $this->redis->connect('127.0.0.1', 6379);
            $this->redis->select(10);
        }
    
        public function push($job, $data, $queue)
        {
            $payload = $this->createPayload($job, $data);
            $this->redis->rPush('queues:' . $queue, $payload);
        }
    
        public function later($delay, $job, $data, $queue)
        {
            $payload = $this->createPayload($job, $data);
            $this->redis->zAdd('queues:' . $queue . ':delayed', time() + $delay, $payload);
        }
    
        private function createPayload($job, $data)
        {
            $payload = $this->setMeta(json_encode(['job' => $job, 'data' => $data]), 'id', $this->random(32));
            return $this->setMeta($payload, 'attempts', 1);
        }
    
        private function setMeta($payload, $key, $value)
        {
            $payload = json_decode($payload, true);
            $payload[$key] = $value;
            $payload = json_encode($payload);
    
            if (JSON_ERROR_NONE !== json_last_error()) {
                throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg());
            }
    
            return $payload;
        }
    
        private function random(int $length = 16): string
        {
            $str = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ';
            $randomString = '';
            for ($i = 0; $i < $length; $i++) {
                $randomString .= $str[rand(0, strlen($str) - 1)];
            }
            return $randomString;
        }
    }
    
    (new Index())->later(10, 'app\index\jobs\Test', ['id' => 1], 'test');

    go版本

    package main
    
    import (
        "encoding/json"
        "github.com/garyburd/redigo/redis"
        "math/rand"
        "time"
    )
    
    type Payload struct {
        Id       string      `json:"id"`
        Job      string      `json:"job"`
        Data     interface{} `json:"data"`
        Attempts int         `json:"attempts"`
    }
    
    var RedisClient *redis.Pool
    
    func init() {
        RedisClient = &redis.Pool{
            MaxIdle:     20,
            MaxActive:   500,
            IdleTimeout: time.Second * 100,
            Dial: func() (conn redis.Conn, e error) {
                c, err := redis.Dial("tcp", "127.0.0.1:6379")
    
                if err != nil {
                    return nil, err
                }
    
                _, _ = c.Do("SELECT", 10)
    
                return c, nil
            },
        }
    
    }
    
    func main() {
    
        var data = make(map[string]interface{})
        data["id"] = "1"
    
        later(10, "app\\index\\jobs\\Test", data, "test")
    }
    
    func push(job string, data interface{}, queue string) {
        payload := createPayload(job, data)
        queueName := "queues:" + queue
    
        _, _ = RedisClient.Get().Do("rPush", queueName, payload)
    }
    
    func later(delay int, job string, data interface{}, queue string) {
    
        m, _ := time.ParseDuration("+1s")
        currentTime := time.Now()
        op := currentTime.Add(time.Duration(time.Duration(delay) * m)).Unix()
        createPayload(job, data)
        payload := createPayload(job, data)
        queueName := "queues:" + queue + ":delayed"
    
        _, _ = RedisClient.Get().Do("zAdd", queueName, op, payload)
    }
    
    // 创建指定格式的数据
    func createPayload(job string, data interface{}) (payload string) {
        payload1 := &Payload{Job: job, Data: data, Id: random(32), Attempts: 1}
    
        jsonStr, _ := json.Marshal(payload1)
    
        return string(jsonStr)
    }
    
    // 创建随机字符串
    func random(n int) string {
    
        var str = []rune("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
    
        b := make([]rune, n)
        for i := range b {
            b[i] = str[rand.Intn(len(str))]
        }
        return string(b)
    }

    更多thinkphp技术知识,请访问thinkphp教程栏目!

    以上就是解析think-queue(围绕redis做分析)的详细内容,更多请关注php中文网其它相关文章!

    声明:本文转载于:segmentfault,如有侵犯,请联系admin@php.cn删除
    专题推荐:redis php thinkphp5
    上一篇:分享一个think-swoole实战案例【详细演示】 下一篇:简析thinkphp5.0域名如何绑定不同模块
    大前端线上培训班

    相关文章推荐

    • ThinkPHP6加载中间件及多应用解析• 解析ThinkPHP6应用程序初始化• 介绍一个好用的ThinkPHP Repository包• 解析ThinkPHP5如何引入Go AOP和PHP AOP编程• 分享一个think-swoole实战案例【详细演示】

    全部评论我要评论

  • 取消发布评论发送
  • 1/1

    PHP中文网