
Analyze think-queue (analysis around redis)

藏色散人
Release: 2021-07-27 08:57:16

Preface

Before reading this analysis, make sure you understand how a message queue is implemented.

The tp5 message queue can be backed by database, redis, or Topthink, which is ThinkPHP's own official implementation.
This chapter analyzes the redis backend.

Storage key:

key                        Type    Description
queues:queueName           list    Tasks waiting to be executed
think:queue:restart        string  Timestamp of the queue restart
queues:queueName:delayed   zSet    Delayed tasks
queues:queueName:reserved  zSet    Tasks that failed and are waiting to be re-executed
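
The key layout can be summarized with a small helper. This is only a minimal Go sketch: the type QueueKeys and the function queueKeys are hypothetical names, and only the key strings themselves come from the table above.

```go
package main

import "fmt"

// QueueKeys groups the redis keys that belong to one queue.
// The type and its field names are hypothetical; only the key strings
// come from the storage key table.
type QueueKeys struct {
    Ready    string // list: tasks waiting to be executed
    Delayed  string // zSet: delayed tasks
    Reserved string // zSet: tasks waiting to be re-executed
}

// RestartKey holds the timestamp written by queue:restart.
const RestartKey = "think:queue:restart"

// queueKeys derives all per-queue keys from the queue name.
func queueKeys(queueName string) QueueKeys {
    ready := "queues:" + queueName
    return QueueKeys{
        Ready:    ready,
        Delayed:  ready + ":delayed",
        Reserved: ready + ":reserved",
    }
}

func main() {
    k := queueKeys("test")
    fmt.Println(k.Ready, k.Delayed, k.Reserved, RestartKey)
}
```

Deriving every key from one helper keeps producers written in other languages from drifting away from the names the tp5 worker reads.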

Execute command

The difference between work and listen is explained below.

Command                    Description
php think queue:work       Listen on the queue
php think queue:listen     Listen on the queue
php think queue:restart    Restart the queue
php think queue:subscribe  Not implemented yet; possibly reserved for something the maintainers have planned
Behavior tags

Tag                     Description
worker_daemon_start     Daemon start
worker_memory_exceeded  Memory exceeded
worker_queue_restart    Restart daemon
worker_before_process   Before the task starts execution
worker_before_sleep     Delayed task execution
queue_failed            Task execution failed

Command parameters

Parameter  Default value  Usable modes  Description
queue      null           work,listen   Name of the task to be executed
daemon     null           work          Run as a daemon process
delay      0              work,listen   Time (in seconds) to wait before re-executing a failed task
force      null           work          Run the worker even in maintenance mode
memory     128M           work,listen   Maximum memory limit
sleep      3              work,listen   Time (in seconds) to wait when there is no task
tries      0              work,listen   Maximum number of attempts after a task fails
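
To make the defaults concrete, here is a minimal Go sketch that parses the same option names with the standard flag package. It is not how think-queue parses its options: the WorkOptions struct and parseWorkOptions are hypothetical, and the null defaults are represented as an empty string or false.

```go
package main

import (
    "flag"
    "fmt"
)

// WorkOptions mirrors the parameter table; the struct itself is hypothetical.
type WorkOptions struct {
    Queue  string
    Daemon bool
    Delay  int // seconds
    Force  bool
    Memory int // megabytes
    Sleep  int // seconds
    Tries  int
}

// parseWorkOptions applies the defaults from the table and then the given arguments.
func parseWorkOptions(args []string) (WorkOptions, error) {
    var o WorkOptions
    fs := flag.NewFlagSet("queue:work", flag.ContinueOnError)
    fs.StringVar(&o.Queue, "queue", "", "name of the task queue")
    fs.BoolVar(&o.Daemon, "daemon", false, "run as a daemon process")
    fs.IntVar(&o.Delay, "delay", 0, "seconds before a failed task is retried")
    fs.BoolVar(&o.Force, "force", false, "force flag from the parameter table")
    fs.IntVar(&o.Memory, "memory", 128, "memory limit in megabytes")
    fs.IntVar(&o.Sleep, "sleep", 3, "seconds to wait when there is no task")
    fs.IntVar(&o.Tries, "tries", 0, "maximum number of attempts")
    err := fs.Parse(args)
    return o, err
}

func main() {
    // Same arguments as: php think queue:work --queue test --daemon
    opts, err := parseWorkOptions([]string{"--queue", "test", "--daemon"})
    if err != nil {
        fmt.Println("parse error:", err)
        return
    }
    fmt.Printf("%+v\n", opts)
}
```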

Mode difference

1: Different execution principles
work: single-process mode.
Without the daemon parameter, the work process handles the next message and then exits; if there is no new message, it sleeps for a while and then exits.
With the daemon parameter, the work process handles the messages in the queue in a loop and only exits once its memory usage exceeds the configured limit; whenever there is no new message, it sleeps for a while in that iteration.

listen: parent-process plus child-process mode.
The parent process creates a work child process that runs once and handles the next message in the queue. When that child exits, the parent process, which listens for the child's exit signal, creates a new single-run work child process.
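
The two execution models can be sketched in a few lines of Go. This is a simplified in-memory model rather than think-queue's code: runWork, runListen, and newQueue are hypothetical names, shouldStop stands in for the memory check, and real child processes are replaced by plain function calls.

```go
package main

import "fmt"

// runWork is a simplified model of queue:work.
// pop returns the next message, or ok=false when the queue is empty.
// sleep stands in for the --sleep wait; shouldStop stands in for the memory check.
func runWork(daemon bool, pop func() (string, bool), handle func(string), sleep func(), shouldStop func() bool) int {
    processed := 0
    for {
        if msg, ok := pop(); ok {
            handle(msg)
            processed++
        } else {
            sleep() // no message: wait for a while
        }
        // Without daemon, a single pass is all the process does.
        if !daemon || shouldStop() {
            return processed
        }
    }
}

// runListen models queue:listen: the parent starts one single-run worker per round.
// The real parent keeps going indefinitely; rounds only bounds this sketch.
func runListen(rounds int, pop func() (string, bool), handle func(string), sleep func()) int {
    total := 0
    for i := 0; i < rounds; i++ {
        total += runWork(false, pop, handle, sleep, func() bool { return true })
    }
    return total
}

// newQueue returns a pop function over an in-memory list of messages.
func newQueue(msgs ...string) func() (string, bool) {
    return func() (string, bool) {
        if len(msgs) == 0 {
            return "", false
        }
        msg := msgs[0]
        msgs = msgs[1:]
        return msg, true
    }
}

func main() {
    idle := 0
    handle := func(m string) { fmt.Println("handled", m) }
    sleep := func() { idle++ }

    // Daemon mode: keep looping until the stop condition is met.
    n := runWork(true, newQueue("a", "b", "c"), handle, sleep, func() bool { return idle > 0 })
    fmt.Println("work --daemon processed", n)

    // Listen mode: three child runs, one message each.
    fmt.Println("listen processed", runListen(3, newQueue("x", "y", "z"), handle, sleep))
}
```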

2: Different exit timing
work: see above.
listen: the parent process normally keeps running unless one of the following two situations occurs:
01: A work child process runs longer than the --timeout value given on the listen command line. The child is then forcibly terminated, and the listen parent process throws a ProcessTimeoutException and exits.

Developers can choose to catch this exception and let the parent process continue to run.
02: The parent process leaks memory for some reason. When the memory used by the parent process itself exceeds the --memory value given on the command line, both the parent and the child process exit. Under normal circumstances, the memory used by the listen process itself is stable.

3: Different performance
work: loops inside the script; the framework files are loaded once, early in command execution.

listen: starts a new work process for each task, and every new process reloads the framework files.

Therefore work mode performs better than listen mode.
Note: after the code is updated, work mode requires manually running php think queue:restart to restart the queue before the changes take effect; in listen mode the changes take effect automatically.

4: Timeout control capability
work: essentially can control neither its own running time nor the execution time of the task it is executing.
listen: can limit how long the work child processes it creates are allowed to run.

The timeout parameter sets the maximum time a work child process is allowed to run; a child process that has not finished within this limit is forcibly terminated.
The difference between expire and timeout

expire is set in the configuration file and is the expiration time of a task. It is global and affects all work processes.
timeout is set on the command line and is the time limit of a work child process. It only applies to the listen command being run, and its target is the work child process.
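
The parent-side time limit can be illustrated with a small Go sketch. It only models the idea: runWithTimeout, sleepJob, and errWorkerTimeout are hypothetical names, the child is a goroutine rather than a real process, and a goroutine cannot be forcibly terminated the way listen terminates a work child process, so here the parent simply stops waiting.

```go
package main

import (
    "errors"
    "fmt"
    "time"
)

// errWorkerTimeout plays the role of ProcessTimeoutException in this sketch.
var errWorkerTimeout = errors.New("worker exceeded timeout")

// runWithTimeout starts job and waits at most timeoutMs milliseconds for it to finish.
func runWithTimeout(timeoutMs int, job func()) error {
    done := make(chan struct{})
    go func() {
        defer close(done)
        job()
    }()

    select {
    case <-done:
        return nil
    case <-time.After(time.Duration(timeoutMs) * time.Millisecond):
        return errWorkerTimeout
    }
}

// sleepJob returns a job that takes roughly ms milliseconds.
func sleepJob(ms int) func() {
    return func() { time.Sleep(time.Duration(ms) * time.Millisecond) }
}

func main() {
    // A short task finishes inside the limit.
    fmt.Println("short task:", runWithTimeout(500, sleepJob(10)))

    // A long task exceeds the limit; the caller may handle the error and
    // start the next worker instead of exiting.
    if err := runWithTimeout(20, sleepJob(300)); err != nil {
        fmt.Println("long task:", err)
    }
}
```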

5: Different usage scenarios

work applicable scenarios are:
01: A large number of tasks
02: High performance requirements
03: Short task execution time
04: The consumer class contains no infinite loops, sleep(), exit(), die(), or other logic that easily leads to bugs

listen applicable scenarios are:

01: The number of tasks is small
02: The execution time of the task is long
03: The execution time of the task needs to be strictly limited

Public operations

Since this analysis is based on redis, only src/queue/connector/redis.php needs to be examined.
01: The magic method __callStatic in src/Queue.php is called first
02: __callStatic calls buildConnector
03: buildConnector first loads the configuration file; if there is none, tasks are executed synchronously
04: A connection is created according to the configuration file, and the configuration is passed in
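
The selection step with its synchronous fallback can be sketched as follows. This is a minimal Go illustration, not the library's code: the Connector interface, the two connector types, and the config keys connector and host are assumptions made for the example.

```go
package main

import (
    "fmt"
    "strings"
)

// Connector is a hypothetical stand-in for a queue driver.
type Connector interface {
    Push(job string) string
}

// syncConnector runs the job right away instead of queueing it.
type syncConnector struct{}

func (syncConnector) Push(job string) string {
    return "sync: ran " + job + " immediately"
}

// redisConnector would hand the job to redis; here it only reports what it would do.
type redisConnector struct{ host string }

func (r redisConnector) Push(job string) string {
    return "redis: queued " + job + " on " + r.host
}

// buildConnector picks a driver from the configuration and falls back to
// synchronous execution when no connector is configured.
func buildConnector(config map[string]string) Connector {
    switch strings.ToLower(config["connector"]) {
    case "redis":
        return redisConnector{host: config["host"]}
    default:
        return syncConnector{}
    }
}

func main() {
    fmt.Println(buildConnector(nil).Push("Test"))
    fmt.Println(buildConnector(map[string]string{"connector": "Redis", "host": "127.0.0.1"}).Push("Test"))
}
```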

Operations in the constructor of the redis.php class:
01: Check whether the redis extension is installed
02: Merge configuration
03: Detect whether the redis extension or pRedis is being used
04: Create a connection object

Publishing process

Publishing parameters

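Parameter name  Default value  Description                       Methods that can be used
$job            None           The class that performs the task  push,later
$data           empty          Task data                         push,later
$queue          default        Task name                         push,later
$delay          null           Delay time                        later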

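Immediate execution

    push($job, $data, $queue)
    Queue::push(Test::class, ['id' => 1], 'test');

After some internal processing, an array is built, serialized, and rPush-ed to redis under the key queues:queueName.
Array structure:

[
    'job' => $job, // class that executes the task
    'data' => $data, // task data
    'id'=>'xxxxx' // task id
]

The payload is written to redis and the queue id is returned.
The intermediate processing is too long to cover here.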

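Delayed publishing

    later($delay, $job, $data, $queue)
    Queue::later(100, Test::class, ['id' => 1], 'test');

Much the same as above.
After some internal processing, an array is built, serialized, and zAdd-ed to redis under the key queues:queueName:delayed, with the current timestamp + $delay as the score.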
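
The role of the score is easier to see with an in-memory stand-in for the zSet. The Go sketch below is illustrative only: delayedSet and its methods are hypothetical, and the rule that a task is due once its score is not later than the current time is an assumption about how a worker would read the set.

```go
package main

import (
    "fmt"
    "sort"
    "time"
)

// delayedTask is one zSet member: a payload with its score.
type delayedTask struct {
    payload string
    score   int64 // Unix timestamp at which the task becomes due
}

// delayedSet is an in-memory stand-in for queues:queueName:delayed.
type delayedSet []delayedTask

// later mirrors the zAdd call: score = current timestamp + delay (seconds).
func (s *delayedSet) later(now int64, delay int, payload string) {
    *s = append(*s, delayedTask{payload: payload, score: now + int64(delay)})
    sort.Slice(*s, func(i, j int) bool { return (*s)[i].score < (*s)[j].score })
}

// due returns the payloads whose score is not later than now (assumed rule).
func (s delayedSet) due(now int64) []string {
    var out []string
    for _, t := range s {
        if t.score <= now {
            out = append(out, t.payload)
        }
    }
    return out
}

func main() {
    now := time.Now().Unix()

    var set delayedSet
    set.later(now, 100, "task-a")
    set.later(now, 10, "task-b")

    fmt.Println("due now:", set.due(now))
    fmt.Println("due after 10s:", set.due(now+10))
    fmt.Println("due after 100s:", set.due(now+100))
}
```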

Execution process

There are two execution modes, work and listen; their differences are described above. The code logic is too long to cover here and is left for a later article.
Finally, a note on using the tags:

    // daemon start
    'worker_daemon_start' => [
        \app\index\behavior\WorkerDaemonStart::class
    ],
    // memory exceeded
    'worker_memory_exceeded' => [
        \app\index\behavior\WorkerMemoryExceeded::class
    ],
    // restart daemon
    'worker_queue_restart' => [
        \app\index\behavior\WorkerQueueRestart::class
    ],
    // before the task starts execution
    'worker_before_process' => [
        \app\index\behavior\WorkerBeforeProcess::class
    ],
    // delayed task execution
    'worker_before_sleep' => [
        \app\index\behavior\WorkerBeforeSleep::class
    ],
    // task execution failed
    'queue_failed' => [
        \app\index\behavior\QueueFailed::class
    ]


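    public function run(Output $output)
    {
        $output->write('<info>Task execution failed</info>', true);
    }

Running php think queue:work --queue test --daemon in the console
prints the following in order:

Daemon started
Delayed task execution

Failure handling: if a task fails or its number of attempts reaches the maximum,
queue_failed is triggered.

Write the failure logic in the app\index\behavior@run method, for example sending an email notification or writing to a log.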
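
How a tag leads to a failure handler can be sketched in Go. This is a minimal model under stated assumptions: the Hooks type, the process function, and errBoom are hypothetical, firing worker_before_process before every attempt is a guess, and treating a tries value below 1 as a single attempt is a simplification for the sketch.

```go
package main

import (
    "errors"
    "fmt"
)

// errBoom is a sample failure used by the demo below.
var errBoom = errors.New("boom")

// Hooks maps a tag name to its listeners; a stand-in for the behavior configuration.
type Hooks map[string][]func(job string)

// fire calls every listener registered for tag.
func (h Hooks) fire(tag, job string) {
    for _, fn := range h[tag] {
        fn(job)
    }
}

// process runs a job up to maxTries times and fires queue_failed once the
// attempts are used up. It reports whether the job eventually succeeded.
func process(h Hooks, job string, maxTries int, run func(attempt int) error) bool {
    if maxTries < 1 {
        maxTries = 1 // simplification: always allow one attempt
    }
    for attempt := 1; attempt <= maxTries; attempt++ {
        h.fire("worker_before_process", job)
        if err := run(attempt); err == nil {
            return true
        }
    }
    h.fire("queue_failed", job)
    return false
}

func main() {
    hooks := Hooks{
        "queue_failed": {func(job string) { fmt.Println("notify: job failed:", job) }},
    }
    ok := process(hooks, "app\\index\\jobs\\Test", 3, func(attempt int) error {
        fmt.Println("attempt", attempt)
        return errBoom
    })
    fmt.Println("succeeded:", ok)
}
```

The queue_failed listener is the place for the email notification or log entry mentioned above.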

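Finally, let's look at how to push to a tp project's message queue from another framework or project, for example when the two projects are separate and the other one does not use tp5.

Pushing tasks from other projects

PHP version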

<?php

class Index
{
    private $redis = null;

    public function __construct()
    {
        $this->redis = new Redis();
        $this->redis->connect('127.0.0.1', 6379);
        $this->redis->select(10);
    }

    public function push($job, $data, $queue)
    {
        $payload = $this->createPayload($job, $data);
        $this->redis->rPush('queues:' . $queue, $payload);
    }

    public function later($delay, $job, $data, $queue)
    {
        $payload = $this->createPayload($job, $data);
        $this->redis->zAdd('queues:' . $queue . ':delayed', time() + $delay, $payload);
    }

    private function createPayload($job, $data)
    {
        $payload = $this->setMeta(json_encode(['job' => $job, 'data' => $data]), 'id', $this->random(32));
        return $this->setMeta($payload, 'attempts', 1);
    }

    private function setMeta($payload, $key, $value)
    {
        $payload = json_decode($payload, true);
        $payload[$key] = $value;
        $payload = json_encode($payload);

        if (JSON_ERROR_NONE !== json_last_error()) {
            throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg());
        }

        return $payload;
    }

    private function random(int $length = 16): string
    {
        $str = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ';
        $randomString = '';
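        for ($i = 0; $i < $length; $i++) {
            // pick one random character from $str
            $randomString .= $str[mt_rand(0, strlen($str) - 1)];
        }

        return $randomString;
    }
}

(new Index())->later(10, 'app\index\jobs\Test', ['id' => 1], 'test');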

Go version

package main

import (
    "encoding/json"
    "github.com/gomodule/redigo/redis"
    "math/rand"
    "time"
)

type Payload struct {
    Id       string      `json:"id"`
    Job      string      `json:"job"`
    Data     interface{} `json:"data"`
    Attempts int         `json:"attempts"`
}

var RedisClient *redis.Pool

func init() {
    RedisClient = &redis.Pool{
        MaxIdle:     20,
        MaxActive:   500,
        IdleTimeout: time.Second * 100,
        Dial: func() (conn redis.Conn, e error) {
            c, err := redis.Dial("tcp", "127.0.0.1:6379")

            if err != nil {
                return nil, err
            }

            _, _ = c.Do("SELECT", 10)

            return c, nil
        },
    }

}

func main() {

    var data = make(map[string]interface{})
    data["id"] = "1"

    later(10, "app\\index\\jobs\\Test", data, "test")
}

func push(job string, data interface{}, queue string) error {
    conn := RedisClient.Get()
    defer conn.Close() // return the connection to the pool

    _, err := conn.Do("RPUSH", "queues:"+queue, createPayload(job, data))
    return err
}

func later(delay int, job string, data interface{}, queue string) error {
    conn := RedisClient.Get()
    defer conn.Close() // return the connection to the pool

    // the score is the Unix timestamp at which the task becomes due
    score := time.Now().Add(time.Duration(delay) * time.Second).Unix()
    _, err := conn.Do("ZADD", "queues:"+queue+":delayed", score, createPayload(job, data))
    return err
}

// createPayload builds the payload in the required format
func createPayload(job string, data interface{}) (payload string) {
    payload1 := &Payload{Job: job, Data: data, Id: random(32), Attempts: 1}

    jsonStr, _ := json.Marshal(payload1)

    return string(jsonStr)
}

// random creates a random string of length n
func random(n int) string {

    var str = []rune("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

    b := make([]rune, n)
    for i := range b {
        b[i] = str[rand.Intn(len(str))]
    }
    return string(b)
}



source:segmentfault.com