跳至内容

队列

介绍

在构建 Web 应用程序时,您可能会遇到一些任务,例如解析和存储上传的 CSV 文件,这些任务在典型的 Web 请求中执行起来可能耗时过长。幸运的是,Laravel 允许您轻松创建可在后台处理的队列作业。通过将耗时任务移至队列,您的应用程序可以以极快的速度响应 Web 请求,并为客户提供更佳的用户体验。

Laravel 队列为各种不同的队列后端(例如Amazon SQSRedis甚至关系数据库)提供了统一的队列 API。

Laravel 的队列配置选项存储在应用程序的config/queue.php配置文件中。在此文件中,您将找到框架自带的每个队列驱动程序的连接配置,包括数据库、Amazon SQSRedisBeanstalkd驱动程序,以及一个可立即执行作业的同步驱动程序(供本地开发使用)。此外null,还包含一个可丢弃排队作业的队列驱动程序。

Laravel 现已推出 Horizo​​n,一个美观的仪表盘和配置系统,适用于基于 Redis 的队列。查看完整的Horizo​​n 文档了解更多信息。

连接与队列

在开始使用 Laravel 队列之前,务必先了解“连接”和“队列”之间的区别。config/queue.php配置文件中有一个connections配置数组。该数组定义了与后端队列服务(例如 Amazon SQS、Beanstalk 或 Redis)的连接。然而,任何给定的队列连接可能包含多个“队列”,这些队列可以被视为不同的堆栈或队列作业堆。

请注意,配置文件中的每个连接配置示例都queue包含一个属性。这是作业发送到给定连接时将被分派到的默认队列。换句话说,如果您在分派作业时没有明确指定要分派到哪个队列,则该作业将被放置在连接配置的属性queue中定义的队列中:queue

1use App\Jobs\ProcessPodcast;
2 
3// This job is sent to the default connection's default queue...
4ProcessPodcast::dispatch();
5 
6// This job is sent to the default connection's "emails" queue...
7ProcessPodcast::dispatch()->onQueue('emails');

有些应用程序可能不需要将任务推送到多个队列,而是倾向于使用一个简单的队列。然而,将任务推送到多个队列对于希望按优先级或分段处理任务的应用程序尤其有用,因为 Laravel 队列工作器允许你指定按优先级处理哪些队列。例如,如果你将任务推送到一个high队列,你可以运行一个工作器来赋予它们更高的处理优先级:

1php artisan queue:work --queue=high,default

驱动程序注意事项和先决条件

数据库

要使用database队列驱动,你需要一个数据库表来保存任务。通常,这包含在 Laravel 的默认0001_01_01_000002_create_jobs_table.php 数据库迁移中;但是,如果你的应用程序不包含此迁移,你可以使用make:queue-tableArtisan 命令来创建它:

1php artisan make:queue-table
2 
3php artisan migrate

Redis

为了使用redis队列驱动程序,您应该在配置文件中配置 Redis 数据库连接config/database.php

队列驱动程序不支持serializer和Redis 选项compressionredis

Redis 集群

如果您的 Redis 队列连接使用 Redis 集群,则队列名称必须包含键哈希标签。这是必需的,以确保给定队列的所有 Redis 键都放入同一个哈希槽中:

1'redis' => [
2 'driver' => 'redis',
3 'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
4 'queue' => env('REDIS_QUEUE', '{default}'),
5 'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
6 'block_for' => null,
7 'after_commit' => false,
8],

阻塞

使用 Redis 队列时,您可以使用block_for配置选项来指定驱动程序在遍历工作循环并重新轮询 Redis 数据库之前应等待作业可用的时间。

根据队列负载调整此值比持续轮询 Redis 数据库以查找新作业更有效。例如,您可以将该值设置为:5驱动程序应在等待作业可用时阻塞五秒钟:

1'redis' => [
2 'driver' => 'redis',
3 'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
4 'queue' => env('REDIS_QUEUE', 'default'),
5 'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
6 'block_for' => 5,
7 'after_commit' => false,
8],

设置block_for0将导致队列工作器无限期阻塞,直到有任务可用。这也会阻止诸如 之类的信号SIGTERM被处理,直到下一个任务处理完毕。

其他驱动程序先决条件

列出的队列驱动程序需要以下依赖项。这些依赖项可以通过 Composer 包管理器安装:

  • 亚马逊SQS:aws/aws-sdk-php ~3.0
  • Beanstalkd:pda/pheanstalk ~5.0
  • Redis:predis/predis ~2.0或 phpredis PHP 扩展
  • MongoDBmongodb/laravel-mongodb

创造就业机会

生成作业类

默认情况下,应用程序的所有可排队任务都存储在该app/Jobs目录中。如果该app/Jobs目录不存在,则运行make:jobArtisan 命令时会创建该目录:

1php artisan make:job ProcessPodcast

生成的类将实现该Illuminate\Contracts\Queue\ShouldQueue接口,向 Laravel 指示该作业应该被推送到队列中以异步运行。

可以使用存根发布来定制作业存根。

类结构

任务类非常简单,通常只包含一个在handle队列处理任务时调用的方法。首先,我们来看一个示例任务类。在这个例子中,我们假设我们管理一个播客发布服务,需要在发布之前处理已上传的播客文件:

1<?php
2 
3namespace App\Jobs;
4 
5use App\Models\Podcast;
6use App\Services\AudioProcessor;
7use Illuminate\Contracts\Queue\ShouldQueue;
8use Illuminate\Foundation\Queue\Queueable;
9 
10class ProcessPodcast implements ShouldQueue
11{
12 use Queueable;
13 
14 /**
15 * Create a new job instance.
16 */
17 public function __construct(
18 public Podcast $podcast,
19 ) {}
20 
21 /**
22 * Execute the job.
23 */
24 public function handle(AudioProcessor $processor): void
25 {
26 // Process uploaded podcast...
27 }
28}

在此示例中,请注意,我们能够将Eloquent 模型直接传递给队列作业的构造函数。由于Queueable该作业所使用的 trait,Eloquent 模型及其加载的关系将在作业处理时优雅地进行序列化和反序列化。

如果您的队列作业在其构造函数中接受 Eloquent 模型,则只有该模型的标识符会被序列化到队列中。当作业实际被处理时,队列系统将自动从数据库中重新检索完整的模型实例及其已加载的关系。这种模型序列化方法允许将更小的作业负载发送到您的队列驱动程序。

handle方法依赖注入

handle当队列处理完任务后,会调用该方法。请注意,我们可以在handle任务方法上添加类型Prompts依赖项。Laravel服务容器会自动注入这些依赖项。

如果您想完全控制容器如何将依赖项注入到handle方法中,可以使用容器的bindMethod方法。该bindMethod方法接受一个回调函数,该回调函数接收作业和容器。在回调函数中,您可以随意调用该方法。通常,您应该从服务提供者的方法handle中调用此方法bootApp\Providers\AppServiceProvider

1use App\Jobs\ProcessPodcast;
2use App\Services\AudioProcessor;
3use Illuminate\Contracts\Foundation\Application;
4 
5$this->app->bindMethod([ProcessPodcast::class, 'handle'], function (ProcessPodcast $job, Application $app) {
6 return $job->handle($app->make(AudioProcessor::class));
7});

二进制数据(例如原始图像内容)应base64_encode先通过该函数传递,然后再传递给队列作业。否则,作业在放入队列时可能无法正确序列化为 JSON。

队列关系

由于所有已加载的 Eloquent 模型关系在作业排队时也会被序列化,因此序列化的作业字符串有时会变得非常大。此外,当作业反序列化并从数据库重新检索模型关系时,它们将被完整检索。在作业排队过程中,在模型序列化之前应用的任何先前的关系约束在作业反序列化时都不会应用。因此,如果您希望使用给定关系的子集,则应在排队作业中重新约束该关系。

或者,为了防止关系被序列化,你可以withoutRelations在设置属性值时调用模型上的方法。此方法将返回不包含已加载关系的模型实例:

1/**
2 * Create a new job instance.
3 */
4public function __construct(
5 Podcast $podcast,
6) {
7 $this->podcast = $podcast->withoutRelations();
8}

如果您使用 PHP 构造函数属性提升,并希望表明 Eloquent 模型不应该序列化其关系,则可以使用以下WithoutRelations属性:

1use Illuminate\Queue\Attributes\WithoutRelations;
2 
3/**
4 * Create a new job instance.
5 */
6public function __construct(
7 #[WithoutRelations]
8 public Podcast $podcast,
9) {}

如果作业接收的是 Eloquent 模型集合或数组,而不是单个模型,则在反序列化和执行该作业时,该集合中的模型将不会恢复其关系。这是为了防止处理大量模型的作业占用过多资源。

独特的工作

唯一作业需要支持锁 的缓存驱动程序。目前,memcachedredisdynamodbdatabasefilearray缓存驱动程序均支持原子锁。此外,唯一作业约束不适用于批次内的作业。

有时,您可能希望确保在任意时间点队列中只有一个特定任务的实例。您可以通过ShouldBeUnique在任务类中实现以下接口来实现。此接口不需要您在类中定义任何其他方法:

1<?php
2 
3use Illuminate\Contracts\Queue\ShouldQueue;
4use Illuminate\Contracts\Queue\ShouldBeUnique;
5 
6class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
7{
8 // ...
9}

在上面的例子中,UpdateSearchIndex作业是唯一的。因此,如果队列中已有该作业的另一个实例且尚未完成处理,则不会调度该作业。

在某些情况下,您可能需要定义一个特定的“键”来确保作业的唯一性,或者指定一个超时时间,超过该时间后作业将不再保持唯一性。为此,您可以在作业类中定义uniqueId属性uniqueFor或方法:

1<?php
2 
3use App\Models\Product;
4use Illuminate\Contracts\Queue\ShouldQueue;
5use Illuminate\Contracts\Queue\ShouldBeUnique;
6 
7class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
8{
9 /**
10 * The product instance.
11 *
12 * @var \App\Product
13 */
14 public $product;
15 
16 /**
17 * The number of seconds after which the job's unique lock will be released.
18 *
19 * @var int
20 */
21 public $uniqueFor = 3600;
22 
23 /**
24 * Get the unique ID for the job.
25 */
26 public function uniqueId(): string
27 {
28 return $this->product->id;
29 }
30}

在上面的示例中,UpdateSearchIndex作业通过产品 ID 进行唯一分配。因此,任何具有相同产品 ID 的作业调度都将被忽略,直到现有作业处理完毕。此外,如果现有作业在一小时内未处理完毕,则唯一锁将被释放,并将另一个具有相同唯一键的作业调度到队列中。

如果您的应用程序从多个 Web 服务器或容器调度作业,则应确保所有服务器都与同一个中央缓存服务器通信,以便 Laravel 可以准确确定作业是否唯一。

在处理开始前保持作业的唯一性

默认情况下,在作业完成处理或所有重试尝试失败后,唯一作业会被“解锁”。但是,在某些情况下,您可能希望作业在处理之前立即解锁。为此,您的作业应该实现以下ShouldBeUniqueUntilProcessing合约,而不是以下ShouldBeUnique合约:

1<?php
2 
3use App\Models\Product;
4use Illuminate\Contracts\Queue\ShouldQueue;
5use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
6 
7class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing
8{
9 // ...
10}

独特的职业锁

在后台,当一个ShouldBeUnique任务被调度时,Laravel 会尝试使用密钥获取uniqueId。如果未获取到锁,则任务不会被调度。当任务完成处理或所有重试尝试均失败时,锁会被释放。默认情况下,Laravel 会使用默认的缓存驱动程序来获取锁。但是,如果您希望使用其他驱动程序来获取锁,您可以定义一个uniqueVia方法,返响应该使用的缓存驱动程序:

1use Illuminate\Contracts\Cache\Repository;
2use Illuminate\Support\Facades\Cache;
3 
4class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
5{
6 // ...
7 
8 /**
9 * Get the cache driver for the unique job lock.
10 */
11 public function uniqueVia(): Repository
12 {
13 return Cache::driver('redis');
14 }
15}

如果您只需要限制作业的并发处理,请使用WithoutOverlapping作业中间件。

加密作业

Laravel 允许你通过加密来确保任务数据的隐私性和完整性。首先,只需将ShouldBeEncrypted接口添加到任务类即可。将此接口添加到类后,Laravel 会在将任务推送到队列之前自动对其进行加密:

1<?php
2 
3use Illuminate\Contracts\Queue\ShouldBeEncrypted;
4use Illuminate\Contracts\Queue\ShouldQueue;
5 
6class UpdateSearchIndex implements ShouldQueue, ShouldBeEncrypted
7{
8 // ...
9}

作业中间件

作业中间件允许您围绕队列作业的执行包装自定义逻辑,从而减少作业本身的样板代码。例如,考虑以下handle方法,它利用 Laravel 的 Redis 速率限制功能,每五秒只允许处理一个作业:

1use Illuminate\Support\Facades\Redis;
2 
3/**
4 * Execute the job.
5 */
6public function handle(): void
7{
8 Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
9 info('Lock obtained...');
10 
11 // Handle job...
12 }, function () {
13 // Could not obtain lock...
14 
15 return $this->release(5);
16 });
17}

虽然这段代码有效,但handle由于 Redis 速率限制逻辑过于复杂,该方法的实现会变得非常混乱。此外,对于我们想要限制速率的其他作业,也必须复制此速率限制逻辑。

我们可以定义一个任务中间件来处理速率限制,而不是在 handle 方法中设置速率限制。Laravel 没有为任务中间件设置默认位置,因此您可以将任务中间件放置在应用程序的任何位置。在本例中,我们将中间件放置在以下app/Jobs/Middleware目录中:

1<?php
2 
3namespace App\Jobs\Middleware;
4 
5use Closure;
6use Illuminate\Support\Facades\Redis;
7 
8class RateLimited
9{
10 /**
11 * Process the queued job.
12 *
13 * @param \Closure(object): void $next
14 */
15 public function handle(object $job, Closure $next): void
16 {
17 Redis::throttle('key')
18 ->block(0)->allow(1)->every(5)
19 ->then(function () use ($job, $next) {
20 // Lock obtained...
21 
22 $next($job);
23 }, function () use ($job) {
24 // Could not obtain lock...
25 
26 $job->release(5);
27 });
28 }
29}

如您所见,与路由中间件一样,作业中间件接收正在处理的作业以及应调用以继续处理作业的回调。

创建作业中间件后,可以通过从作业middleware方法中返回它们来将它们附加到作业上。此方法在 Artisan 命令搭建的作业中不存在make:job,因此你需要手动将其添加到你的作业类中:

1use App\Jobs\Middleware\RateLimited;
2 
3/**
4 * Get the middleware the job should pass through.
5 *
6 * @return array<int, object>
7 */
8public function middleware(): array
9{
10 return [new RateLimited];
11}

作业中间件还可以分配给可排队的事件监听器、可邮寄的邮件和通知。

速率限制

虽然我们刚刚演示了如何编写自己的速率限制任务中间件,但 Laravel 实际上内置了一个速率限制中间件,你可以用它来限制任务的速率。与路由速率限制器RateLimiter一样,任务速率限制器也是使用Facade 的方法定义的for

例如,您可能希望允许用户每小时备份一次数据,而对高级用户则不设此限制。为此,您可以在的方法RateLimiter中定义一个bootAppServiceProvider

1use Illuminate\Cache\RateLimiting\Limit;
2use Illuminate\Support\Facades\RateLimiter;
3 
4/**
5 * Bootstrap any application services.
6 */
7public function boot(): void
8{
9 RateLimiter::for('backups', function (object $job) {
10 return $job->user->vipCustomer()
11 ? Limit::none()
12 : Limit::perHour(1)->by($job->user->id);
13 });
14}

在上面的示例中,我们定义了按小时计算的速率限制;但是,您可以使用该perMinute方法轻松地基于分钟定义速率限制。此外,您可以将任何所需的值传递给by速率限制的方法;但是,此值最常用于按客户细分速率限制:

1return Limit::perMinute(50)->by($job->user->id);

定义速率限制后,您可以使用Illuminate\Queue\Middleware\RateLimited中间件将速率限制器附加到您的作业。每当作业超出速率限制时,该中间件就会根据速率限制的持续时间,以适当的延迟将作业释放回队列。

1use Illuminate\Queue\Middleware\RateLimited;
2 
3/**
4 * Get the middleware the job should pass through.
5 *
6 * @return array<int, object>
7 */
8public function middleware(): array
9{
10 return [new RateLimited('backups')];
11}

将速率受限的作业释放回队列仍会增加该作业的 总数attempts。您可能希望相应地调整作业类的tries和属性。或者,您可能希望使用retryUntil 方法来定义作业不再尝试执行的时间。maxExceptions

使用该releaseAfter方法,您还可以指定再次尝试释放作业之前必须经过的秒数:

1/**
2 * Get the middleware the job should pass through.
3 *
4 * @return array<int, object>
5 */
6public function middleware(): array
7{
8 return [(new RateLimited('backups'))->releaseAfter(60)];
9}

如果您不想在速率受限时重试作业,则可以使用该dontRelease方法:

1/**
2 * Get the middleware the job should pass through.
3 *
4 * @return array<int, object>
5 */
6public function middleware(): array
7{
8 return [(new RateLimited('backups'))->dontRelease()];
9}

如果您正在使用 Redis,您可以使用Illuminate\Queue\Middleware\RateLimitedWithRedis中间件,它针对 Redis 进行了微调,并且比基本速率限制中间件更高效。

防止工作重叠

Laravel 包含一个Illuminate\Queue\Middleware\WithoutOverlapping中间件,允许你根据任意键值来防止作业重叠。当队列作业正在修改某个资源,而该资源一次只能由一个作业修改时,此功能非常有用。

例如,假设您有一个用于更新用户信用评分的队列作业,并且您希望防止同一用户 ID 的信用评分更新作业重叠。为此,您可以WithoutOverlapping从作业的middleware方法中返回中间件:

1use Illuminate\Queue\Middleware\WithoutOverlapping;
2 
3/**
4 * Get the middleware the job should pass through.
5 *
6 * @return array<int, object>
7 */
8public function middleware(): array
9{
10 return [new WithoutOverlapping($this->user->id)];
11}

任何重叠的同类型作业都将被释放回队列。您还可以指定在再次尝试执行已释放作业之前必须等待的秒数:

1/**
2 * Get the middleware the job should pass through.
3 *
4 * @return array<int, object>
5 */
6public function middleware(): array
7{
8 return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];
9}

如果您希望立即删除任何重叠的作业,以便它们不会重试,您可以使用该dontRelease方法:

1/**
2 * Get the middleware the job should pass through.
3 *
4 * @return array<int, object>
5 */
6public function middleware(): array
7{
8 return [(new WithoutOverlapping($this->order->id))->dontRelease()];
9}

WithoutOverlapping中间件由 Laravel 的原子锁功能提供支持。有时,你的作业可能会意外失败或超时,导致锁无法释放。因此,你可以使用该方法明确定义锁的过期时间。例如,下面的示例将指示 Laravel在作业开始处理三分钟后expireAfter释放锁:WithoutOverlapping

1/**
2 * Get the middleware the job should pass through.
3 *
4 * @return array<int, object>
5 */
6public function middleware(): array
7{
8 return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];
9}

中间件需要支持WithoutOverlapping的缓存驱动程序。目前,缓存驱动程序支持原子锁。memcachedredisdynamodbdatabasefilearray

跨作业类别共享锁密钥

默认情况下,WithoutOverlapping中间件只会阻止同一类别的作业重叠。因此,即使两个不同的作业类别使用相同的锁定键,它们也不会被阻止重叠。不过,你可以使用以下shared命令指示 Laravel 跨作业类别应用锁定键:

1use Illuminate\Queue\Middleware\WithoutOverlapping;
2 
3class ProviderIsDown
4{
5 // ...
6 
7 public function middleware(): array
8 {
9 return [
10 (new WithoutOverlapping("status:{$this->provider}"))->shared(),
11 ];
12 }
13}
14 
15class ProviderIsUp
16{
17 // ...
18 
19 public function middleware(): array
20 {
21 return [
22 (new WithoutOverlapping("status:{$this->provider}"))->shared(),
23 ];
24 }
25}

限制异常

Laravel 包含一个Illuminate\Queue\Middleware\ThrottlesExceptions中间件,可用于限制异常。一旦作业抛出指定数量的异常,所有后续执行该作业的尝试都将被延迟,直到指定的时间间隔过去。此中间件对于与不稳定的第三方服务交互的作业特别有用。

例如,假设一个队列作业与第三方 API 交互,该 API 开始抛出异常。为了限制异常,您可以ThrottlesExceptions从作业的方法中返回中间件。通常,此中间件应与实现基于时间的尝试次数middleware的作业配对

1use DateTime;
2use Illuminate\Queue\Middleware\ThrottlesExceptions;
3 
4/**
5 * Get the middleware the job should pass through.
6 *
7 * @return array<int, object>
8 */
9public function middleware(): array
10{
11 return [new ThrottlesExceptions(10, 5 * 60)];
12}
13 
14/**
15 * Determine the time at which the job should timeout.
16 */
17public function retryUntil(): DateTime
18{
19 return now()->addMinutes(30);
20}

中间件接受的第一个构造函数参数是作业在被限制之前可以抛出的异常数量,而第二个构造函数参数是作业在被限制后再次尝试执行之前应经过的秒数。在上面的代码示例中,如果作业连续抛出 10 个异常,我们将等待 5 分钟后再尝试执行该作业,但时间限制为 30 分钟。

当作业抛出异常但尚未达到异常阈值时,通常会立即重试该作业。但是,你可以backoff在将中间件附加到作业时调用该方法来指定此类作业应延迟的分钟数:

1use Illuminate\Queue\Middleware\ThrottlesExceptions;
2 
3/**
4 * Get the middleware the job should pass through.
5 *
6 * @return array<int, object>
7 */
8public function middleware(): array
9{
10 return [(new ThrottlesExceptions(10, 5 * 60))->backoff(5)];
11}

在内部,此中间件使用 Laravel 的缓存系统来实现速率限制,并使用作业的类名作为缓存的“键”。你可以在将by中间件附加到作业时调用该方法来覆盖此键。如果你有多个作业与同一个第三方服务交互,并且你希望它们共享一个公共的限流“存储桶”,那么这可能会很有用:

1use Illuminate\Queue\Middleware\ThrottlesExceptions;
2 
3/**
4 * Get the middleware the job should pass through.
5 *
6 * @return array<int, object>
7 */
8public function middleware(): array
9{
10 return [(new ThrottlesExceptions(10, 10 * 60))->by('key')];
11}

默认情况下,此中间件会限制所有异常。你可以在将中间件附加到作业时调用该方法来修改此行为。只有当传递给该方法when的闭包返回以下内容时,才会限制异常whentrue

1use Illuminate\Http\Client\HttpClientException;
2use Illuminate\Queue\Middleware\ThrottlesExceptions;
3 
4/**
5 * Get the middleware the job should pass through.
6 *
7 * @return array<int, object>
8 */
9public function middleware(): array
10{
11 return [(new ThrottlesExceptions(10, 10 * 60))->when(
12 fn (Throwable $throwable) => $throwable instanceof HttpClientException
13 )];
14}

when与将作业释放回队列或引发异常的方法不同,该deleteWhen方法允许您在发生给定异常时完全删除该作业:

1use App\Exceptions\CustomerDeletedException;
2use Illuminate\Queue\Middleware\ThrottlesExceptions;
3 
4/**
5 * Get the middleware the job should pass through.
6 *
7 * @return array<int, object>
8 */
9public function middleware(): array
10{
11 return [(new ThrottlesExceptions(2, 10 * 60))->deleteWhen(CustomerDeletedException::class)];
12}

如果你希望将节流异常报告给应用程序的异常处理程序,可以在将report中间件附加到作业时调用该方法。或者,你可以为该report方法提供一个闭包,只有当给定的闭包返回以下内容时,才会报告异常true

1use Illuminate\Http\Client\HttpClientException;
2use Illuminate\Queue\Middleware\ThrottlesExceptions;
3 
4/**
5 * Get the middleware the job should pass through.
6 *
7 * @return array<int, object>
8 */
9public function middleware(): array
10{
11 return [(new ThrottlesExceptions(10, 10 * 60))->report(
12 fn (Throwable $throwable) => $throwable instanceof HttpClientException
13 )];
14}

如果您正在使用 Redis,您可以使用Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis中间件,它针对 Redis 进行了微调,并且比基本的异常限制中间件更高效。

跳过工作

中间件Skip允许您指定应跳过/删除的作业,而无需修改作业的逻辑。Skip::when如果给定条件的计算结果为 ,则该方法将删除该作业;如果条件的计算结果为 ,true则该方法将删除该作业Skip::unlessfalse

1use Illuminate\Queue\Middleware\Skip;
2 
3/**
4 * Get the middleware the job should pass through.
5 */
6public function middleware(): array
7{
8 return [
9 Skip::when($someCondition),
10 ];
11}

您还可以将 a 传递Closurewhenunless方法以进行更复杂的条件评估:

1use Illuminate\Queue\Middleware\Skip;
2 
3/**
4 * Get the middleware the job should pass through.
5 */
6public function middleware(): array
7{
8 return [
9 Skip::when(function (): bool {
10 return $this->shouldSkip();
11 }),
12 ];
13}

调度作业

编写完任务类后,就可以使用dispatch任务本身的方法来调度它了。传递给该dispatch方法的参数将被传递给任务的构造函数:

1<?php
2 
3namespace App\Http\Controllers;
4 
5use App\Jobs\ProcessPodcast;
6use App\Models\Podcast;
7use Illuminate\Http\RedirectResponse;
8use Illuminate\Http\Request;
9 
10class PodcastController extends Controller
11{
12 /**
13 * Store a new podcast.
14 */
15 public function store(Request $request): RedirectResponse
16 {
17 $podcast = Podcast::create(/* ... */);
18 
19 // ...
20 
21 ProcessPodcast::dispatch($podcast);
22 
23 return redirect('/podcasts');
24 }
25}

如果您想有条件地调度一项工作,您可以使用dispatchIfdispatchUnless方法:

1ProcessPodcast::dispatchIf($accountActive, $podcast);
2 
3ProcessPodcast::dispatchUnless($accountSuspended, $podcast);

在新的 Laravel 应用中,该sync驱动程序是默认的队列驱动程序。该驱动程序在当前请求的前台同步执行任务,这在本地开发中通常很方便。如果您希望真正开始将任务排队到后台处理,您可以在应用程序的config/queue.php配置文件中指定其他队列驱动程序。

延迟调度

如果您想指定某个任务不能立即被队列工作器处理,可以delay在调度该任务时使用该方法。例如,我们可以指定某个任务在调度后 10 分钟内才可供处理:

1<?php
2 
3namespace App\Http\Controllers;
4 
5use App\Jobs\ProcessPodcast;
6use App\Models\Podcast;
7use Illuminate\Http\RedirectResponse;
8use Illuminate\Http\Request;
9 
10class PodcastController extends Controller
11{
12 /**
13 * Store a new podcast.
14 */
15 public function store(Request $request): RedirectResponse
16 {
17 $podcast = Podcast::create(/* ... */);
18 
19 // ...
20 
21 ProcessPodcast::dispatch($podcast)
22 ->delay(now()->addMinutes(10));
23 
24 return redirect('/podcasts');
25 }
26}

在某些情况下,作业可能配置了默认延迟。如果您需要绕过此延迟并调度作业进行立即处理,可以使用以下withoutDelay方法:

1ProcessPodcast::dispatch($podcast)->withoutDelay();

Amazon SQS 队列服务的最大延迟时间为 15 分钟。

响应发送到浏览器后调度

或者,dispatchAfterResponse如果您的 Web 服务器使用的是 FastCGI,该方法会延迟调度作业,直到 HTTP 响应发送到用户浏览器之后。这样,即使队列中的作业仍在执行,用户仍然可以开始使用应用程序。这通常只适用于需要大约一秒钟的作业,例如发送电子邮件。由于这些作业在当前 HTTP 请求中处理,因此以这种方式调度的作业不需要队列工作器运行即可进行处理:

1use App\Jobs\SendNotification;
2 
3SendNotification::dispatchAfterResponse();

您还可以使用dispatch闭包并将afterResponse方法链接到Helpers上dispatch,以便在 HTTP 响应发送到浏览器后执行闭包:

1use App\Mail\WelcomeMessage;
2use Illuminate\Support\Facades\Mail;
3 
4dispatch(function () {
5 Mail::to('taylor@example.com')->send(new WelcomeMessage);
6})->afterResponse();

同步调度

如果您想立即(同步)调度任务,可以使用该dispatchSync方法。使用此方法时,任务将不会进入队列,而是在当前进程中立即执行:

1<?php
2 
3namespace App\Http\Controllers;
4 
5use App\Jobs\ProcessPodcast;
6use App\Models\Podcast;
7use Illuminate\Http\RedirectResponse;
8use Illuminate\Http\Request;
9 
10class PodcastController extends Controller
11{
12 /**
13 * Store a new podcast.
14 */
15 public function store(Request $request): RedirectResponse
16 {
17 $podcast = Podcast::create(/* ... */);
18 
19 // Create podcast...
20 
21 ProcessPodcast::dispatchSync($podcast);
22 
23 return redirect('/podcasts');
24 }
25}

作业和数据库事务

虽然在数据库事务中调度作业完全没问题,但您应特别注意确保您的作业能够成功执行。在事务中调度作业时,该作业有可能在父事务提交之前就被工作进程处理。发生这种情况时,您在数据库事务期间对模型或数据库记录所做的任何更新可能尚未反映在数据库中。此外,在事务中创建的任何模型或数据库记录可能都不存在于数据库中。

值得庆幸的是,Laravel 提供了几种解决这个问题的方法。首先,你可以after_commit在队列连接的配置数组中设置 connection 选项:

1'redis' => [
2 'driver' => 'redis',
3 // ...
4 'after_commit' => true,
5],

after_commit选项为时true,您可以在数据库事务中调度作业;但是,Laravel 会等到打开的父数据库事务提交后才实际调度作业。当然,如果当前没有打开的数据库事务,则作业将立即调度。

如果由于事务期间发生的异常而导致事务回滚,则该事务期间调度的作业将被丢弃。

将配置选项设置after_committrue还将导致在提交所有打开的数据库事务后分派任何排队的事件监听器、可邮寄的邮件、通知和广播事件。

内联指定提交调度行为

即使未将after_commit队列连接配置选项设置为true,您仍可以指定在所有打开的数据库事务提交后调度特定作业。为此,您可以将该afterCommit方法链接到调度操作中:

1use App\Jobs\ProcessPodcast;
2 
3ProcessPodcast::dispatch($podcast)->afterCommit();

同样,如果after_commit将配置选项设置为true,您可以指示应立即分派特定作业,而无需等待任何打开的数据库事务提交:

1ProcessPodcast::dispatch($podcast)->beforeCommit();

作业链

作业链允许您指定一个队列作业列表,这些作业应在主作业成功执行后按顺序运行。如果序列中的一个作业失败,其余作业将不会运行。要执行队列作业链,您可以使用chain外观提供的方法Bus。Laravel 的命令总线是一个底层组件,队列作业调度建立在其之上:

1use App\Jobs\OptimizePodcast;
2use App\Jobs\ProcessPodcast;
3use App\Jobs\ReleasePodcast;
4use Illuminate\Support\Facades\Bus;
5 
6Bus::chain([
7 new ProcessPodcast,
8 new OptimizePodcast,
9 new ReleasePodcast,
10])->dispatch();

除了链接作业类实例之外,您还可以链接闭包:

1Bus::chain([
2 new ProcessPodcast,
3 new OptimizePodcast,
4 function () {
5 Podcast::update(/* ... */);
6 },
7])->dispatch();

使用$this->delete()作业内的方法删除作业不会阻止链式作业被处理。只有当链中的作业失败时,链才会停止执行。

链式连接和队列

如果您想指定用于链接作业的连接和队列,可以使用onConnectiononQueue方法。这些方法指定应使用的队列连接和队列名称,除非队列作业被明确分配了不同的连接/队列:

1Bus::chain([
2 new ProcessPodcast,
3 new OptimizePodcast,
4 new ReleasePodcast,
5])->onConnection('redis')->onQueue('podcasts')->dispatch();

将作业添加到链中

有时,您可能需要将一个作业添加到现有作业链中的另一个作业中。您可以使用prependToChainappendToChain方法来实现:

1/**
2 * Execute the job.
3 */
4public function handle(): void
5{
6 // ...
7 
8 // Prepend to the current chain, run job immediately after current job...
9 $this->prependToChain(new TranscribePodcast);
10 
11 // Append to the current chain, run job at end of chain...
12 $this->appendToChain(new TranscribePodcast);
13}

连锁故障

当链接作业时,你可以使用catch方法来指定一个闭包,当链中的作业失败时,该闭包将被调用。给定的回调将接收Throwable导致作业失败的实例:

1use Illuminate\Support\Facades\Bus;
2use Throwable;
3 
4Bus::chain([
5 new ProcessPodcast,
6 new OptimizePodcast,
7 new ReleasePodcast,
8])->catch(function (Throwable $e) {
9 // A job within the chain has failed...
10})->dispatch();

由于链式回调由 Laravel 队列稍后序列化和执行,因此您不应该$this在链式回调中使用该变量。

自定义队列和连接

调度到特定队列

通过将作业推送到不同的队列,您可以对排队的作业进行“分类”,甚至可以确定分配给各个队列的作业数量的优先级。请记住,这不会将作业推送到队列配置文件中定义的不同队列“连接”,而只会推送到单个连接中的特定队列。要指定队列,请onQueue在调度作业时使用以下方法:

1<?php
2 
3namespace App\Http\Controllers;
4 
5use App\Jobs\ProcessPodcast;
6use App\Models\Podcast;
7use Illuminate\Http\RedirectResponse;
8use Illuminate\Http\Request;
9 
10class PodcastController extends Controller
11{
12 /**
13 * Store a new podcast.
14 */
15 public function store(Request $request): RedirectResponse
16 {
17 $podcast = Podcast::create(/* ... */);
18 
19 // Create podcast...
20 
21 ProcessPodcast::dispatch($podcast)->onQueue('processing');
22 
23 return redirect('/podcasts');
24 }
25}

onQueue或者,您可以通过调用作业的构造函数中的方法来指定作业的队列:

1<?php
2 
3namespace App\Jobs;
4 
5use Illuminate\Contracts\Queue\ShouldQueue;
6use Illuminate\Foundation\Queue\Queueable;
7 
8class ProcessPodcast implements ShouldQueue
9{
10 use Queueable;
11 
12 /**
13 * Create a new job instance.
14 */
15 public function __construct()
16 {
17 $this->onQueue('processing');
18 }
19}

调度到特定连接

如果您的应用程序与多个队列连接交互,则可以使用该方法指定将作业推送到哪个连接onConnection

1<?php
2 
3namespace App\Http\Controllers;
4 
5use App\Jobs\ProcessPodcast;
6use App\Models\Podcast;
7use Illuminate\Http\RedirectResponse;
8use Illuminate\Http\Request;
9 
10class PodcastController extends Controller
11{
12 /**
13 * Store a new podcast.
14 */
15 public function store(Request $request): RedirectResponse
16 {
17 $podcast = Podcast::create(/* ... */);
18 
19 // Create podcast...
20 
21 ProcessPodcast::dispatch($podcast)->onConnection('sqs');
22 
23 return redirect('/podcasts');
24 }
25}

onConnection您可以将和方法链接onQueue在一起来指定作业的连接和队列:

1ProcessPodcast::dispatch($podcast)
2 ->onConnection('sqs')
3 ->onQueue('processing');

onConnection或者,您可以通过调用作业的构造函数中的方法来指定作业的连接:

1<?php
2 
3namespace App\Jobs;
4 
5use Illuminate\Contracts\Queue\ShouldQueue;
6use Illuminate\Foundation\Queue\Queueable;
7 
8class ProcessPodcast implements ShouldQueue
9{
10 use Queueable;
11 
12 /**
13 * Create a new job instance.
14 */
15 public function __construct()
16 {
17 $this->onConnection('sqs');
18 }
19}

指定最大作业尝试次数/超时值

最大尝试次数

如果队列中的某个任务遇到错误,你肯定不希望它无限期地重试。因此,Laravel 提供了多种方法来指定任务的重试次数或时长。

指定作业最大尝试次数的一种方法是通过--triesArtisan 命令行上的开关。这将应用于 worker 处理的所有作业,除非正在处理的作业指定了允许尝试的次数:

1php artisan queue:work --tries=3

如果作业超出其最大尝试次数,则会被视为“失败”作业。有关处理失败作业的更多信息,请参阅失败作业文档。如果--tries=0将 提供给queue:work命令,则该作业将无限次重试。

您可以采用更精细的方法,在作业类本身上定义作业的最大尝试次数。如果在作业中指定了最大尝试次数,则该值将优先于--tries命令行中提供的值:

1<?php
2 
3namespace App\Jobs;
4 
5class ProcessPodcast implements ShouldQueue
6{
7 /**
8 * The number of times the job may be attempted.
9 *
10 * @var int
11 */
12 public $tries = 5;
13}

如果您需要动态控制特定作业的最大尝试次数,您可以tries在该作业上定义一个方法:

1/**
2 * Determine number of times the job may be attempted.
3 */
4public function tries(): int
5{
6 return 5;
7}

基于时间的尝试

除了定义作业在失败前可以尝试的次数之外,您还可以定义作业不再尝试的时间。这允许在给定的时间范围内尝试任意次数的作业。要定义作业不再尝试的时间,请retryUntil向您的作业类添加一个方法。此方法应返回一个DateTime实例:

1use DateTime;
2 
3/**
4 * Determine the time at which the job should timeout.
5 */
6public function retryUntil(): DateTime
7{
8 return now()->addMinutes(10);
9}

如果同时定义了retryUntiltries,则 Laravel 优先使用该retryUntil方法。

您还可以在排队事件监听器排队通知tries上定义属性或retryUntil方法

最大异常数

有时你可能希望指定一个任务可以尝试多次,但如果重试是由给定数量的未处理异常触发的,则应该失败(而不是由release方法直接释放)。为此,你可以maxExceptions在任务类中定义一个属性:

1<?php
2 
3namespace App\Jobs;
4 
5use Illuminate\Support\Facades\Redis;
6 
7class ProcessPodcast implements ShouldQueue
8{
9 /**
10 * The number of times the job may be attempted.
11 *
12 * @var int
13 */
14 public $tries = 25;
15 
16 /**
17 * The maximum number of unhandled exceptions to allow before failing.
18 *
19 * @var int
20 */
21 public $maxExceptions = 3;
22 
23 /**
24 * Execute the job.
25 */
26 public function handle(): void
27 {
28 Redis::throttle('key')->allow(10)->every(60)->then(function () {
29 // Lock obtained, process the podcast...
30 }, function () {
31 // Unable to obtain lock...
32 return $this->release(10);
33 });
34 }
35}

在此示例中,如果应用程序无法获取 Redis 锁,则作业将被释放十秒,并将继续重试最多 25 次。但是,如果作业抛出三个未处理的异常,则作业将失败。

暂停

通常,您大致知道队列任务预计需要多长时间。因此,Laravel 允许您指定“超时”值。默认情况下,超时值为 60 秒。如果任务的处理时间超过超时值指定的秒数,则处理该任务的工作进程将退出并出现错误。通常,工作进程将由您服务器上配置的进程管理器自动重启。

--timeout可以使用Artisan 命令行上的开关指定作业可以运行的最大秒数:

1php artisan queue:work --timeout=30

如果作业不断超时并超出其最大尝试次数,则会被标记为失败。

您还可以在作业类本身上定义允许作业运行的最大秒数。如果在作业中指定了超时,它将优先于命令行中指定的任何超时:

1<?php
2 
3namespace App\Jobs;
4 
5class ProcessPodcast implements ShouldQueue
6{
7 /**
8 * The number of seconds the job can run before timing out.
9 *
10 * @var int
11 */
12 public $timeout = 120;
13}

有时,IO 阻塞进程(例如套接字或传出 HTTP 连接)可能不会遵守您指定的超时时间。因此,在使用这些功能时,您也应始终尝试使用其 API 指定超时时间。例如,在使用 Guzzle 时,您应始终指定连接和请求的超时值。

必须安装 PHP扩展pcntl才能指定作业超时时间。此外,作业的“超时”值应始终小于其“重试间隔”值。否则,作业可能会在实际执行完成或超时之前重新尝试执行。

超时失败

如果您想表明某项作业在超时时应被标​​记为失败$failOnTimeout,您可以在作业类上定义属性:

1/**
2 * Indicate if the job should be marked as failed on timeout.
3 *
4 * @var bool
5 */
6public $failOnTimeout = true;

错误处理

如果在处理作业时抛出异常,该作业将自动释放回队列,以便再次尝试执行。作业将持续被释放,直到尝试次数达到应用程序允许的最大次数。最大尝试次数由Artisan 命令--tries中使用的开关定义queue:work。或者,也可以在作业类本身中定义最大尝试次数。有关运行队列工作器的更多信息,请参见下文

手动释放作业

有时你可能希望手动将任务释放回队列,以便稍后再次尝试执行。你可以调用如下release方法实现:

1/**
2 * Execute the job.
3 */
4public function handle(): void
5{
6 // ...
7 
8 $this->release();
9}

默认情况下,该方法会将作业释放回队列以便立即处理。但是,你可以将整数或日期实例传递给该方法,release以指示队列在指定的秒数过去后才将作业释放回队列进行处理:release

1$this->release(10);
2 
3$this->release(now()->addSeconds(10));

手动使作业失败

有时你可能需要手动将作业标记为“失败”。为此,你可以调用该fail方法:

1/**
2 * Execute the job.
3 */
4public function handle(): void
5{
6 // ...
7 
8 $this->fail();
9}

如果您希望将捕获到的异常标记为失败,可以将异常传递给该fail方法。或者,为了方便起见,您可以传递一个字符串错误消息,它会被转换为异常:

1$this->fail($exception);
2 
3$this->fail('Something went wrong.');

有关失败作业的更多信息,请查看处理作业失败的文档

作业批处理

Laravel 的作业批处理功能允许您轻松地执行一批作业,并在作业完成时执行某些操作。在开始之前,您应该创建一个数据库迁移来构建一个表,该表将包含有关作业批次的元信息,例如其完成百分比。此迁移可以使用make:queue-batches-tableArtisan 命令生成:

1php artisan make:queue-batches-table
2 
3php artisan migrate

定义可批处理作业

要定义可批处理作业,您应该像平常一样创建一个可排队作业Illuminate\Bus\Batchable;但是,您应该将trait 添加到作业类中。此 trait 提供对batch方法的访问,该方法可用于检索作业正在执行的当前批次:

1<?php
2 
3namespace App\Jobs;
4 
5use Illuminate\Bus\Batchable;
6use Illuminate\Contracts\Queue\ShouldQueue;
7use Illuminate\Foundation\Queue\Queueable;
8 
9class ImportCsv implements ShouldQueue
10{
11 use Batchable, Queueable;
12 
13 /**
14 * Execute the job.
15 */
16 public function handle(): void
17 {
18 if ($this->batch()->cancelled()) {
19 // Determine if the batch has been cancelled...
20 
21 return;
22 }
23 
24 // Import a portion of the CSV file...
25 }
26}

调度批次

要分派一批作业,您应该使用外观层batch的方法Bus。当然,批处理主要与完成回调结合使用。因此,您可以使用thencatchfinally方法来定义批处理的完成回调。每个回调Illuminate\Bus\Batch在调用时都会收到一个实例。在此示例中,我们假设我们正在排队一批作业,每个作业处理来自 CSV 文件的给定数量的行:

1use App\Jobs\ImportCsv;
2use Illuminate\Bus\Batch;
3use Illuminate\Support\Facades\Bus;
4use Throwable;
5 
6$batch = Bus::batch([
7 new ImportCsv(1, 100),
8 new ImportCsv(101, 200),
9 new ImportCsv(201, 300),
10 new ImportCsv(301, 400),
11 new ImportCsv(401, 500),
12])->before(function (Batch $batch) {
13 // The batch has been created but no jobs have been added...
14})->progress(function (Batch $batch) {
15 // A single job has completed successfully...
16})->then(function (Batch $batch) {
17 // All jobs completed successfully...
18})->catch(function (Batch $batch, Throwable $e) {
19 // First batch job failure detected...
20})->finally(function (Batch $batch) {
21 // The batch has finished executing...
22})->dispatch();
23 
24return $batch->id;

批次的 ID 可以通过属性访问$batch->id,可用于在批次调度后查询 Laravel 命令总线以获取有关该批次的信息。

由于批处理回调会被 Laravel 队列序列化并稍后执行,因此不应$this在回调中使用该变量。此外,由于批处理作业被包装在数据库事务中,因此触发隐式提交的数据库语句不应在作业中执行。

命名批次

某些工具(例如 Laravel Horizo​​n 和 Laravel Telescope)如果为批次指定了名称,则可以提供更用户友好的调试信息。要为批次分配任意名称,可以name在定义批次时调用该方法:

1$batch = Bus::batch([
2 // ...
3])->then(function (Batch $batch) {
4 // All jobs completed successfully...
5})->name('Import CSV')->dispatch();

批量连接和队列

如果您想指定用于批处理作业的连接和队列,可以使用onConnectiononQueue方法。所有批处理作业必须在同一连接和队列中执行:

1$batch = Bus::batch([
2 // ...
3])->then(function (Batch $batch) {
4 // All jobs completed successfully...
5})->onConnection('redis')->onQueue('imports')->dispatch();

连锁和批次

您可以通过将链接的作业放入数组中来定义批次中的一组链接作业。例如,我们可以并行执行两个作业链,并在两个作业链都完成处理后执行回调:

1use App\Jobs\ReleasePodcast;
2use App\Jobs\SendPodcastReleaseNotification;
3use Illuminate\Bus\Batch;
4use Illuminate\Support\Facades\Bus;
5 
6Bus::batch([
7 [
8 new ReleasePodcast(1),
9 new SendPodcastReleaseNotification(1),
10 ],
11 [
12 new ReleasePodcast(2),
13 new SendPodcastReleaseNotification(2),
14 ],
15])->then(function (Batch $batch) {
16 // ...
17})->dispatch();

相反,您可以通过在链中定义批次来在链中运行批量作业。例如,您可以先运行一批作业来发布多个播客,然后再运行一批作业来发送发布通知:

1use App\Jobs\FlushPodcastCache;
2use App\Jobs\ReleasePodcast;
3use App\Jobs\SendPodcastReleaseNotification;
4use Illuminate\Support\Facades\Bus;
5 
6Bus::chain([
7 new FlushPodcastCache,
8 Bus::batch([
9 new ReleasePodcast(1),
10 new ReleasePodcast(2),
11 ]),
12 Bus::batch([
13 new SendPodcastReleaseNotification(1),
14 new SendPodcastReleaseNotification(2),
15 ]),
16])->dispatch();

将作业添加到批次

有时,在批处理作业中添加额外的作业可能会很有用。当你需要批量处理数千个作业,而这些作业在 Web 请求期间可能需要很长时间才能分发时,这种模式非常有用。因此,你或许希望分发一个初始批次的“加载器”作业,以便将更多作业添加到批处理中:

1$batch = Bus::batch([
2 new LoadImportBatch,
3 new LoadImportBatch,
4 new LoadImportBatch,
5])->then(function (Batch $batch) {
6 // All jobs completed successfully...
7})->name('Import Contacts')->dispatch();

在此示例中,我们将使用LoadImportBatch作业将批处理与其他作业合并。为此,我们可以add在批处理实例上使用可通过作业batch方法访问的方法:

1use App\Jobs\ImportContacts;
2use Illuminate\Support\Collection;
3 
4/**
5 * Execute the job.
6 */
7public function handle(): void
8{
9 if ($this->batch()->cancelled()) {
10 return;
11 }
12 
13 $this->batch()->add(Collection::times(1000, function () {
14 return new ImportContacts;
15 }));
16}

您只能将属于同一批次的作业添加到批次中。

检验批次

提供给批次完成回调的实例Illuminate\Bus\Batch具有多种属性和方法来帮助您与给定的一批作业进行交互和检查:

1// The UUID of the batch...
2$batch->id;
3 
4// The name of the batch (if applicable)...
5$batch->name;
6 
7// The number of jobs assigned to the batch...
8$batch->totalJobs;
9 
10// The number of jobs that have not been processed by the queue...
11$batch->pendingJobs;
12 
13// The number of jobs that have failed...
14$batch->failedJobs;
15 
16// The number of jobs that have been processed thus far...
17$batch->processedJobs();
18 
19// The completion percentage of the batch (0-100)...
20$batch->progress();
21 
22// Indicates if the batch has finished executing...
23$batch->finished();
24 
25// Cancel the execution of the batch...
26$batch->cancel();
27 
28// Indicates if the batch has been cancelled...
29$batch->cancelled();

从路线返回批次

所有Illuminate\Bus\Batch实例均支持 JSON 序列化,这意味着您可以直接从应用程序的某个路由返回它们,以检索包含批次信息(包括其完成进度)的 JSON 负载。这样可以方便地在应用程序的 UI 中显示批次完成进度的信息。

要通过 ID 检索批次,您可以使用Bus外观的findBatch方法:

1use Illuminate\Support\Facades\Bus;
2use Illuminate\Support\Facades\Route;
3 
4Route::get('/batch/{batchId}', function (string $batchId) {
5 return Bus::findBatch($batchId);
6});

取消批次

有时你可能需要取消某个批次的执行。这可以通过调用实例cancel上的方法来实现Illuminate\Bus\Batch

1/**
2 * Execute the job.
3 */
4public function handle(): void
5{
6 if ($this->user->exceedsImportLimit()) {
7 return $this->batch()->cancel();
8 }
9 
10 if ($this->batch()->cancelled()) {
11 return;
12 }
13}

你可能已经注意到,在前面的例子中,批量作业通常应该在继续执行之前判断其对应的批次是否已被取消。但是,为了方便起见,你可以将SkipIfBatchCancelled 中间件分配给作业。顾名思义,如果对应的批次已被取消,这个中间件将指示 Laravel 停止处理该作业:

1use Illuminate\Queue\Middleware\SkipIfBatchCancelled;
2 
3/**
4 * Get the middleware the job should pass through.
5 */
6public function middleware(): array
7{
8 return [new SkipIfBatchCancelled];
9}

批次失败

当批量作业失败时,catch回调函数(如果已指定)将被调用。此回调函数仅在批量作业中第一个失败的作业时调用。

允许失败

当批次中的作业失败时,Laravel 会自动将该批次标记为“已取消”。如果您愿意,可以禁用此行为,这样作业失败时就不会自动将批次标记为“已取消”。这可以通过allowFailures在调度批次时调用该方法来实现:

1$batch = Bus::batch([
2 // ...
3])->then(function (Batch $batch) {
4 // All jobs completed successfully...
5})->allowFailures()->dispatch();

重试失败的批处理作业

为了方便起见,Laravel 提供了一个queue:retry-batchArtisan 命令,可让您轻松重试给定批次中所有失败的作业。该queue:retry-batch命令接受需要重试的失败作业的批次的 UUID:

1php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5

修剪批次

如果不进行修剪,job_batches表会很快积累记录。为了缓解这种情况,您应该将Artisan命令安排queue:prune-batches为每天运行:

1use Illuminate\Support\Facades\Schedule;
2 
3Schedule::command('queue:prune-batches')->daily();

默认情况下,所有超过 24 小时的已完成批次都将被删除。您可以hours在调用命令时使用该选项来确定批次数据的保留时间。例如,以下命令将删除所有超过 48 小时前完成的批次:

1use Illuminate\Support\Facades\Schedule;
2 
3Schedule::command('queue:prune-batches --hours=48')->daily();

有时,您的jobs_batches表可能会累积从未成功完成的批次记录,例如作业失败且从未成功重试的批次。您可以queue:prune-batches使用以下选项指示命令修剪这些未完成的批次记录unfinished

1use Illuminate\Support\Facades\Schedule;
2 
3Schedule::command('queue:prune-batches --hours=48 --unfinished=72')->daily();

同样,您的jobs_batches表也可能累积已取消批次的批次记录。您可以queue:prune-batches使用以下选项指示命令删除这些已取消的批次记录cancelled

1use Illuminate\Support\Facades\Schedule;
2 
3Schedule::command('queue:prune-batches --hours=48 --cancelled=72')->daily();

在 DynamoDB 中存储批次

Laravel 还支持将批次元信息存储在DynamoDB中,而不是关系数据库中。但是,您需要手动创建 DynamoDB 表来存储所有批次记录。

通常,此表应命名为job_batches,但您应该根据queue.batching.table应用程序queue配置文件中的配置值来命名表。

DynamoDB 批处理表配置

job_batches表应具有一个名为 的字符串主分区键application和一个名为 的字符串主排序键id。键的一部分将包含应用程序的名称,该名称由应用程序配置文件中的配置值application定义。由于应用程序名称是 DynamoDB 表键的一部分,因此您可以使用同一张表来存储多个 Laravel 应用程序的作业批次。nameapp

此外,ttl如果您想利用自动批量修剪,您可以为您的表定义属性。

DynamoDB 配置

接下来,安装 AWS SDK,以便您的 Laravel 应用程序可以与 Amazon DynamoDB 通信:

1composer require aws/aws-sdk-php

然后,将queue.batching.driver配置选项的值设置为dynamodb。此外,您还应该在配置数组中定义keysecretregion配置选项batching。这些选项将用于向 AWS 进行身份验证。使用dynamodb驱动程序时,queue.batching.database不需要配置选项:

1'batching' => [
2 'driver' => env('QUEUE_BATCHING_DRIVER', 'dynamodb'),
3 'key' => env('AWS_ACCESS_KEY_ID'),
4 'secret' => env('AWS_SECRET_ACCESS_KEY'),
5 'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
6 'table' => 'job_batches',
7],

DynamoDB 中的批次修剪

使用DynamoDB存储作业批次信息时,用于修剪存储在关系数据库中的批次的典型修剪命令将不起作用。您可以利用DynamoDB 的原生 TTL 功能自动删除旧批次的记录。

如果您使用属性定义了 DynamoDB 表ttl,则可以定义配置参数来指示 Laravel 如何修剪批量记录。queue.batching.ttl_attribute配置值定义了保存 TTL 的属性的名称,而queue.batching.ttl配置值定义了相对于上次更新记录的时间,在多少秒后可以从 DynamoDB 表中删除该批量记录:

1'batching' => [
2 'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
3 'key' => env('AWS_ACCESS_KEY_ID'),
4 'secret' => env('AWS_SECRET_ACCESS_KEY'),
5 'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
6 'table' => 'job_batches',
7 'ttl_attribute' => 'ttl',
8 'ttl' => 60 * 60 * 24 * 7, // 7 days...
9],

队列闭包

除了将任务类分派到队列之外,您还可以分派闭包。这对于需要在当前请求周期之外执行的快速、简单的任务非常有用。将闭包分派到队列时,闭包的代码内容会进行加密签名,以确保其在传输过程中无法被修改:

1$podcast = App\Podcast::find(1);
2 
3dispatch(function () use ($podcast) {
4 $podcast->publish();
5});

要为排队闭包分配一个名称,该名称可供队列报告仪表板使用,并可通过queue:work命令显示,您可以使用该name方法:

1dispatch(function () {
2 // ...
3})->name('Publish Podcast');

使用该catch方法,您可以提供一个闭包,如果在用尽队列的所有配置的重试尝试后排队的闭包仍未能成功完成,则应执行该闭包:

1use Throwable;
2 
3dispatch(function () use ($podcast) {
4 $podcast->publish();
5})->catch(function (Throwable $e) {
6 // This job has failed...
7});

由于catch回调由 Laravel 队列序列化并在稍后执行,因此您不应在回调$this中使用该变量catch

运行队列工作器

命令queue:work

Laravel 包含一个 Artisan 命令,它可以启动一个队列工作器,并在新任务推送到队列时进行处理。您可以使用queue:workArtisan 命令运行该工作器。请注意,一旦命令queue:work启动,它将持续运行,直到手动停止或关闭终端:

1php artisan queue:work

为了使queue:work进程在后台永久运行,您应该使用进程监视器(例如Supervisor)来确保队列工作器不会停止运行。

如果您希望将已处理的作业 ID 包含在命令的输出中,则可以-v在调用命令时包含标志:queue:work

1php artisan queue:work -v

请记住,队列工作器是长寿命进程,并将启动后的应用程序状态存储在内存中。因此,它们启动后不会感知到代码库的更改。因此,在部署过程中,请务必重新启动队列工作器。此外,请记住,应用程序创建或修改的任何静态状态都不会在作业之间自动重置。

或者,您可以运行该queue:listen命令。使用该queue:listen命令时,当您想要重新加载更新的代码或重置应用程序状态时,无需手动重启工作器;但是,此命令的效率明显低于以下queue:work命令:

1php artisan queue:listen

运行多个队列工作者

要将多个工作进程分配到队列并同时处理作业,只需启动多个queue:work进程即可。这可以通过终端中的多个选项卡在本地完成,也可以在生产环境中使用进程管理器的配置设置完成。使用 Supervisor 时,可以使用numprocs配置值。

指定连接和队列

您还可以指定工作进程应使用的队列连接。传递给work命令的连接名称应与配置文件中定义的连接之一相对应config/queue.php

1php artisan queue:work redis

默认情况下,该queue:work命令仅处理给定连接上默认队列的任务。但是,您可以进一步自定义队列工作器,使其仅处理给定连接的特定队列。例如,如果您的所有电子邮件都在队列连接emails上的某个队列中处理redis,则可以发出以下命令来启动一个仅处理该队列的工作器:

1php artisan queue:work redis --queue=emails

处理指定数量的作业

--once选项可用于指示工作者仅处理队列中的单个作业:

1php artisan queue:work --once

--max-jobs选项可用于指示工作进程处理指定数量的作业,然后退出。此选项与Supervisor结合使用时非常有用,这样您的工作进程在处理完指定数量的作业后会自动重新启动,并释放它们可能累积的内存:

1php artisan queue:work --max-jobs=1000

处理所有排队作业然后退出

--stop-when-empty选项可用于指示工作进程处理所有任务,然后正常退出。如果您希望在队列为空后关闭容器,则在处理 Docker 容器中的 Laravel 队列时此选项非常有用:

1php artisan queue:work --stop-when-empty

在给定的秒数内处理作业

--max-time选项可用于指示工作进程在指定的秒数内处理作业,然后退出。此选项与Supervisor结合使用时非常有用,这样您的工作进程在处理作业指定时间后会自动重新启动,并释放它们可能累积的内存:

1# Process jobs for one hour and then exit...
2php artisan queue:work --max-time=3600

工人睡眠时长

当队列中有可用作业时,工作器会持续处理作业,作业之间不会有任何延迟。但是,该sleep选项决定了当队列中没有可用作业时,工作器将“休眠”多少秒。当然,休眠期间,工作器不会处理任何新作业:

1php artisan queue:work --sleep=3

维护模式和队列

当您的应用程序处于维护模式时,队列中的任务将不会被处理。应用程序退出维护模式后,任务将继续照常处理。

要强制队列工作者即使在启用了维护模式的情况下也处理作业,您可以使用--force选项:

1php artisan queue:work --force

资源考虑

守护队列工作器在处理每个作业之前不会“重启”框架。因此,您应该在每个作业完成后释放所有占用大量资源的资源。例如,如果您使用 GD 库进行图像处理,则应在imagedestroy图像处理完成后释放内存。

队列优先级

有时你可能希望设置队列处理的优先级。例如,在config/queue.php配置文件中,你可能将连接的默认连接设置queueredislow然而,有时你可能希望将任务推送到high优先级队列,如下所示:

1dispatch((new Job)->onQueue('high'));

high要启动一个工作程序,在继续处理队列中的任何作业之前验证所有队列作业都已处理完毕low,请将以逗号分隔的队列名称列表传递给work命令:

1php artisan queue:work --queue=high,low

队列工作者和部署

由于队列工作进程是长生命周期进程,因此它们不会在不重启的情况下感知到代码的更改。因此,部署使用队列工作进程的应用程序最简单的方法是在部署过程中重启工作进程。您可以通过以下queue:restart命令优雅地重启所有工作进程:

1php artisan queue:restart

此命令将指示所有队列工作进程在完成当前任务后优雅退出,以免现有任务丢失。由于队列工作进程会在queue:restart执行此命令时退出,因此您应该运行一个进程管理器(例如Supervisor)来自动重启队列工作进程。

队列使用缓存来存储重启信号,因此在使用此功能之前,您应该验证应用程序的缓存驱动程序是否已正确配置。

作业到期和超时

职位到期

在您的配置文件中config/queue.php,每个队列连接都会定义一个retry_after选项。此选项指定队列连接在重试正在处理的作业之前应等待的秒数。例如,如果retry_after将 的值设置为90,则如果作业已处理 90 秒而未被释放或删除,则会将其释放回队列。通常,您应该将该retry_after值设置为作业完成处理所需的合理最大秒数。

唯一不包含retry_after值的队列连接是 Amazon SQS。SQS 将根据AWS 控制台中管理的默认可见性超时值重试该作业。

工作进程超时

Artisanqueue:work命令提供了一个--timeout选项。默认情况下,该--timeout值为 60 秒。如果作业的处理时间超过了超时值指定的秒数,则处理该作业的工作进程将退出并出现错误。通常,工作进程将由服务器上配置的进程管理器自动重启:

1php artisan queue:work --timeout=60

配置选项retry_after--timeoutCLI 选项不同,但协同工作以确保作业不会丢失并且作业仅成功处理一次。

--timeout值应始终比您的配置值至少短几秒retry_after。这将确保处理冻结作业的工作进程在重试作业之前始终终止。如果您的--timeout选项比您的配置值长retry_after,则您的作业可能会被处理两次。

主管配置

在生产环境中,您需要一种方法来保持queue:work进程的运行。queue:work进程可能由于各种原因停止运行,例如超过工作进程超时或执行queue:restart命令。

因此,您需要配置一个进程监视器,它可以检测queue:work进程何时退出并自动重启它们。此外,进程监视器还允许您指定queue:work要并发运行的进程数量。Supervisor 是 Linux 环境中常用的进程监视器,我们将在后续文档中讨论如何配置它。

安装 Supervisor

Supervisor 是 Linux 操作系统的进程监视器,queue:work如果进程失败,它将自动重启。要在 Ubuntu 上安装 Supervisor,可以使用以下命令:

1sudo apt-get install supervisor

如果您觉得自行配置和管理 Supervisor 太麻烦,可以考虑使用Laravel Cloud,它为运行 Laravel 队列工作者提供了一个完全托管的平台。

配置主管

Supervisor 的配置文件通常存储在该/etc/supervisor/conf.d目录中。在这个目录中,你可以创建任意数量的配置文件,用于指示 Supervisor 如何监控你的进程。例如,让我们创建一个laravel-worker.conf启动并监控queue:work进程的文件:

1[program:laravel-worker]
2process_name=%(program_name)s_%(process_num)02d
3command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600
4autostart=true
5autorestart=true
6stopasgroup=true
7killasgroup=true
8user=forge
9numprocs=8
10redirect_stderr=true
11stdout_logfile=/home/forge/app.com/worker.log
12stopwaitsecs=3600

在此示例中,该numprocs指令将指示 Supervisor 运行queue:work并监视所有八个进程,如果它们失败则自动重新启动它们。您应该更改command配置指令以反映所需的队列连接和工作器选项。

您应该确保 的值stopwaitsecs大于运行时间最长的作业所消耗的秒数。否则,Supervisor 可能会在作业完成处理之前将其终止。

启动主管

一旦创建了配置文件,您就可以使用以下命令更新 Supervisor 配置并启动进程:

1sudo supervisorctl reread
2 
3sudo supervisorctl update
4 
5sudo supervisorctl start "laravel-worker:*"

有关 Supervisor 的更多信息,请查阅Supervisor 文档

处理失败的任务

有时你的队列任务会失败。别担心,事情并不总是按计划进行!Laravel 提供了一种便捷的方式来指定任务的最大尝试次数。异步任务超过此尝试次数后,将被插入到failed_jobs数据库表中。失败的同步调度任务不会存储在此表中,它们的异常会立即由应用程序处理。

用于创建该failed_jobs表的迁移文件通常已存在于新的 Laravel 应用程序中。但是,如果您的应用程序不包含该表的迁移文件,则可以使用以下make:queue-failed-table命令创建迁移文件:

1php artisan make:queue-failed-table
2 
3php artisan migrate

运行队列工作--tries进程时,可以使用命令中的开关指定任务尝试执行的最大次数queue:work。如果没有指定该--tries选项的值,则任务将仅尝试执行一次,或者尝试执行任务类属性指定的次数$tries

1php artisan queue:work redis --tries=3

使用该--backoff选项,您可以指定 Laravel 在重试遇到异常的作业之前应等待的秒数。默认情况下,作业会立即释放回队列,以便再次尝试:

1php artisan queue:work redis --tries=3 --backoff=3

如果您想要配置 Laravel 在重试每个作业遇到异常之前应等待的秒数,您可以通过backoff在作业类上定义一个属性来实现:

1/**
2 * The number of seconds to wait before retrying the job.
3 *
4 * @var int
5 */
6public $backoff = 3;

如果您需要更复杂的逻辑来确定作业的退避时间,您可以backoff在作业类上定义一个方法:

1/**
2 * Calculate the number of seconds to wait before retrying the job.
3 */
4public function backoff(): int
5{
6 return 3;
7}

您可以通过从该方法返回一个退避值数组来轻松配置“指数”退避backoff。在此示例中,第一次重试的延迟为 1 秒,第二次重试的延迟为 5 秒,第三次重试的延迟为 10 秒,如果剩余尝试次数较多,则后续每次重试的延迟均为 10 秒:

1/**
2 * Calculate the number of seconds to wait before retrying the job.
3 *
4 * @return array<int, int>
5 */
6public function backoff(): array
7{
8 return [1, 5, 10];
9}

作业失败后的清理

当某个作业失败时,您可能希望向用户发送警报,或撤销该作业未完成的操作。为此,您可以failed在作业类中定义一个方法。Throwable导致作业失败的实例将被传递给该failed方法:

1<?php
2 
3namespace App\Jobs;
4 
5use App\Models\Podcast;
6use App\Services\AudioProcessor;
7use Illuminate\Contracts\Queue\ShouldQueue;
8use Illuminate\Foundation\Queue\Queueable;
9use Throwable;
10 
11class ProcessPodcast implements ShouldQueue
12{
13 use Queueable;
14 
15 /**
16 * Create a new job instance.
17 */
18 public function __construct(
19 public Podcast $podcast,
20 ) {}
21 
22 /**
23 * Execute the job.
24 */
25 public function handle(AudioProcessor $processor): void
26 {
27 // Process uploaded podcast...
28 }
29 
30 /**
31 * Handle a job failure.
32 */
33 public function failed(?Throwable $exception): void
34 {
35 // Send user notification of failure, etc...
36 }
37}

在调用该方法之前,会实例化该作业的一个新实例failed;因此,该方法内可能发生的任何类属性修改handle都将丢失。

重试失败的作业

要查看已插入failed_jobs数据库表的所有失败作业,您可以使用queue:failedArtisan 命令:

1php artisan queue:failed

queue:failed命令将列出作业 ID、连接、队列、失败时间以及其他相关信息。作业 ID 可用于重试失败的作业。例如,要重试 ID 为 的失败作业ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece,请执行以下命令:

1php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece

如果需要,你可以向命令传递多个 ID:

1php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d

您还可以重试特定队列的所有失败的作业:

1php artisan queue:retry --queue=name

要重试所有失败的作业,请执行queue:retry命令并传递allID:

1php artisan queue:retry all

如果您想删除失败的作业,您可以使用以下queue:forget命令:

1php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24d

当使用Horizo​​n时,应该使用horizon:forget命令来删除失败的作业,而不是使用queue:forget命令。

要从表中删除所有失败的作业failed_jobs,您可以使用以下queue:flush命令:

1php artisan queue:flush

忽略缺失的模型

将 Eloquent 模型注入作业时,该模型会自动序列化,然后放入队列,并在作业处理时从数据库重新检索。但是,如果在作业等待工作器处理时模型被删除,则作业可能会失败并出现ModelNotFoundException.

为了方便起见,你可以将作业的deleteWhenMissingModels属性设置为,以自动删除缺少模型的作业true。当此属性设置为 时true,Laravel 会悄悄地丢弃该作业,而不会引发异常:

1/**
2 * Delete the job if its models no longer exist.
3 *
4 * @var bool
5 */
6public $deleteWhenMissingModels = true;

修剪失败的作业

failed_jobs您可以通过调用Artisan 命令来修剪应用程序表中的记录queue:prune-failed

1php artisan queue:prune-failed

默认情况下,所有超过 24 小时的失败作业记录都会被删除。如果您--hours在命令中指定了该选项,则只会保留过去 N 小时内插入的失败作业记录。例如,以下命令将删除所有超过 48 小时前插入的失败作业记录:

1php artisan queue:prune-failed --hours=48

在 DynamoDB 中存储失败的作业

Laravel 还支持将失败的作业记录存储在DynamoDB中,而不是关系数据库表中。但是,您必须手动创建一个 DynamoDB 表来存储所有失败的作业记录。通常,此表应命名为,但您应该根据应用程序配置文件中的配置值failed_jobs来命名表queue.failed.tablequeue

failed_jobs表应具有一个名为 的字符串主分区键application和一个名为 的字符串主排序键uuid。键的一部分将包含应用程序的名称,该名称由应用程序配置文件中的配置值application定义。由于应用程序名称是 DynamoDB 表键的一部分,因此您可以使用同一张表来存储多个 Laravel 应用程序的失败作业。nameapp

此外,请确保安装 AWS SDK,以便您的 Laravel 应用程序可以与 Amazon DynamoDB 通信:

1composer require aws/aws-sdk-php

接下来,将queue.failed.driver配置选项的值设置为dynamodb。此外,您还应该在失败作业配置数组中定义keysecretregion配置选项。这些选项将用于向 AWS 进行身份验证。使用dynamodb驱动程序时,queue.failed.database不需要该配置选项:

1'failed' => [
2 'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
3 'key' => env('AWS_ACCESS_KEY_ID'),
4 'secret' => env('AWS_SECRET_ACCESS_KEY'),
5 'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
6 'table' => 'failed_jobs',
7],

禁用失败作业存储

queue.failed.driver你可以通过将配置选项的值设置为 来指示 Laravel 丢弃失败的作业而不存储它们null。通常,这可以通过QUEUE_FAILED_DRIVER环境变量来实现:

1QUEUE_FAILED_DRIVER=null

失败作业事件

如果你想注册一个事件监听器,在任务失败时调用,可以使用QueueFacade 的failing方法。例如,我们可以在Laravel 自带的boot方法中为该事件附加一个闭包:AppServiceProvider

1<?php
2 
3namespace App\Providers;
4 
5use Illuminate\Support\Facades\Queue;
6use Illuminate\Support\ServiceProvider;
7use Illuminate\Queue\Events\JobFailed;
8 
9class AppServiceProvider extends ServiceProvider
10{
11 /**
12 * Register any application services.
13 */
14 public function register(): void
15 {
16 // ...
17 }
18 
19 /**
20 * Bootstrap any application services.
21 */
22 public function boot(): void
23 {
24 Queue::failing(function (JobFailed $event) {
25 // $event->connectionName
26 // $event->job
27 // $event->exception
28 });
29 }
30}

清除队列中的作业

当使用Horizo​​n时,应该使用horizon:clear命令从队列中清除作业,而不是使用queue:clear命令。

如果您想要从默认连接的默认队列中删除所有作业,您可以使用queue:clearArtisan 命令来执行此操作:

1php artisan queue:clear

您还可以提供connection参数和queue选项来从特定连接和队列中删除作业:

1php artisan queue:clear redis --queue=emails

清除队列中的作业仅适用于 SQS、Redis 和数据库队列驱动程序。此外,SQS 消息删除过程最多需要 60 秒,因此清除队列后最多 60 秒内发送到 SQS 队列的作业也可能会被删除。

监控您的队列

如果您的队列突然收到大量任务,它可能会不堪重负,导致任务完成需要很长时间。如果您需要,Laravel 可以在队列任务数量超过指定阈值时发出警报。

首先,您应该将queue:monitor命令设置为每分钟运行一次。该命令接受您希望监控的队列名称以及所需的作业计数阈值:

1php artisan queue:monitor redis:default,redis:deployments --max=100

仅调度此命令不足以触发通知,提醒您队列已超负荷。当此命令遇到作业数量超过阈值的队列时,Illuminate\Queue\Events\QueueBusy将会调度一个事件。您可以在应用程序中监听此事件,AppServiceProvider以便向您或您的开发团队发送通知:

1use App\Notifications\QueueHasLongWaitTime;
2use Illuminate\Queue\Events\QueueBusy;
3use Illuminate\Support\Facades\Event;
4use Illuminate\Support\Facades\Notification;
5 
6/**
7 * Bootstrap any application services.
8 */
9public function boot(): void
10{
11 Event::listen(function (QueueBusy $event) {
12 Notification::route('mail', 'dev@example.com')
13 ->notify(new QueueHasLongWaitTime(
14 $event->connection,
15 $event->queue,
16 $event->size
17 ));
18 });
19}

测试

在测试调度任务的代码时,您可能希望指示 Laravel 不实际执行任务本身,因为任务的代码可以直接与调度任务的代码分开测试。当然,要测试任务本身,您可以实例化一个任务实例,并handle在测试中直接调用该方法。

你可以使用Queue外观fake方法阻止队列中的任务被推送到队列。调用Queue外观fake方法后,你可以断言应用程序已尝试将任务推送到队列:

1<?php
2 
3use App\Jobs\AnotherJob;
4use App\Jobs\FinalJob;
5use App\Jobs\ShipOrder;
6use Illuminate\Support\Facades\Queue;
7 
8test('orders can be shipped', function () {
9 Queue::fake();
10 
11 // Perform order shipping...
12 
13 // Assert that no jobs were pushed...
14 Queue::assertNothingPushed();
15 
16 // Assert a job was pushed to a given queue...
17 Queue::assertPushedOn('queue-name', ShipOrder::class);
18 
19 // Assert a job was pushed twice...
20 Queue::assertPushed(ShipOrder::class, 2);
21 
22 // Assert a job was not pushed...
23 Queue::assertNotPushed(AnotherJob::class);
24 
25 // Assert that a Closure was pushed to the queue...
26 Queue::assertClosurePushed();
27 
28 // Assert the total number of jobs that were pushed...
29 Queue::assertCount(3);
30});
1<?php
2 
3namespace Tests\Feature;
4 
5use App\Jobs\AnotherJob;
6use App\Jobs\FinalJob;
7use App\Jobs\ShipOrder;
8use Illuminate\Support\Facades\Queue;
9use Tests\TestCase;
10 
11class ExampleTest extends TestCase
12{
13 public function test_orders_can_be_shipped(): void
14 {
15 Queue::fake();
16 
17 // Perform order shipping...
18 
19 // Assert that no jobs were pushed...
20 Queue::assertNothingPushed();
21 
22 // Assert a job was pushed to a given queue...
23 Queue::assertPushedOn('queue-name', ShipOrder::class);
24 
25 // Assert a job was pushed twice...
26 Queue::assertPushed(ShipOrder::class, 2);
27 
28 // Assert a job was not pushed...
29 Queue::assertNotPushed(AnotherJob::class);
30 
31 // Assert that a Closure was pushed to the queue...
32 Queue::assertClosurePushed();
33 
34 // Assert the total number of jobs that were pushed...
35 Queue::assertCount(3);
36 }
37}

你可以将闭包传递给assertPushedassertNotPushed方法,以断言推送的作业是否通过了指定的“真值测试”。如果至少有一个作业通过了指定的真值测试,则断言成功:

1Queue::assertPushed(function (ShipOrder $job) use ($order) {
2 return $job->order->id === $order->id;
3});

伪造一部分工作

如果您只需要伪造特定的作业,同时允许其他作业正常执行,您可以将需要伪造的作业的类名传递给该fake方法:

1test('orders can be shipped', function () {
2 Queue::fake([
3 ShipOrder::class,
4 ]);
5 
6 // Perform order shipping...
7 
8 // Assert a job was pushed twice...
9 Queue::assertPushed(ShipOrder::class, 2);
10});
1public function test_orders_can_be_shipped(): void
2{
3 Queue::fake([
4 ShipOrder::class,
5 ]);
6 
7 // Perform order shipping...
8 
9 // Assert a job was pushed twice...
10 Queue::assertPushed(ShipOrder::class, 2);
11}

您可以使用下列方法伪造除一组指定作业之外的所有作业except

1Queue::fake()->except([
2 ShipOrder::class,
3]);

测试作业链

要测试作业链,您需要利用BusFacade 的伪造功能。FacadeBusassertChained方法可用于断言已调度作业链assertChained。该方法接受一个包含作业链的数组作为其第一个参数:

1use App\Jobs\RecordShipment;
2use App\Jobs\ShipOrder;
3use App\Jobs\UpdateInventory;
4use Illuminate\Support\Facades\Bus;
5 
6Bus::fake();
7 
8// ...
9 
10Bus::assertChained([
11 ShipOrder::class,
12 RecordShipment::class,
13 UpdateInventory::class
14]);

如上例所示,链接任务的数组可以是任务类名的数组。但是,你也可以提供一个实际任务实例的数组。这样做时,Laravel 会确保任务实例与应用程序调度的链接任务属于同一类,并具有相同的属性值:

1Bus::assertChained([
2 new ShipOrder,
3 new RecordShipment,
4 new UpdateInventory,
5]);

您可以使用该assertDispatchedWithoutChain方法来断言某项作业是在没有作业链的情况下被推送的:

1Bus::assertDispatchedWithoutChain(ShipOrder::class);

测试链修改

如果一个链式作业将作业添加到现有链的前面或后面,则可以使用该作业的assertHasChain方法来断言该作业具有预期的剩余作业链:

1$job = new ProcessPodcast;
2 
3$job->handle();
4 
5$job->assertHasChain([
6 new TranscribePodcast,
7 new OptimizePodcast,
8 new ReleasePodcast,
9]);

assertDoesntHaveChain方法可用于断言作业的剩余链为空:

1$job->assertDoesntHaveChain();

测试链式批次

如果您的作业链包含一批作业Bus::chainedBatch,您可以通过在链断言中插入定义来断言链接的批次符合您的期望:

1use App\Jobs\ShipOrder;
2use App\Jobs\UpdateInventory;
3use Illuminate\Bus\PendingBatch;
4use Illuminate\Support\Facades\Bus;
5 
6Bus::assertChained([
7 new ShipOrder,
8 Bus::chainedBatch(function (PendingBatch $batch) {
9 return $batch->jobs->count() === 3;
10 }),
11 new UpdateInventory,
12]);

测试作业批次

外观BusassertBatched方法可用于断言一批作业已被调度。传递给该assertBatched方法的闭包接收一个 实例Illuminate\Bus\PendingBatch,该实例可用于检查批次中的作业:

1use Illuminate\Bus\PendingBatch;
2use Illuminate\Support\Facades\Bus;
3 
4Bus::fake();
5 
6// ...
7 
8Bus::assertBatched(function (PendingBatch $batch) {
9 return $batch->name == 'import-csv' &&
10 $batch->jobs->count() === 10;
11});

您可以使用该assertBatchCount方法来断言已调度给定数量的批次:

1Bus::assertBatchCount(3);

您可以使用assertNothingBatched以下命令断言没有发送任何批次:

1Bus::assertNothingBatched();

测试作业/批次交互

此外,你偶尔可能需要测试单个作业与其底层批次的交互。例如,你可能需要测试某个作业是否取消了其批次的进一步处理。为此,你需要通过 该withFakeBatch方法为该作业分配一个虚拟批次。该withFakeBatch方法返回一个包含作业实例和虚拟批次的元组:

1[$job, $batch] = (new ShipOrder)->withFakeBatch();
2 
3$job->handle();
4 
5$this->assertTrue($batch->cancelled());
6$this->assertEmpty($batch->added);

测试作业/队列交互

有时,您可能需要测试排队的作业是否将自身释放回队列。或者,您可能需要测试作业是否自行删除。您可以通过实例化作业并调用该withFakeQueueInteractions方法来测试这些队列交互。

一旦伪造了作业的队列交互,您就可以handle在作业上调用该方法。调用作业后,可以使用assertReleasedassertDeletedassertNotDeletedassertFailedassertFailedWithassertNotFailed方法对作业的队列交互进行断言:

1use App\Exceptions\CorruptedAudioException;
2use App\Jobs\ProcessPodcast;
3 
4$job = (new ProcessPodcast)->withFakeQueueInteractions();
5 
6$job->handle();
7 
8$job->assertReleased(delay: 30);
9$job->assertDeleted();
10$job->assertNotDeleted();
11$job->assertFailed();
12$job->assertFailedWith(CorruptedAudioException::class);
13$job->assertNotFailed();

作业事件

使用Facade上的before和方法,您可以指定在队列任务处理之前或之后执行的回调。这些回调非常适合为仪表板执行额外的日志记录或增量统计。通常,您应该从服务提供者的方法中调用这些方法。例如,我们可以使用Laravel 内置的 :afterQueue bootAppServiceProvider

1<?php
2 
3namespace App\Providers;
4 
5use Illuminate\Support\Facades\Queue;
6use Illuminate\Support\ServiceProvider;
7use Illuminate\Queue\Events\JobProcessed;
8use Illuminate\Queue\Events\JobProcessing;
9 
10class AppServiceProvider extends ServiceProvider
11{
12 /**
13 * Register any application services.
14 */
15 public function register(): void
16 {
17 // ...
18 }
19 
20 /**
21 * Bootstrap any application services.
22 */
23 public function boot(): void
24 {
25 Queue::before(function (JobProcessing $event) {
26 // $event->connectionName
27 // $event->job
28 // $event->job->payload()
29 });
30 
31 Queue::after(function (JobProcessed $event) {
32 // $event->connectionName
33 // $event->job
34 // $event->job->payload()
35 });
36 }
37}

使用Facadelooping上的方法,你可以指定在工作进程尝试从队列中获取任务之前执行的回调。例如,你可以注册一个闭包来回滚之前失败的任务所留下的任何事务:Queue

1use Illuminate\Support\Facades\DB;
2use Illuminate\Support\Facades\Queue;
3 
4Queue::looping(function () {
5 while (DB::transactionLevel() > 0) {
6 DB::rollBack();
7 }
8});