Queue
- Introduction
- Create task
- Distributing tasks
- Queue closure
- Running Queue Processor
- Queue Priority
- Queue Processor & Deployment
- Task expiration & timeout
- Supervisor configuration
- Handling failed tasks
- Task events
{tip} Laravel now provides Horizon, a beautiful dashboard and configuration system for your Redis queues. Check out the full Horizon documentation for more information.

Laravel queues provide a unified API across a variety of queue backends, such as Beanstalk, Amazon SQS, Redis, or even a relational database. Queues let you defer time-consuming tasks, such as sending an email, which greatly shortens web request and response times.

The queue configuration file is stored in config/queue.php. This file contains the connection configuration for each queue driver: database, Beanstalkd, Amazon SQS, Redis, and a synchronous driver that executes tasks immediately (for local use). It also contains a null queue driver, which discards queued tasks.
Connection Vs. Queue
Before you start using Laravel queues, it is important to understand the difference between a "connection" and a "queue". Your config/queue.php configuration file contains a connections option, which defines the individual connections to backend services such as Amazon SQS, Beanstalk, or Redis. Any given connection may have multiple "queues", which can be thought of as different stacks or piles of queued tasks.

Note that each connection example in the queue configuration file contains a queue attribute. This is the default queue that tasks are dispatched to when they are sent to that connection. In other words, if you dispatch a task without explicitly defining a queue, it is placed on the queue named by the queue attribute of the connection configuration:

// This task is sent to the default queue...
Job::dispatch();

// This task is sent to the "emails" queue...
Job::dispatch()->onQueue('emails');

Some applications never need to push tasks onto multiple queues and are happy with a single simple queue. However, pushing tasks to multiple queues is useful when you want to prioritize or segment how tasks are processed, because the Laravel queue processor lets you specify the order in which queues are processed. For example, if you push tasks to the high queue, you can run a processor that gives those tasks priority:
php artisan queue:work --queue=high,default
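The effect of a comma-separated queue list can be illustrated with a small, framework-free sketch. It assumes the processor checks the listed queues in order and takes the first available task; the nextTask helper and the in-memory arrays are hypothetical stand-ins, not Laravel's implementation.

```php
<?php

// Hypothetical helper: return the next task from the first non-empty queue,
// checking queues in the given priority order.
function nextTask(array &$queues, array $priority): ?string
{
    foreach ($priority as $name) {
        if (!empty($queues[$name])) {
            return array_shift($queues[$name]);
        }
    }

    return null; // every listed queue is empty
}

// In-memory stand-in for the queues of one connection.
$queues = [
    'default' => ['resize-avatar', 'rebuild-index'],
    'high'    => ['send-password-reset'],
];

// Same order as --queue=high,default
$order = explode(',', 'high,default');

$processed = [];
while (($task = nextTask($queues, $order)) !== null) {
    $processed[] = $task;
}

print_r($processed);
```

In this sketch the task on the high queue is taken first, even though the tasks on default were queued earlier.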
Necessary settings for the driver
Database
To use the database queue driver, you need a database table to hold the tasks. Run the queue:table Artisan command to generate a migration that creates this table. Once the migration has been created, run the migrate command to migrate your database:

php artisan queue:table

php artisan migrate
Redis
To use the redis queue driver, you need to configure a Redis database connection in the config/database.php configuration file.
Redis Cluster
If your Redis queue connection uses a Redis Cluster, your queue names must contain a key hash tag. This ensures that all of the Redis keys for a given queue are placed in the same hash slot:

'redis' => [
    'driver' => 'redis',
    'connection' => 'default',
    'queue' => '{default}',
    'retry_after' => 90,
],
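To see why the braces matter, here is a minimal sketch of the Redis Cluster hash tag rule: when a key contains a non-empty {...} section, only the text inside the first such section is hashed to choose the slot. The hashTag function is a hypothetical helper, and the key names below are illustrative assumptions rather than a guaranteed list of the keys the driver creates.

```php
<?php

// Hypothetical helper: return the part of a key that Redis Cluster hashes.
// If the key has a non-empty {...} section, only that section is used;
// otherwise the whole key is hashed.
function hashTag(string $key): string
{
    $open = strpos($key, '{');

    if ($open !== false) {
        $close = strpos($key, '}', $open + 1);

        if ($close !== false && $close > $open + 1) {
            return substr($key, $open + 1, $close - $open - 1);
        }
    }

    return $key;
}

// Illustrative key names for one queue (assumed for this example).
$keys = [
    'queues:{default}',
    'queues:{default}:delayed',
    'queues:{default}:reserved',
];

$tags = array_unique(array_map('hashTag', $keys));

print_r($tags);
```

Because every key shares the same tag, the cluster maps them all to one slot. Without the braces, each key would be hashed in full and could land on a different node.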
Blocking
When using the Redis queue, you can use the block_for configuration option to specify how long the driver should wait for a task to become available before iterating through the processor loop and polling the Redis database again.

Adjusting this value based on your queue load can be more efficient than continually polling the Redis database for new tasks. For example, you could set the value to 5 to indicate that the driver should block for 5 seconds while waiting for a task to become available:

'redis' => [
    'driver' => 'redis',
    'connection' => 'default',
    'queue' => 'default',
    'retry_after' => 90,
    'block_for' => 5,
],
Dependent extension packages for other queue drivers
The following dependencies must be installed before using the listed queue drivers:
- Amazon SQS: aws/aws-sdk-php ~3.0
- Beanstalkd: pda/pheanstalk ~4.0
- Redis: predis/predis ~1.0
Create Task
Generate task classes
By default, all of the queueable task classes for your application are stored in the app/Jobs directory. If this directory does not exist, it is created when you run the make:job Artisan command. You can generate a new queued task with the following Artisan command:

php artisan make:job ProcessPodcast

The generated class implements the Illuminate\Contracts\Queue\ShouldQueue interface, which tells Laravel that the task should be pushed onto the queue instead of being run synchronously.
Task class structure
Task classes are very simple. Normally they contain only a handle method, which is called when the task is processed by the queue. Let's look at an example task class. In this example, assume that we manage a podcast publishing service and need to process the uploaded podcast files before they are published:

<?php

namespace App\Jobs;

use App\Podcast;
use App\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;

class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    protected $podcast;

    /**
     * Create a new task instance.
     *
     * @param  Podcast  $podcast
     * @return void
     */
    public function __construct(Podcast $podcast)
    {
        $this->podcast = $podcast;
    }

    /**
     * Execute the task.
     *
     * @param  AudioProcessor  $processor
     * @return void
     */
    public function handle(AudioProcessor $processor)
    {
        // Process uploaded podcast...
    }
}
Note that in this example we pass an Eloquent model directly into the constructor of the task class. Because the task uses the SerializesModels trait, Eloquent models are gracefully serialized and unserialized when the task is processed. If your queued task accepts an Eloquent model in its constructor, only the identifier for the model is serialized onto the queue. When the task is actually handled, the queue system automatically retrieves the full model instance from the database. This is completely transparent to your application and avoids the problems that can arise from serializing full Eloquent model instances.

The handle method is called when the task is processed by the queue. Because Laravel's service container resolves the method's dependencies, you can type-hint them on the handle method and they will be injected automatically.
If you want full control over how the container injects dependencies into the handle method, you can use the container's bindMethod method. The bindMethod method accepts a callback that receives the task and the container. Within the callback, you are free to invoke the handle method however you wish. Typically, you should call bindMethod from a service provider:

use App\AudioProcessor;
use App\Jobs\ProcessPodcast;

$this->app->bindMethod(ProcessPodcast::class.'@handle', function ($job, $app) {
    return $job->handle($app->make(AudioProcessor::class));
});
{note} Binary data, such as raw image contents, should be passed through the base64_encode function before being passed to a queued task. Otherwise, the task may not serialize properly to JSON when it is placed on the queue.
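The reason for this is easy to reproduce in plain PHP. The sketch below uses a short, made-up byte string as a stand-in for image contents; it shows that json_encode rejects raw bytes that are not valid UTF-8, while the base64-encoded form survives a round trip.

```php
<?php

// Stand-in for raw image bytes; these bytes are not valid UTF-8.
$binary = "\x89PNG\r\n\x1a\n\xff\xd8\xfe";

// Encoding the raw bytes fails: json_encode() returns false.
$raw = json_encode(['image' => $binary]);
var_dump($raw);

// Base64 turns the bytes into plain ASCII, which JSON can carry.
$payload = json_encode(['image' => base64_encode($binary)]);

// On the receiving side, decode the JSON and then the base64 text.
$decoded  = json_decode($payload, true);
$restored = base64_decode($decoded['image'], true);

var_dump($restored === $binary);
```

Inside a task, the matching base64_decode call would typically go at the start of the handle method.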
Distributing tasks
Once you have written your task class, you can dispatch it using the dispatch method on the task itself. The arguments passed to the dispatch method are given to the task's constructor:

<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // Create podcast...

        ProcessPodcast::dispatch($podcast);
    }
}
Delayed Dispatch
If you want to delay the execution of a queued task, use the delay method when dispatching it. For example, let's specify that a task should not be available for processing until 10 minutes after it has been dispatched:

<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // Create podcast...

        ProcessPodcast::dispatch($podcast)
            ->delay(now()->addMinutes(10));
    }
}

{note} The Amazon SQS queue service has a maximum delay time of 15 minutes.
Synchronous Dispatch
If you want to execute a task immediately (synchronously), use the dispatchNow method. When using this method, the task is not queued and runs immediately in the current process:

<?php

namespace App\Http\Controllers;

use Illuminate\Http\Request;
use App\Jobs\ProcessPodcast;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // Create podcast...

        ProcessPodcast::dispatchNow($podcast);
    }
}
Workchain
Work chains allow you to specify a list of queued tasks that should be run in sequence. If one task in the sequence fails, the remaining tasks are not run. To execute a work chain, use the withChain method on any dispatchable task:

ProcessPodcast::withChain([
    new OptimizePodcast,
    new ReleasePodcast
])->dispatch();

{note} Deleting a task with the $this->delete() method does not prevent the chained tasks from being processed. The chain only stops executing if a task in the chain fails.
Workchain Connections & Queues
If you want to specify the default connection and queue used for the chained tasks, use the allOnConnection and allOnQueue methods. These methods specify the queue connection and queue name that are used unless a queued task is explicitly assigned a different connection or queue:

ProcessPodcast::withChain([
    new OptimizePodcast,
    new ReleasePodcast
])->dispatch()->allOnConnection('redis')->allOnQueue('podcasts');
Custom connection & queue
Distribute tasks to specified queues
By pushing tasks to different queues, you can "categorize" your queued tasks and even prioritize how many processors you assign to the various queues. Keep in mind that this does not push tasks to different queue "connections" as defined in your queue configuration file, but only to specific queues within a single connection. To specify the queue, use the onQueue method when dispatching the task:

<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // Create podcast...

        ProcessPodcast::dispatch($podcast)->onQueue('processing');
    }
}
Distribute tasks to the specified connection
If you are working with multiple queue connections, you can specify which connection to push a task to. To specify the connection, use the onConnection method when dispatching the task:

<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // Create podcast...

        ProcessPodcast::dispatch($podcast)->onConnection('sqs');
    }
}

Of course, you can chain the onConnection and onQueue methods to specify both the connection and the queue for a task:

ProcessPodcast::dispatch($podcast)
    ->onConnection('sqs')
    ->onQueue('processing');
Specify the maximum number of task attempts/timeout value
Maximum number of attempts
One way to specify the maximum number of times a task may be attempted is the --tries option on the Artisan command line:

php artisan queue:work --tries=3

However, you may want finer control by defining the maximum number of attempts on the task class itself. If the maximum number of attempts is defined on the task class, it takes precedence over the value provided on the command line:

<?php

namespace App\Jobs;

use Illuminate\Contracts\Queue\ShouldQueue;

class ProcessPodcast implements ShouldQueue
{
    /**
     * The maximum number of times the task may be attempted.
     *
     * @var int
     */
    public $tries = 5;
}
Time-Based Attempts
As an alternative to defining how many times a task may be attempted before it fails, you can define a time at which the task should time out. This allows the task to be attempted any number of times within the given time frame. To define the time at which a task should time out, add a retryUntil method to your task class:

/**
 * Determine the time at which the task should time out.
 *
 * @return \DateTime
 */
public function retryUntil()
{
    return now()->addSeconds(5);
}
{tip} You can also define a retryUntil method on your queued event listeners.

Timeout
{note} The timeout feature is optimized for PHP 7.1+ and the pcntl PHP extension.

Similarly, the maximum number of seconds a task may run can be specified with the --timeout option on the Artisan command line:
php artisan queue:work --timeout=30
However, you may also want to define the timeout on the task class itself. If the timeout is specified on the task class, it takes precedence over the value given on the command line:

<?php

namespace App\Jobs;

use Illuminate\Contracts\Queue\ShouldQueue;

class ProcessPodcast implements ShouldQueue
{
    /**
     * The maximum number of seconds the task may run (timeout).
     *
     * @var int
     */
    public $timeout = 120;
}
Frequency Limit
{note} This feature requires that your application can interact with a Redis server.

If your application uses Redis, you can throttle your queued tasks by time or by concurrency. This is helpful when your queued tasks interact with APIs that are themselves rate limited.

For example, using the throttle method, you can limit a given type of task to run only 10 times every 60 seconds. If the lock cannot be obtained, you should typically release the task back onto the queue so that it can be retried later:

Redis::throttle('key')->allow(10)->every(60)->then(function () {
    // Task logic...
}, function () {
    // Could not obtain lock...

    return $this->release(10);
});
{tip} In the above example, key can be any string that uniquely identifies the type of task you want to rate limit. For example, you might build the key from the task class name and the IDs of the Eloquent models it operates on.

{note} Releasing a throttled task back onto the queue still increments the task's total number of attempts.
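As a rough mental model for "10 times every 60 seconds", the following framework-free sketch counts attempts inside a fixed time window. It is only an in-memory illustration under that assumption: the FixedWindowLimiter class is hypothetical, keeps its state in a single PHP process, and is not how the Redis-backed limiter is implemented.

```php
<?php

// Hypothetical in-memory limiter: allow at most $max attempts per window
// of $seconds. Timestamps are passed in so the behavior is easy to follow.
final class FixedWindowLimiter
{
    private $max;
    private $seconds;
    private $windowStart = null;
    private $count = 0;

    public function __construct(int $max, int $seconds)
    {
        $this->max = $max;
        $this->seconds = $seconds;
    }

    public function attempt(int $now): bool
    {
        // Start a fresh window on first use or once the current one has elapsed.
        if ($this->windowStart === null || $now - $this->windowStart >= $this->seconds) {
            $this->windowStart = $now;
            $this->count = 0;
        }

        if ($this->count >= $this->max) {
            return false; // over the limit: the caller would release the task
        }

        $this->count++;

        return true;
    }
}

$limiter = new FixedWindowLimiter(10, 60);

// Twelve attempts within the same window, one second apart.
$results = [];
for ($i = 0; $i < 12; $i++) {
    $results[] = $limiter->attempt(1000 + $i);
}

// One more attempt after the 60-second window has passed.
$afterWindow = $limiter->attempt(1060);

var_dump($results, $afterWindow);
```

A rejected attempt in this sketch corresponds to the second callback in the throttle example, where the task is released back onto the queue.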
Alternatively, you can specify the maximum number of processors that may process a given task at the same time. This is useful when a queued task modifies a resource that should only be modified by one task at a time. For example, using the funnel method, you can limit a given type of task to be processed by only one processor at a time:

Redis::funnel('key')->limit(1)->then(function () {
    // Task logic...
}, function () {
    // Could not obtain lock...

    return $this->release(10);
});

{tip} When using rate limiting, the number of attempts a task needs in order to run successfully can be hard to determine. It is therefore useful to combine rate limiting with time-based attempts.
Error handling
If an exception is thrown while a task is being processed, the task is automatically released back onto the queue so that it can be attempted again. The task continues to be released until it has been attempted the maximum number of times allowed by your application. The maximum number of attempts is defined by the --tries option of the queue:work Artisan command, or on the task class itself. More information on running the queue processor can be found below.
Queue closure
Instead of dispatching a task class to the queue, you can also dispatch a closure:

$podcast = App\Podcast::find(1);

dispatch(function () use ($podcast) {
    $podcast->publish();
});

When a closure is dispatched to a queue, the code contents of the closure are cryptographically signed so they cannot be modified in transit.
Running Queue Processor
Laravel includes a queue processor that executes tasks as they are pushed onto the queue. You can run the processor using the queue:work Artisan command. Note that once the queue:work command has started, it continues to run until it is manually stopped or the terminal is closed:
php artisan queue:work
{tip} To keep the queue:work process running permanently in the background, you should use a process manager such as Supervisor to ensure that the queue processor does not stop running.

Remember, queue processors are long-lived processes that store the booted application state in memory. As a result, they do not notice changes to your code base after they have been started. So, during your deployment process, be sure to restart your queue processors.
Specify connection & queue
You can also specify which queue connection the queue processor should use. The connection name passed to the work command should match one of the connections defined in your config/queue.php configuration file:
php artisan queue:work redis
You can customize your queue processor even further by processing only particular queues on a given connection. For example, if all of your emails are processed on the emails queue of the redis connection, you can start a processor that only handles that queue with the following command:
php artisan queue:work redis --queue=emails
Execute a single task
The --once option instructs the queue processor to process only a single task from the queue:
php artisan queue:work --once
Process all queued tasks and then exit
The --stop-when-empty option instructs the queue processor to process all tasks and then exit gracefully. This option is useful when running Laravel queues in a Docker container if you want to shut down the container once the queue is empty:
php artisan queue:work --stop-when-empty
Resource Considerations
Queue processors that run as daemons do not "restart" the framework before processing each task. Therefore, you should free any heavy resources after each task completes. For example, if you are manipulating images with the GD library, you should free the memory with imagedestroy when you are done.
Queue Priority
Sometimes you may want to prioritize how your queues are processed. For example, in config/queue.php you can set the default queue for your redis connection to low. However, occasionally you may want to push a task to a high priority queue, as follows:
dispatch((new Job)->onQueue('high'));
To start a processor that makes sure all of the tasks on the high queue are processed before it continues to any tasks on the low queue, pass a comma-separated list of queue names to the work command:

php artisan queue:work --queue=high,low

Queue Processor & Deployment
Because queue processors are long-lived processes, they do not pick up changes to your code unless they are restarted. Therefore, the simplest way to deploy an application that uses queue processors is to restart them during the deployment process. You can gracefully restart all queue processors by running the queue:restart command:

php artisan queue:restart

This command instructs all queue processors to gracefully "die" after they finish their current task, so that no existing tasks are lost. Since the queue processors terminate when the queue:restart command is executed, you should run a process manager such as Supervisor to restart them automatically.
{tip} The queue uses cache to store restart signals, so you should make sure to configure the cache driver before using this feature.
Task expiration & timeout
In your config/queue.php configuration file, each queue connection defines a retry_after option. This option specifies how many seconds the queue connection should wait before retrying a task that is being processed. For example, if the value of retry_after is set to 90, the task is released back onto the queue if it has been processing for 90 seconds without being deleted. In general, you should set retry_after to the maximum number of seconds your tasks could reasonably take to complete.

{note} The only queue connection that does not have a retry_after value is Amazon SQS. SQS retries the task based on the Default Visibility Timeout, which is managed in the AWS console.
Processor timeout
The queue:work Artisan command exposes a --timeout option. The --timeout option specifies how long Laravel's queue master process waits before killing a child process that is handling a task. Sometimes a child process can "freeze" for various reasons, such as an external HTTP call that is not responding. The --timeout option removes frozen processes that have exceeded the specified time limit:
php artisan queue:work --timeout=60
The retry_after configuration option and the --timeout command-line option are different, but they work together to ensure that tasks are not lost and that each task is successfully processed only once.

{note} The --timeout value should always be at least several seconds shorter than your retry_after configuration value. This ensures that a processor handling a given task is always killed before the task is retried. If your --timeout value is longer than your retry_after value, your tasks may be processed twice.
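This rule is simple enough to check before deploying. The sketch below is a hypothetical helper, not part of Laravel; the 5-second default margin is an assumption standing in for "several seconds" and should be adjusted to your own tasks.

```php
<?php

// Hypothetical check: the processor timeout must end at least $margin
// seconds before the connection's retry_after value.
function timeoutIsSafe(int $timeout, int $retryAfter, int $margin = 5): bool
{
    return $timeout + $margin <= $retryAfter;
}

// Values as used elsewhere in this document: --timeout=60 with retry_after=90.
var_dump(timeoutIsSafe(60, 90));

// A timeout longer than retry_after risks the task being processed twice.
var_dump(timeoutIsSafe(120, 90));
```

A check like this could run in a deployment script, fed with the retry_after value from config/queue.php and the --timeout value from your process manager configuration.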
Queue process sleep time
When tasks are available on the queue, the processor keeps processing them with no delay in between. The sleep option, however, determines how long (in seconds) the processor "sleeps" when no new tasks are available. While sleeping, the processor does not process any new tasks; they are processed after the processor wakes up again.
php artisan queue:work --sleep=3
Supervisor configuration
Installing Supervisor
Supervisor is a process monitor for the Linux operating system, and it automatically restarts your queue:work process if it fails. To install Supervisor on Ubuntu, you can use the following command:
sudo apt-get install supervisor
{tip} If you find it difficult to configure Supervisor yourself, consider using Laravel Forge, which automatically installs and configures Supervisor for your Laravel projects.
Configuring Supervisor
Supervisor configuration files are typically stored in the /etc/supervisor/conf.d directory. Within this directory, you can create any number of configuration files that tell Supervisor how your processes should be monitored. For example, let's create a laravel-worker.conf file that starts and monitors a queue:work process:

[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3
autostart=true
autorestart=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log

In this example, the numprocs directive tells Supervisor to run 8 queue:work processes and monitor all of them, automatically restarting them if they fail. You should change the queue:work sqs portion of the command directive to reflect your desired queue connection.
Start Supervisor
After the configuration file is created, you can use the following command to update the Supervisor configuration and start the process:
sudo supervisorctl reread

sudo supervisorctl update

sudo supervisorctl start laravel-worker:*

For more information about Supervisor, see the Supervisor documentation.
Handling failed tasks
Sometimes your queued tasks will fail. Don't worry, things don't always go as planned. Laravel includes a convenient way to specify the maximum number of times a task should be attempted. After a task has exceeded this number of attempts, it is inserted into the failed_jobs database table. To create a migration for the failed_jobs table, use the queue:failed-table command:

php artisan queue:failed-table

php artisan migrate

Then, when running your queue processor, use the --tries switch on the queue:work command to specify the maximum number of times a task should be attempted. If you do not specify a value for the --tries option, tasks are attempted indefinitely:
php artisan queue:work redis --tries=3
Cleaning up after task failure
You can define a failed method directly on your task class, which lets you perform task-specific cleanup when a failure occurs. This is the perfect place to send an alert to your users or revert any actions performed by the task. The Exception that caused the task to fail is passed to the failed method:

<?php

namespace App\Jobs;

use Exception;
use App\Podcast;
use App\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;

class ProcessPodcast implements ShouldQueue
{
    use InteractsWithQueue, Queueable, SerializesModels;

    protected $podcast;

    /**
     * Create a new task instance.
     *
     * @param  Podcast  $podcast
     * @return void
     */
    public function __construct(Podcast $podcast)
    {
        $this->podcast = $podcast;
    }

    /**
     * Execute the task.
     *
     * @param  AudioProcessor  $processor
     * @return void
     */
    public function handle(AudioProcessor $processor)
    {
        // Process uploaded podcast...
    }

    /**
     * Handle a task failure.
     *
     * @param  Exception  $exception
     * @return void
     */
    public function failed(Exception $exception)
    {
        // Send the user a notification of the failure, etc...
    }
}
Task failure events
If you want to register a callback that is invoked when a task fails, use the Queue::failing method. This event is a great opportunity to notify your team via email or Slack. For example, we can attach a callback to this event from the AppServiceProvider that is included with Laravel:

<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Support\ServiceProvider;

class AppServiceProvider extends ServiceProvider
{
    /**
     * Bootstrap any application services.
     *
     * @return void
     */
    public function boot()
    {
        Queue::failing(function (JobFailed $event) {
            // $event->connectionName
            // $event->job
            // $event->exception
        });
    }

    /**
     * Register the service provider.
     *
     * @return void
     */
    public function register()
    {
        //
    }
}
Retrying failed tasks
To view all of the failed tasks that have been inserted into the failed_jobs database table, use the queue:failed Artisan command:
php artisan queue:failed
The queue:failed command lists the task ID, queue, and failure time. The task ID can be used to retry the failed task. For example, to retry a failed task that has an ID of 5, use the following command:
php artisan queue:retry 5
To retry all of your failed tasks, run the queue:retry command and pass all as the ID:
php artisan queue:retry all
If you want to delete a failed task, use the queue:forget
command:
php artisan queue:forget 5
To delete all of your failed tasks, use the queue:flush command:
php artisan queue:flush
Ignore missing models
When you inject an Eloquent model into a task, the model is automatically serialized before being placed on the queue and restored when the task is processed. However, if the model has been deleted while the task was waiting to be processed, the task may fail with a ModelNotFoundException.

For convenience, you can choose to automatically delete tasks with missing models by setting the task's deleteWhenMissingModels property to true:

/**
 * Delete the task if its models no longer exist.
 *
 * @var bool
 */
public $deleteWhenMissingModels = true;
Task events
Using the before and after methods on the Queue facade, you can specify callbacks that run before or after a queued task is processed. These callbacks are a great opportunity to perform additional logging or increment statistics. Typically, you should call these methods from a service provider. For example, we can use Laravel's AppServiceProvider:

<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;

class AppServiceProvider extends ServiceProvider
{
    /**
     * Bootstrap any application services.
     *
     * @return void
     */
    public function boot()
    {
        Queue::before(function (JobProcessing $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });

        Queue::after(function (JobProcessed $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });
    }

    /**
     * Register the service provider.
     *
     * @return void
     */
    public function register()
    {
        //
    }
}

Using the looping method on the Queue facade, you can specify callbacks that run before the processor attempts to fetch a task from a queue. For example, you might register a closure that rolls back any transactions left open by a previously failed task:

Queue::looping(function () {
    while (DB::transactionLevel() > 0) {
        DB::rollBack();
    }
});