Queue

Introduction

{tip} Laravel now provides Horizon, a beautiful dashboard and configuration system for your Redis queues. Check out the full Horizon documentation for more information.

Laravel queues provide a unified API across a variety of queue backends, such as Beanstalkd, Amazon SQS, Redis, or even a relational database. Queues let you defer time-consuming tasks, such as sending emails, until a later time, which greatly shortens the time your application needs to respond to web requests.

The queue configuration file is stored at config/queue.php. In this file you will find connection configurations for each queue driver: database, Beanstalkd, Amazon SQS, Redis, and a synchronous driver that executes tasks immediately (for local use). A null queue driver is also included, which discards queued tasks.

Connection Vs. Queue

Before starting to use Laravel queues, it is important to understand the difference between "connections" and "queues". In your config/queue.php configuration file, there is a connections configuration option. This option defines a particular connection to a backend service such as Amazon SQS, Beanstalkd, or Redis. However, any given connection may have multiple "queues", which can be thought of as different stacks or piles of queued tasks.

Note that each connection configuration example in the queue configuration file contains a queue attribute. This is the default queue that tasks are dispatched to when they are sent to that connection. In other words, if you do not explicitly define a queue when dispatching a task, it will be placed on the queue defined by the queue attribute in the connection configuration:

// This task is dispatched to the default queue...
Job::dispatch();

// This task is sent to the "emails" queue...
Job::dispatch()->onQueue('emails');
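
To make the default-queue behavior concrete, the fragment below sketches where the queue attribute sits inside a connection definition. The connection name and values mirror the Redis examples used elsewhere in this document and are illustrative assumptions, not required settings.

```php
<?php

// config/queue.php (fragment). All values here are illustrative assumptions.
return [
    'connections' => [
        'redis' => [
            'driver' => 'redis',
            'connection' => 'default',
            // Tasks sent to this connection without onQueue() land on this queue.
            'queue' => 'default',
            'retry_after' => 90,
        ],
    ],
];
```

With a definition like this, Job::dispatch() on the redis connection would use the default queue, while onQueue('emails') would override it for that one dispatch.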

Some applications may never need to push tasks onto multiple queues and can simply use a single queue. Pushing tasks to multiple queues is still very useful, however, because the Laravel queue processor lets you specify the order in which queues are processed, so you can prioritize or segment how different tasks are handled. For example, if you push tasks to the high queue, you can run a queue processor that gives those tasks higher priority:

php artisan queue:work --queue=high,default

Necessary settings for the driver

Database

In order to use the database queue driver, you need a database table to hold the tasks. Run the queue:table Artisan command to generate a migration that creates this table. Once the migration has been created, you can migrate your database using the migrate command:

php artisan queue:table
php artisan migrate

Redis

In order to use the redis queue driver, you need to configure a Redis database connection in the config/database.php configuration file.

Redis Cluster

If your Redis queue connection uses a Redis Cluster, your queue names must contain a key hash tag. This ensures that all of the Redis keys for a given queue are placed in the same hash slot:

'redis' => [ 
   'driver' => 'redis',    
   'connection' => 'default',    
   'queue' => '{default}',    
   'retry_after' => 90,
],

Blocking

When using the Redis queue, you can use the block_for configuration option to specify how long the driver should wait for a task to become available before iterating through the processor loop and polling the Redis database again.

Adjusting this value based on your queue load can be more efficient than continually polling the Redis database for new tasks. For example, you could set this value to 5 to indicate that the driver should block for 5 seconds while waiting for a task to become available:

'redis' => [
    'driver' => 'redis',    
    'connection' => 'default',    
    'queue' => 'default',    
    'retry_after' => 90,    
    'block_for' => 5,
  ],

Dependent extension packages for other queue drivers

Before using the queue drivers listed below, you must install the following dependencies:

  • Amazon SQS: aws/aws-sdk-php ~3.0
  • Beanstalkd: pda/pheanstalk ~4.0
  • Redis: predis/predis ~1.0

Create Task

Generate task classes

In your application, the task classes of the queue are placed in the app/Jobs directory by default. If this directory does not exist, it will be created automatically when you run the make:job Artisan command. You can use the following Artisan command to generate a new queue task:

php artisan make:job ProcessPodcast

The generated class implements the Illuminate\Contracts\Queue\ShouldQueue interface, which tells Laravel that the task should be pushed onto the queue instead of being executed synchronously.

Task class structure

The structure of a task class is very simple. It normally contains only a handle method, which the queue calls when the task is processed. Let's look at an example task class. In this example, assume that we manage a podcast publishing service and need to process the uploaded podcast files before they are published:

<?php

namespace App\Jobs;

use App\AudioProcessor;
use App\Podcast;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    protected $podcast;

    /**
     * Create a new task instance.
     *
     * @param  Podcast  $podcast
     * @return void
     */
    public function __construct(Podcast $podcast)
    {
        $this->podcast = $podcast;
    }

    /**
     * Execute the task.
     *
     * @param  AudioProcessor  $processor
     * @return void
     */
    public function handle(AudioProcessor $processor)
    {
        // Process uploaded podcast...
    }
}

Note that in this example, we pass an Eloquent model directly into the constructor of the task class. Because the task class uses the SerializesModels trait, Eloquent models are gracefully serialized and unserialized when the task is processed. If your queued task accepts an Eloquent model in its constructor, only the identifier for the model is serialized onto the queue. When the task is actually handled, the queue system automatically retrieves the full model instance from the database. This is completely transparent to your application and avoids the problems that can arise from serializing full Eloquent model instances.

The handle method is called when the task is processed by the queue. Note that we can type-hint dependencies on the handle method of the task, and the Laravel service container will automatically inject them.

If you want full control over how the container injects dependencies into the handle method, you can use the container's bindMethod method. The bindMethod method accepts a callback that receives the task and the container. Within the callback, you are free to invoke the handle method however you wish. Typically, you should call bindMethod from a service provider:

use App\Jobs\ProcessPodcast;
$this->app->bindMethod(ProcessPodcast::class.'@handle', function ($job, $app) {
    return $job->handle($app->make(AudioProcessor::class));
});

{note} Binary data, such as raw image contents, should be passed through the base64_encode function before being passed to a queued task. Otherwise, the task may not serialize properly to JSON when it is placed on the queue.
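
The note above can be illustrated with plain PHP, outside of Laravel. This is a minimal sketch: the byte string is a hypothetical stand-in for image contents, and the array is only an assumed placeholder for a task payload, not Laravel's actual payload format.

```php
<?php

// Hypothetical raw bytes standing in for image contents (not valid UTF-8).
$raw = "\xFF\xD8\xFF\xE0binary";

// json_encode() cannot represent invalid UTF-8, so it returns false here.
$direct = json_encode(['image' => $raw]);

// Base64 output is plain ASCII, so it survives a JSON round trip intact.
$encoded = json_encode(['image' => base64_encode($raw)]);
$restored = base64_decode(json_decode($encoded, true)['image']);

var_dump($direct === false);  // bool(true)
var_dump($restored === $raw); // bool(true)
```

In a task class, the same idea would mean passing base64_encode($contents) to the constructor and calling base64_decode inside handle.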

Distributing tasks

Once you have written your task class, you can dispatch it using the dispatch method on the task itself. The arguments passed to the dispatch method are given to the task's constructor:

<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // Create podcast...

        ProcessPodcast::dispatch($podcast);
    }
}

Delayed Dispatch

If you want to delay the execution of a queued task, you can use the delay method when dispatching it. For example, let's specify that a task should not be available for processing until 10 minutes after it has been dispatched:

<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // Create podcast...

        ProcessPodcast::dispatch($podcast)
            ->delay(now()->addMinutes(10));
    }
}

{note} The Amazon SQS queue service has a maximum delay time of 15 minutes.

Synchronous dispatching

If you want to execute a task immediately (synchronously), you can use the dispatchNow method. When using this method, the task is not queued and runs immediately within the current process:

<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // Create podcast...

        ProcessPodcast::dispatchNow($podcast);
    }
}

Workchain

Work chains allow you to define a list of queued tasks that are run in sequence. If one task in the sequence fails, the remaining tasks are not run. To execute a work chain, use the withChain method on any dispatchable task:

ProcessPodcast::withChain([ 
   new OptimizePodcast,    
   new ReleasePodcast
 ])->dispatch();

{note} Deleting a task with the $this->delete() method does not prevent the remaining tasks in the work chain from being processed. The chain stops executing only when a task in the chain fails.

Workchain Connections & Queues

If you want to define the default connection and queue used for the tasks in a work chain, you can use the allOnConnection and allOnQueue methods. These methods specify the connection and queue that should be used unless a queued task is explicitly assigned a different connection or queue:

ProcessPodcast::withChain([
    new OptimizePodcast,    
    new ReleasePodcast
 ])->dispatch()->allOnConnection('redis')->allOnQueue('podcasts');

Custom connection & queue

Distribute tasks to specified queues

By pushing tasks to different queues, you can "categorize" your queued tasks and even prioritize how many queue processors you assign to each queue. Keep in mind that this does not push tasks to different queue "connections" as defined in your queue configuration file, but only to specific queues within a single connection. To specify the queue, use the onQueue method when dispatching the task:

<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // Create podcast...

        ProcessPodcast::dispatch($podcast)->onQueue('processing');
    }
}

Distribute tasks to the specified connection

If you are working with multiple queue connections, you can specify which connection a task should be pushed to. To specify the connection, use the onConnection method when dispatching the task:

<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // Create podcast...

        ProcessPodcast::dispatch($podcast)->onConnection('sqs');
    }
}

Of course, you can chain the onConnection and onQueue methods to specify both the connection and the queue for a task:

ProcessPodcast::dispatch($podcast)       
       ->onConnection('sqs')              
       ->onQueue('processing');

Specify the maximum number of task attempts/timeout value

Maximum number of attempts

One way to specify the maximum number of times a task may be attempted is the --tries option on the Artisan command line:

php artisan queue:work --tries=3

For more granular control, you can define the maximum number of attempts on the task class itself. If the maximum number of attempts is defined on the task class, it takes precedence over the value provided on the command line:

<?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * The maximum number of times the task may be attempted.
     *
     * @var int
     */
    public $tries = 5;
}

Time-based attempts

As an alternative to defining how many times a task may be attempted before it fails, you can define a time at which the task should time out. This allows the task to be attempted any number of times within the given time frame. To define the time at which a task should time out, add a retryUntil method to your task class:

/**
 * Determine the time at which the task should time out.
 *
 * @return \DateTime
 */
public function retryUntil()
{
    return now()->addSeconds(5);
}

{tip} You can also define a retryUntil method on your queued event listeners.

Timeout

{note} The timeout feature is optimized for PHP 7.1+ and the pcntl PHP extension.

Similarly, the maximum number of seconds a task may run can be specified with the --timeout option on the Artisan command line:

php artisan queue:work --timeout=30

However, you may also want to define the maximum number of seconds a task may run on the task class itself. If the timeout is specified on the task class, it takes precedence over the timeout given on the command line:

<?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * The number of seconds the task may run before timing out.
     *
     * @var int
     */
    public $timeout = 120;
}

Frequency Limit

{note} This feature requires that your application can interact with a Redis server.

If your application interacts with Redis, you can throttle your queued tasks by time or by concurrency. This is helpful when your queued tasks interact with APIs that are also rate limited.

For example, using the throttle method, you can limit a task of a given type to only execute 10 times every 60 seconds. If the lock is not acquired, you should generally put the task back into the queue so that it can be retried later.

Redis::throttle('key')->allow(10)->every(60)->then(function () {
    // Task logic...
}, function () {
    // Could not obtain lock...

    return $this->release(10);
});

{tip} In the example above, key can be any string that uniquely identifies the type of task you want to rate limit. For example, you could build the key from the class name of the task and the IDs of the Eloquent models it operates on.

{note} Releasing a throttled task back onto the queue still increments the task's total number of attempts.

Alternatively, you can specify the maximum number of tasks that can be executed simultaneously. This can be useful when a task in a queue is modifying a resource that can only be modified by one task at a time. For example, using the funnel method, you can restrict a task of a given type to only execute on one processor at a time:

Redis::funnel('key')->limit(1)->then(function () {
    // Task logic...
}, function () {
    // Could not obtain lock...

    return $this->release(10);
});

{tip} When using rate limiting, the number of attempts a task needs before it runs successfully can be hard to determine. Therefore, it is useful to combine rate limiting with time-based attempts.

Error handling

If an exception is thrown while a task is being processed, the task is automatically released back onto the queue so that it can be attempted again. The task continues to be released until it has been attempted the maximum number of times allowed by your application. The maximum number of attempts is defined by the --tries option of the queue:work Artisan command, or on the task class itself. More information on running the queue processor can be found below.

Queue closure

Instead of dispatching a task class to the queue, you can also dispatch a closure. This is great for quick, simple tasks that need to be executed outside of the current request cycle:

$podcast = App\Podcast::find(1);
dispatch(function () use ($podcast) { 
   $podcast->publish();
});

When a closure is dispatched to a queue, the code contents of the closure are cryptographically signed so they cannot be modified in transit.

Running Queue Processor

Laravel includes a queue processor to execute tasks pushed to the queue. You can run the processor using the queue:work Artisan command. Note that once the queue:work command starts execution, it will continue to run until it is manually stopped or the terminal is closed.

php artisan queue:work

{tip} To keep the queue:work process running permanently in the background, you should use a process monitor such as Supervisor to ensure that the queue processor does not stop running.

Remember, queue processors are long-lived processes that keep the booted application state in memory. As a result, they will not notice changes to your code after they have been started. So, during your deployment process, be sure to restart your queue processors.

Specify connection & queue

You can also specify which queue connection the queue processor should use. The connection name passed to the work command should match one of the connections defined in your config/queue.php configuration file:

php artisan queue:work redis

You can customize your queue processor even further by having it process only particular queues for a given connection. For example, if all of your emails are processed on the emails queue of the redis connection, you can start a processor that handles only that queue with the following command:

php artisan queue:work redis --queue=emails

Execute a single task

The --once option instructs the queue processor to process only a single task from the queue:

php artisan queue:work --once

Process all queued tasks and then exit

The --stop-when-empty option instructs the queue processor to process all tasks and then exit gracefully. This option is useful when running Laravel queues in a Docker container if you wish to shut down the container after the queue is empty:

php artisan queue:work --stop-when-empty

Resource Notes

Long-running queue processors do not "reboot" the framework before processing each task. Therefore, you should free any heavy resources after each task completes. For example, if you are doing image manipulation with the GD library, you should free the memory with imagedestroy when you are done.

Queue Priority

Sometimes you may want to prioritize how your queues are processed. For example, in config/queue.php you can set the default queue for your redis connection to low. However, occasionally you may want to push a task to a high priority queue, like so:

dispatch((new Job)->onQueue('high'));

To start a processor that verifies that all of the high queue tasks are processed before continuing to any tasks on the low queue, pass a comma-separated list of queue names to the work command:

php artisan queue:work --queue=high,low
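
The ordering described above can be sketched in plain PHP. This is a simplified in-memory model of the idea, not Laravel's actual implementation: the nextTask function, the $queues array, and the task names are all hypothetical.

```php
<?php

/**
 * Return the next task, checking queues in the order given by --queue.
 * $queues maps a queue name to its pending tasks (an in-memory stand-in).
 */
function nextTask(array &$queues, string $queueOption): ?string
{
    foreach (explode(',', $queueOption) as $name) {
        if (!empty($queues[$name])) {
            return array_shift($queues[$name]);
        }
    }

    return null; // every listed queue is empty
}

$queues = [
    'low' => ['resize-avatar', 'send-newsletter'],
    'high' => ['charge-card'],
];

// Each iteration starts again from the first queue in the list.
$order = [];
while (($task = nextTask($queues, 'high,low')) !== null) {
    $order[] = $task;
}

print_r($order); // charge-card, resize-avatar, send-newsletter
```

Because every lookup starts from the first name in the list, a task that arrives on high while low is being drained would be picked up before the next low task.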

Queue Processor & Deployment

Because queue processors are long-lived processes, they will not pick up changes to your code without being restarted. Therefore, the simplest way to deploy an application that uses queue processors is to restart them during your deployment process. You can gracefully restart all queue processors by issuing the queue:restart command:

php artisan queue:restart

This command instructs all queue processors to gracefully exit after they finish their current task, so that no existing tasks are lost. Since the queue processors exit when the queue:restart command is executed, you should run a process manager such as Supervisor to automatically restart them.

{tip} The queue uses cache to store restart signals, so you should make sure to configure the cache driver before using this feature.

Task Expiration & Timeout

Task Expiration

In your config/queue.php configuration file, each queue connection defines a retry_after option. This option specifies how many seconds the queue connection should wait before retrying a task that is being processed. For example, if the value of retry_after is set to 90, the task is released back onto the queue if it has been processing for 90 seconds without being deleted. Typically, you should set retry_after to the maximum number of seconds your tasks should reasonably take to complete.

{note} The only queue connection that does not have a retry_after value is Amazon SQS. SQS retries the task based on the Default Visibility Timeout, which is managed in the AWS console.

Processor timeout

The queue:work Artisan command provides a --timeout option. The --timeout option specifies how long Laravel's queue master process waits before killing a child process that is executing a task. Sometimes a child process can become "frozen" for various reasons, such as an external HTTP call that is not responding. The --timeout option terminates processes that have been frozen for longer than the specified time:

php artisan queue:work --timeout=60

The retry_after configuration option and the --timeout command-line option are different, but they work together to ensure that tasks are not lost and that each task is only successfully processed once.

{note} The value of --timeout should be at least a few seconds shorter than the value you configured in retry_after. This ensures that the processor will always abort before a task can be retried. If your --timeout value is longer than the value of retry_after, your task may be executed twice.
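
The relationship in the note above can be expressed as a small sanity check, for example in a deployment script. This is only a sketch: timeoutIsSafe is a hypothetical helper, and the 5-second margin is an assumed reading of "a few seconds", not a Laravel setting.

```php
<?php

/**
 * Hypothetical sanity check: is the --timeout value safely below retry_after?
 * $margin is an assumed safety gap in seconds, not a Laravel option.
 */
function timeoutIsSafe(int $timeout, int $retryAfter, int $margin = 5): bool
{
    return $timeout + $margin <= $retryAfter;
}

var_dump(timeoutIsSafe(60, 90));  // bool(true): the process is killed before the retry
var_dump(timeoutIsSafe(120, 90)); // bool(false): the task could be processed twice
```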

Queue process sleep time

When tasks are available on the queue, the processor keeps processing them with no delay in between. However, the sleep option determines how long (in seconds) the processor will "sleep" if there are no new tasks available. While sleeping, the processor does not process any new tasks; they are processed after the processor wakes up again.

php artisan queue:work --sleep=3

Supervisor configuration

Installing Supervisor

Supervisor is a process monitor for the Linux operating system that automatically restarts your queue:work process if it fails. To install Supervisor on Ubuntu, you can use the following command:

sudo apt-get install supervisor

{tip} If configuring Supervisor yourself sounds overwhelming, consider using Laravel Forge, which automatically installs and configures Supervisor for your Laravel projects.

Configuring Supervisor

The Supervisor configuration file is usually located in the /etc/supervisor/conf.d directory. In this directory, you can create any number of configuration files to control how supervisor will monitor your process. For example, create a laravel-worker.conf file to start and monitor a queue:work process:

[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3
autostart=true
autorestart=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log

In this example, the numprocs directive instructs Supervisor to run 8 queue:work processes and monitor all of them, automatically restarting them if they fail. You should change the queue:work sqs portion of the command directive to reflect your desired queue connection.

Start Supervisor

After the configuration file is created, you can use the following command to update the Supervisor configuration and start the process:

sudo supervisorctl reread

sudo supervisorctl update

sudo supervisorctl start laravel-worker:*

For more information about Supervisor, consult the Supervisor documentation.

Handling failed tasks

Sometimes your queued tasks will fail. Don't worry, things don't always go as planned! Laravel includes a convenient way to specify the maximum number of times a task should be attempted. After a task has exceeded this number of attempts, it is inserted into the failed_jobs database table. To create a migration for the failed_jobs table, you can use the queue:failed-table command:

php artisan queue:failed-table

php artisan migrate

Then, when you run the queue processor, use the --tries switch on the queue:work command to specify the maximum number of times a task should be attempted. If you do not specify a value for the --tries option, tasks will be attempted indefinitely:

php artisan queue:work redis --tries=3
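
The effect of --tries can be modeled with a short plain-PHP sketch. It is a simplified illustration of the retry-then-fail flow described above, not Laravel's processor code: runWithTries and both example closures are hypothetical.

```php
<?php

/**
 * Run $task until it succeeds or $tries attempts have been used.
 * Returns the final status and the number of attempts that were made.
 */
function runWithTries(callable $task, int $tries): array
{
    for ($attempt = 1; $attempt <= $tries; $attempt++) {
        try {
            $task($attempt);

            return ['status' => 'processed', 'attempts' => $attempt];
        } catch (Exception $e) {
            // A real queue processor would release the task back onto the queue here.
        }
    }

    // At this point Laravel would record the task in the failed_jobs table.
    return ['status' => 'failed', 'attempts' => $tries];
}

// Hypothetical task that only succeeds on its third attempt.
$flaky = function (int $attempt) {
    if ($attempt < 3) {
        throw new RuntimeException('temporary error');
    }
};

// Hypothetical task that never succeeds.
$broken = function () {
    throw new RuntimeException('permanent error');
};

$flakyResult = runWithTries($flaky, 3);
$brokenResult = runWithTries($broken, 3);

print_r($flakyResult);  // status: processed, attempts: 3
print_r($brokenResult); // status: failed, attempts: 3
```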

Cleaning up after failed tasks

You can define a failed method directly on your task class, allowing you to perform task-specific cleanup when a failure occurs. This is the perfect place to send an alert to your users or revert any actions performed by the task. The Exception that caused the task to fail is passed to the failed method:

<?php

namespace App\Jobs;

use App\AudioProcessor;
use App\Podcast;
use Exception;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ProcessPodcast implements ShouldQueue
{
    use InteractsWithQueue, Queueable, SerializesModels;

    protected $podcast;

    /**
     * Create a new task instance.
     *
     * @param  Podcast  $podcast
     * @return void
     */
    public function __construct(Podcast $podcast)
    {
        $this->podcast = $podcast;
    }

    /**
     * Execute the task.
     *
     * @param  AudioProcessor  $processor
     * @return void
     */
    public function handle(AudioProcessor $processor)
    {
        // Upload podcast...
    }

    /**
     * Handle a task failure.
     *
     * @param  Exception  $exception
     * @return void
     */
    public function failed(Exception $exception)
    {
        // Send the user a notification of the failure, etc...
    }
}

Task failure events

If you want to register an event that is called when a task fails, you can use the Queue::failing method. This event is a great opportunity to notify your team via email or Slack. For example, we can attach a callback to this event from the AppServiceProvider that is included with Laravel:

<?php

namespace App\Providers;

use Illuminate\Queue\Events\JobFailed;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;

class AppServiceProvider extends ServiceProvider
{
    /**
     * Bootstrap any application services.
     *
     * @return void
     */
    public function boot()
    {
        Queue::failing(function (JobFailed $event) {
            // $event->connectionName
            // $event->job
            // $event->exception
        });
    }

    /**
     * Register the service provider.
     *
     * @return void
     */
    public function register()
    {
        //
    }
}

Retrying failed tasks

To view all tasks that have been placed in the failed_jobs data table, you can use the Artisan command queue:failed:

php artisan queue:failed

The queue:failed command lists the task ID, connection, queue, and failure time. The task ID can be used to retry the failed task. For example, to retry a failed task that has an ID of 5, use the following command:

php artisan queue:retry 5

To retry all of your failed tasks, execute the queue:retry command and pass all as the ID:

php artisan queue:retry all

If you want to delete a failed task, use the queue:forget command:

php artisan queue:forget 5

To delete all of your failed tasks, use the queue:flush command:

php artisan queue:flush

Ignore missing models

When injecting an Eloquent model into a task, the model is automatically serialized before being placed on the queue and restored when the task is processed. However, if the model has been deleted while the task was waiting to be processed, the task may fail with a ModelNotFoundException.

For convenience, you can choose to set the task's deleteWhenMissingModels property to true to automatically delete tasks with missing models.

/**
 * Delete the task if its models no longer exist.
 *
 * @var bool
 */
public $deleteWhenMissingModels = true;

Task events

Using the before and after methods on the Queue facade, you can specify callbacks to be executed before or after a queued task is processed. These callbacks are a great opportunity to perform additional logging or increment statistics. Typically, you should call these methods from a service provider. For example, we can use the AppServiceProvider that is included with Laravel:

<?php

namespace App\Providers;

use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;

class AppServiceProvider extends ServiceProvider
{
    /**
     * Bootstrap any application services.
     *
     * @return void
     */
    public function boot()
    {
        Queue::before(function (JobProcessing $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });

        Queue::after(function (JobProcessed $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });
    }

    /**
     * Register the service provider.
     *
     * @return void
     */
    public function register()
    {
        //
    }
}

Using the looping method on the Queue facade, you can specify callbacks that execute before the processor attempts to fetch a task from the queue. For example, you might register a closure that rolls back any transactions left open by a previously failed task:

Queue::looping(function () {
    while (DB::transactionLevel() > 0) {
        DB::rollBack();
    }
});

This article first appeared on the LearnKu.com website.