think-queueの分析(redis周りの分析)

藏色散人
リリース: 2021-07-27 08:57:16
転載
4076 人が閲覧しました

まえがき

分析する前に、必ずメッセージ キューの実装を理解してください

tp5 メッセージ キューはデータベース Redis と Topthink の TP 公式実装に基づいています
この章は分析用に Redis に基づいています

ストレージ キー:

#key Type Description
queues:queueName list 実行するタスク
think:queue:restart string 再起動キューのタイムスタンプ
queues:queueName:layed zSet 遅延タスク
##queues :queueName:reserved zSet 実行に失敗しました。再実行を待機しています

コマンドを実行します

work listen との違いについては以下で説明します
#コマンド#説明#php think queue:workphp think queue:listenphp think queue:restart php think queue:subscribe ##動作タグ
Listening queue
#リッスンキュー
キューを再起動
まだありません。予約されている可能性があります。公式には他のアイデアもありますが、まだ実装されていません

タグ

##説明worker_daemon_startデーモン開始タスクの実行開始前タスク実行の遅延#queue_failedコマンドパラメータ使用可能なモード
worker_memory_exceeded メモリ超過
worker_queue_restart デーモンの再起動
#worker_before_process
worker_before_sleep
##タスク実行失敗
パラメータ デフォルト値
説明

キュー名前実行するタスクnullworkデーモンプロセスとしてタスクを実行します0仕事、聞く失敗後の再実行時間nullwork失敗後の再実行時間128M作業、リスニング最大メモリの制限3仕事、聞く## Tryswork,listenタスク失敗後の最大試行回数

モードの違い

1: 異なる実行原理
work: シングルプロセス処理モード;
デーモンパラメータなしワークプロセスは、次のメッセージを処理した後、現在のプロセスを直接終了します。新しいメッセージがない場合、一定期間スリープしてから終了します。
デーモン パラメータを使用すると、ワーク プロセスは、メモリがパラメータ設定を超えるまでループでキュー内のメッセージを処理してから、プロセスを終了します。 。新しいメッセージがない場合、各ループで一定期間スリープします。

listen: 親プロセスと子プロセスの処理モード。
は親プロセスに単一の実行モードを作成します。作業子プロセスは、作業子プロセスを通じてキュー内の次のメッセージを処理します。作業子プロセスが終了すると、
が配置されている親プロセスは、子プロセスの終了シグナルをリッスンして、新しいメッセージを作成します。 1. Work 子プロセスが一度に実行される;

2: 終了タイミングが異なります
work: 上記を参照
listen: 親プロセスは、次の 2 つの場合を除き、通常の状況で常に実行されます。
01: 作成された work 子プロセスの実行時間が listen コマンド ラインの --timeout パラメータ設定を超えます。この時点で、work 子プロセスは強制的に終了され、listen が実行されている親プロセスは見つかった場合も、ProcessTimeoutException 例外がスローされて終了します。;

開発者は、この例外をキャッチして親プロセスの実行を続行することを選択できます;
02: 親プロセスに何らかの理由でメモリ リークがある場合、親プロセス自体が占有するメモリがコマンド ラインを超えた場合 --memory パラメーターが設定されている場合、親プロセスと子プロセスの両方が終了します。通常の状況では、リッスン プロセス自体が占有するメモリは安定しています。

3: パフォーマンスの違い
work: スクリプト内でループし、コマンド実行の初期段階でフレームワーク スクリプトが読み込まれます;

listen: 処理後に新しいプロセスを開始しますタスク ワーク プロセスはこの時点でフレームワーク スクリプトをリロードします;

したがって、ワーク モードのパフォーマンスはリッスン モードよりも高くなります。
注: コードが更新された場合、作業モードで php think queue:restart コマンドを手動で実行して、変更を有効にするためにキューを再起動する必要があります。リッスン モードでは、他の操作を行わずに自動的に有効になります。オペレーション。

4: タイムアウト制御機能
work: 本質的には、プロセス自体の実行時間を制御することも、実行中のタスクの実行時間を制限することもできません。
listen: 作業の子を制限できますプロセスのタイムアウト時間;

タイムアウト パラメータを使用して、作業サブプロセスの実行が許可される最大時間を制限できます。この制限時間を超えて終了しなかったサブプロセスは、強制終了;
期限切れと時間の違い

期限切れは構成ファイルで設定され、タスクの有効期限を指します。この時間はグローバルであり、すべての作業プロセスに影響します。
タイムアウトはコマンド ライン パラメータに設定され、作業サブプロセスのタイムアウト時間を参照します。今回は、現在実行されている listen コマンドに対してのみ有効です。タイムアウトの対象は作業サブプロセスです。

5 : さまざまな使用シナリオ

作業に適用できるシナリオは次のとおりです:
01: 多数のタスク
02: 高いパフォーマンス要件
03: 短いタスク実行時間
04: タスクの実行時間が短い無限ループ、sleep()、exit()、die()、およびコンシューマ クラスでバグを引き起こしやすいその他のロジック

listen 該当するシナリオは次のとおりです:

01: タスクの数が小さい
02: タスクの実行時間が長い
03: タスクの実行時間を厳密に制限する必要がある

公開操作

解析を行っているためRedis に基づいているため、src/queue/connector/redis.php
01: 最初に src/Queue.php を呼び出します。__callStatic
のマジック メソッドを分析するだけです。 02: buildConnector は __callStatic メソッドで呼び出されます
03: 構成ファイルは最初に buildConnector で読み込まれます。ない場合は同期的に実行されます
04: 構成ファイルに基づいて接続を作成しますそして、構成を渡します

redis.php クラスのコンストラクターでの操作:
01: redis 拡張機能がインストールされているかどうかを確認します
02: 構成をマージします
03: 検出Redis 拡張機能か pRedis かどうか
04: 接続オブジェクトの作成

公開プロセス

公開パラメーター

null 作業、リッスン
##daemon
遅延
force
メモリ
睡眠 ##タスクがないときの待ち時間
0
##$dataemptyタスクデータpush,later$queuedefaultタスク名push,later #$遅延##

立即执行

    push($job, $data, $queue)
    Queue::push(Test::class, ['id' => 1], 'test');
ログイン後にコピー

一顿骚操作后返回一个数组 并且序列化后 rPush到redis中 key为 queue:queueName
数组结构:

[
    'job' => $job, // 要执行任务的类
    'data' => $data, // 任务数据
    'id'=>'xxxxx' //任务id
]
ログイン後にコピー

写入 redis并且返回队列id
至于中间的那顿骚操作太长了就没写

延迟发布

    later($delay, $job, $data, $queue)
    Queue::later(100, Test::class, ['id' => 1], 'test');
ログイン後にコピー

跟上面的差不多
一顿骚操作后返回一个数组 并且序列化后 zAdd 到redis中 key为 queue:queueName:delayed score为当前的时间戳+$delay

执行过程

执行过程有work模式和listen模式 两种 区别上面已经说了 代码逻辑由于太多等下回分解;
最后讲一下标签的使用

    //守护进程开启
    'worker_daemon_start' => [
        \app\index\behavior\WorkerDaemonStart::class
    ],
    //内存超出
    'worker_memory_exceeded' => [
        \app\index\behavior\WorkerMemoryExceeded::class
    ],
    //重启守护进程
    'worker_queue_restart' => [
        \app\index\behavior\WorkerQueueRestart::class
    ],
    //任务开始执行之前
    'worker_before_process' => [
        \app\index\behavior\WorkerBeforeProcess::class
    ],
    //任务延迟执行
    'worker_before_sleep' => [
        \app\index\behavior\WorkerBeforeSleep::class
    ],
    //任务执行失败
    'queue_failed' => [
        \app\index\behavior\QueueFailed::class
    ]
ログイン後にコピー

think-queueの分析(redis周りの分析)

public function run(Output $output)
    {
        $output->write('<info>任务执行失败</info>', true);
    }
ログイン後にコピー

控制台执行 php think queue:work --queue test --daemon
会在控制台一次输出

守护进程开启
任务延迟执行
ログイン後にコピー

失败的处理 如果有任务执行失败或者执行次数达到最大值
会触发 queue_failed

app\index\behavior@run方法里面写失败的逻辑 比如邮件通知 写入日志等

最后我们来说一下如何在其他框架或者项目中给tp的项目推送消息队列,例如两个项目是分开的 另一个使用的却不是tp5的框架

在其他项目中推任务

php版本

<?php class Index
{
    private $redis = null;

    public function __construct()
    {
        $this->redis = new Redis();
        $this->redis->connect('127.0.0.1', 6379);
        $this->redis->select(10);
    }

    public function push($job, $data, $queue)
    {
        $payload = $this->createPayload($job, $data);
        $this->redis->rPush('queues:' . $queue, $payload);
    }

    public function later($delay, $job, $data, $queue)
    {
        $payload = $this->createPayload($job, $data);
        $this->redis->zAdd('queues:' . $queue . ':delayed', time() + $delay, $payload);
    }

    private function createPayload($job, $data)
    {
        $payload = $this->setMeta(json_encode(['job' => $job, 'data' => $data]), 'id', $this->random(32));
        return $this->setMeta($payload, 'attempts', 1);
    }

    private function setMeta($payload, $key, $value)
    {
        $payload = json_decode($payload, true);
        $payload[$key] = $value;
        $payload = json_encode($payload);

        if (JSON_ERROR_NONE !== json_last_error()) {
            throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg());
        }

        return $payload;
    }

    private function random(int $length = 16): string
    {
        $str = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ';
        $randomString = '';
        for ($i = 0; $i later(10, 'app\index\jobs\Test', ['id' => 1], 'test');
ログイン後にコピー

go版本

package main

import (
    "encoding/json"
    "github.com/garyburd/redigo/redis"
    "math/rand"
    "time"
)

type Payload struct {
    Id       string      `json:"id"`
    Job      string      `json:"job"`
    Data     interface{} `json:"data"`
    Attempts int         `json:"attempts"`
}

var RedisClient *redis.Pool

func init() {
    RedisClient = &redis.Pool{
        MaxIdle:     20,
        MaxActive:   500,
        IdleTimeout: time.Second * 100,
        Dial: func() (conn redis.Conn, e error) {
            c, err := redis.Dial("tcp", "127.0.0.1:6379")

            if err != nil {
                return nil, err
            }

            _, _ = c.Do("SELECT", 10)

            return c, nil
        },
    }

}

func main() {

    var data = make(map[string]interface{})
    data["id"] = "1"

    later(10, "app\\index\\jobs\\Test", data, "test")
}

func push(job string, data interface{}, queue string) {
    payload := createPayload(job, data)
    queueName := "queues:" + queue

    _, _ = RedisClient.Get().Do("rPush", queueName, payload)
}

func later(delay int, job string, data interface{}, queue string) {

    m, _ := time.ParseDuration("+1s")
    currentTime := time.Now()
    op := currentTime.Add(time.Duration(time.Duration(delay) * m)).Unix()
    createPayload(job, data)
    payload := createPayload(job, data)
    queueName := "queues:" + queue + ":delayed"

    _, _ = RedisClient.Get().Do("zAdd", queueName, op, payload)
}

// 创建指定格式的数据
func createPayload(job string, data interface{}) (payload string) {
    payload1 := &Payload{Job: job, Data: data, Id: random(32), Attempts: 1}

    jsonStr, _ := json.Marshal(payload1)

    return string(jsonStr)
}

// 创建随机字符串
func random(n int) string {

    var str = []rune("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

    b := make([]rune, n)
    for i := range b {
        b[i] = str[rand.Intn(len(str))]
    }
    return string(b)
}
ログイン後にコピー

更多thinkphp技术知识,请访问thinkphp教程栏目!

パラメーター名 デフォルト値 説明 使用できるメソッド
$job None タスクを実行するクラス push,later
null 遅延時間 後で

以上がthink-queueの分析(redis周りの分析)の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

関連ラベル:
ソース:segmentfault.com
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート