在构建 Web 应用程序时,你可能会遇到一些任务,例如解析和存储上传的 CSV 文件,这些任务在典型的 Web 请求期间执行耗时过长。幸运的是,Laravel 允许你轻松创建可在后台处理的队列作业。通过将耗时任务移至队列,你的应用程序能够以极快的速度响应 Web 请求,并为你的客户提供更好的用户体验。
Laravel 队列提供统一的队列 API,可跨多种不同的队列后端,例如 Amazon SQS, Redis, 甚至关系型数据库。
Laravel 的队列配置选项存储在应用程序的 config/queue.php 配置文件中。在此文件中,您将找到框架中包含的每个队列驱动程序的连接配置,包括 database、Amazon SQS、Redis 和 Beanstalkd 驱动程序,以及一个将立即执行任务的同步驱动程序(用于开发或测试)。此外还包含一个 null 队列驱动程序,它会丢弃排队的任务。
[!NOTE]
Laravel Horizon 是一个美观的仪表盘和配置系统,用于您的 Redis 驱动队列。请查看完整的 Horizon 文档 以了解更多信息。
在开始使用 Laravel 队列之前,了解“连接”与“队列”之间的区别非常重要。在你的 config/queue.php 配置文件中,有一个 connections 配置数组。此选项定义了与 Amazon SQS、Beanstalk 或 Redis 等后端队列服务的连接。然而,任何给定的队列连接都可能包含多个“队列”,这些队列可以被视为不同的堆栈或待处理任务的集合。
请注意,queue 配置文件中的每个连接配置示例都包含一个 queue 属性。这是任务在被发送到给定连接时将被分派到的默认队列。换句话说,如果你分派一个任务时没有明确定义它应该被分派到哪个队列,该任务将被放置在连接配置的 queue 属性中定义的队列上:
use App\Jobs\ProcessPodcast;
// This job is sent to the default connection's default queue...
ProcessPodcast::dispatch();
// This job is sent to the default connection's "emails" queue...
ProcessPodcast::dispatch()->onQueue('emails');有些应用程序可能不需要将作业推送到多个队列,而是倾向于只使用一个简单的队列。然而,将作业推送到多个队列对于那些希望根据优先级或分段方式处理作业的应用程序来说特别有用,因为 Laravel 队列工作器允许你根据优先级指定它应该处理哪些队列。例如,如果你将作业推送到一个 high 队列,你可以运行一个工作器来赋予它们更高的处理优先级:
php artisan queue:work --queue=high,default为了使用 database 队列驱动,你需要一个数据库表来存放任务。通常,这包含在 Laravel 默认的 0001_01_01_000002_create_jobs_table.php 数据库迁移中;然而,如果你的应用程序不包含此迁移,你可以使用 make:queue-table Artisan 命令来创建它:
php artisan make:queue-table
php artisan migrate为了使用 redis 队列驱动器,你应该在你的 config/database.php 配置文件中配置一个 Redis 数据库连接。
[!WARNING]
此serializer和compressionRedis 选项不受redis队列驱动程序支持。
如果您的 Redis 队列连接使用 Redis 集群,您的队列名称必须包含 键哈希标签。这是必需的,以确保给定队列的所有 Redis 键都放置在相同的哈希槽中:
'redis' => [
'driver' => 'redis',
'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
'queue' => env('REDIS_QUEUE', '{default}'),
'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
'block_for' => null,
'after_commit' => false,
],当使用 Redis 队列时,你可以使用 block_for 配置选项来指定驱动程序在迭代工作进程循环并重新轮询 Redis 数据库之前,应该等待作业可用多长时间。
根据您的队列负载调整此值,可能比持续轮询 Redis 数据库以获取新作业更高效。例如,您可以将该值设置为 5,以指示驱动程序应阻塞五秒来等待作业可用:
'redis' => [
'driver' => 'redis',
'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
'queue' => env('REDIS_QUEUE', 'default'),
'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
'block_for' => 5,
'after_commit' => false,
],[!WARNING]
将block_for设置为0将导致队列工作器无限期地阻塞,直到有可用作业为止。这还将阻止诸如SIGTERM之类的信号被处理,直到下一个作业被处理。
以下依赖项是列出的队列驱动程序所需的。这些依赖项可以通过 Composer 包管理器进行安装:
默认情况下,应用程序的所有可排队作业都存储在 app/Jobs 目录中。如果 app/Jobs 目录不存在,当你运行 make:job Artisan 命令时,它将被创建:
php artisan make:job ProcessPodcast生成的类将实现 Illuminate\Contracts\Queue\ShouldQueue 接口,向 Laravel 指示该任务应被推送到队列中以异步方式运行。
[!注意]
作业存根可以使用存根发布进行自定义.
作业类非常简单,通常只包含一个handle方法,当作业由队列处理时被调用。为了开始,让我们看一个作业类示例。在此示例中,我们将假设我们管理一个播客发布服务,并需要处理上传的播客文件才能发布:
<?php
namespace App\Jobs;
use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* Create a new job instance.
*/
public function __construct(
public Podcast $podcast,
) {}
/**
* Execute the job.
*/
public function handle(AudioProcessor $processor): void
{
// Process uploaded podcast...
}
}在此示例中,请注意我们能够传递一个 Eloquent 模型 直接到队列作业的构造函数中。由于该作业正在使用 Queueable trait,Eloquent 模型及其已加载的关系将在作业处理时被优雅地序列化和反序列化。
如果你的队列任务在其构造函数中接受一个 Eloquent 模型,那么只有该模型的标识符会被序列化到队列中。当任务实际被处理时,队列系统将自动从数据库中重新检索完整的模型实例及其已加载的关系。这种模型序列化方法允许将更小的任务负载发送到你的队列驱动程序。
handle 方法依赖注入handle 方法会在任务被队列处理时调用。请注意,我们可以在任务的 handle 方法上对依赖项进行类型提示。Laravel 服务容器 会自动注入这些依赖项。
如果您希望完全控制容器如何将依赖项注入 handle 方法, 您可以使用容器的 bindMethod 方法. bindMethod 方法接受一个回调, 该回调会接收任务和容器. 在该回调中, 您可以按您希望的任何方式调用 handle 方法. 通常, 您应该在 App\Providers\AppServiceProvider 的 boot 方法中调用此方法 服务提供者:
use App\Jobs\ProcessPodcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Foundation\Application;
$this->app->bindMethod([ProcessPodcast::class, 'handle'], function (ProcessPodcast $job, Application $app) {
return $job->handle($app->make(AudioProcessor::class));
});[!WARNING]
二进制数据,例如原始图像内容,在传递给排队作业之前,应该通过base64_encode函数传递。否则,在被放入队列时,作业可能无法正确地序列化为 JSON。
因为当作业入队时,所有加载的 Eloquent 模型关系也会被序列化,序列化的作业字符串有时会变得相当大。此外,当作业反序列化并且模型关系从数据库中重新检索时,它们将以其完整形式被检索。在模型于作业入队过程中序列化之前应用的任何先前关系约束,在作业反序列化时将不会被应用。因此,如果你希望处理给定关系的子集,你应该在你的队列作业中重新约束该关系。
或者,为了防止关联被序列化,你可以在设置属性值时,在模型上调用 withoutRelations 方法。此方法将返回一个不带已加载关联的模型实例:
/**
* Create a new job instance.
*/
public function __construct(
Podcast $podcast,
) {
$this->podcast = $podcast->withoutRelations();
}如果你正在使用 PHP 构造器属性提升 并且希望指明一个 Eloquent 模型不应序列化其关联关系,你可以使用 WithoutRelations 属性:
use Illuminate\Queue\Attributes\WithoutRelations;
/**
* Create a new job instance.
*/
public function __construct(
#[WithoutRelations]
public Podcast $podcast,
) {}为方便起见,如果您希望序列化所有不带关系的模型,您可以将 WithoutRelations 属性应用到整个类,而不是将该属性应用到每个模型:
<?php
namespace App\Jobs;
use App\Models\DistributionPlatform;
use App\Models\Podcast;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Illuminate\Queue\Attributes\WithoutRelations;
#[WithoutRelations]
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* Create a new job instance.
*/
public function __construct(
public Podcast $podcast,
public DistributionPlatform $platform,
) {}
}如果一个任务接收到一个 Eloquent 模型集合或数组而不是单个模型,那么当任务被反序列化和执行时,该集合中的模型将不会恢复其关联关系。这是为了防止在处理大量模型的任务上出现过度的资源使用。
[!WARNING]
唯一作业需要一个支持锁的缓存驱动。目前,memcached,redis,dynamodb,database,file, 和array缓存驱动支持原子锁。
[!WARNING]
独特的作业约束不适用于批次内的作业。
有时候,您可能希望确保在任何给定时间点,队列中只有一个特定作业的实例。您可以通过在作业类上实现 ShouldBeUnique 接口来做到这一点。此接口不需要您在类上定义任何额外方法:
<?php
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
// ...
}在上述示例中,UpdateSearchIndex 任务是唯一的。因此,如果该任务的另一个实例已在队列中且尚未完成处理,则该任务将不会被分派。
在某些情况下,您可能希望定义一个特定的“键”来使任务保持唯一,或者您可能希望指定一个超时时间,在此之后任务不再保持唯一。为此,您可以在您的任务类上定义 uniqueId 和 uniqueFor 属性或方法:
<?php
namespace App\Jobs;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
/**
* The product instance.
*
* @var \App\Models\Product
*/
public $product;
/**
* The number of seconds after which the job's unique lock will be released.
*
* @var int
*/
public $uniqueFor = 3600;
/**
* Get the unique ID for the job.
*/
public function uniqueId(): string
{
return $this->product->id;
}
}在上面的示例中,UpdateSearchIndex作业通过产品ID保持唯一。因此,任何使用相同产品ID的新作业分派都将被忽略,直到现有作业完成处理。此外,如果现有作业在一小时内未处理完毕,唯一锁将被释放,并且可以使用相同的唯一键分派另一个作业到队列。
[!WARNING]
如果您的应用程序从多个 Web 服务器或容器调度任务,您应该确保您的所有服务器都正在与同一个中心缓存服务器通信,以便 Laravel 能够准确地判断一个任务是否唯一。
默认情况下,唯一性作业在完成处理或所有重试尝试失败后才会被“解锁”。但是,在某些情况下,您可能希望作业在处理之前立即解锁。为实现此目的,您的作业应实现 ShouldBeUniqueUntilProcessing 契约,而不是 ShouldBeUnique 契约:
<?php
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
// ...
}幕后,当一个 ShouldBeUnique 任务被调度时,Laravel 尝试使用 uniqueId 键获取一个锁。如果锁已被占用,则该任务不会被调度。当任务完成处理或所有重试尝试都失败时,该锁将被释放。默认情况下,Laravel 将使用默认的缓存驱动器来获取此锁。但是,如果您希望使用另一个驱动器来获取锁,您可以定义一个 uniqueVia 方法,该方法返回应该使用的缓存驱动器:
use Illuminate\Contracts\Cache\Repository;
use Illuminate\Support\Facades\Cache;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
// ...
/**
* Get the cache driver for the unique job lock.
*/
public function uniqueVia(): Repository
{
return Cache::driver('redis');
}
}[!NOTE]
如果你只需要限制作业的并发处理,请转而使用 WithoutOverlapping 作业中间件。
Laravel 允许你通过 加密 来确保作业数据的隐私和完整性。要开始使用,只需添加 ShouldBeEncrypted 接口到作业类。一旦此接口已添加到类中,Laravel 将在将其推入队列之前自动加密你的作业:
<?php
use Illuminate\Contracts\Queue\ShouldBeEncrypted;
use Illuminate\Contracts\Queue\ShouldQueue;
class UpdateSearchIndex implements ShouldQueue, ShouldBeEncrypted
{
// ...
}任务中间件允许你将自定义逻辑包裹在队列任务的执行周围,从而减少任务本身的样板代码。例如,考虑以下利用 Laravel 的 Redis 速率限制功能,以允许每五秒只处理一个任务的 handle 方法:
use Illuminate\Support\Facades\Redis;
/**
* Execute the job.
*/
public function handle(): void
{
Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
info('Lock obtained...');
// Handle job...
}, function () {
// Could not obtain lock...
return $this->release(5);
});
}虽然这段代码是有效的,但 handle 方法的实现变得嘈杂,因为它充满了 Redis 速率限制逻辑。此外,如果我们想对任何其他作业进行速率限制,则必须复制此速率限制逻辑。我们可以不直接在 handle 方法中进行速率限制,而是定义一个处理速率限制的作业中间件:
<?php
namespace App\Jobs\Middleware;
use Closure;
use Illuminate\Support\Facades\Redis;
class RateLimited
{
/**
* Process the queued job.
*
* @param \Closure(object): void $next
*/
public function handle(object $job, Closure $next): void
{
Redis::throttle('key')
->block(0)->allow(1)->every(5)
->then(function () use ($job, $next) {
// Lock obtained...
$next($job);
}, function () use ($job) {
// Could not obtain lock...
$job->release(5);
});
}
}如你所见,与 路由中间件 类似,任务中间件接收正在处理的任务以及一个应被调用以继续处理该任务的回调。
您可以使用 make:job-middleware Artisan 命令来生成新的作业中间件类。创建作业中间件后,可以通过从作业的 middleware 方法返回它们来将它们附加到作业。make:job Artisan 命令生成的作业中不存在此方法,因此您需要手动将其添加到您的作业类中:
use App\Jobs\Middleware\RateLimited;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new RateLimited];
}尽管我们刚刚演示了如何编写自己的限速任务中间件,Laravel 实际上包含了一个限速中间件,你可以利用它来对任务进行限速。与 路由限速器 类似,任务限速器使用 RateLimiter 门面的 for 方法定义。
例如,您可能希望允许用户每小时备份一次数据,而不对高级客户施加此类限制。为此,您可以在您的 AppServiceProvider 的 boot 方法中定义一个 RateLimiter:
use Illuminate\Cache\RateLimiting\Limit;
use Illuminate\Support\Facades\RateLimiter;
/**
* Bootstrap any application services.
*/
public function boot(): void
{
RateLimiter::for('backups', function (object $job) {
return $job->user->vipCustomer()
? Limit::none()
: Limit::perHour(1)->by($job->user->id);
});
}在上面的示例中,我们定义了一个小时速率限制;然而,您可以使用 perMinute 方法轻松定义基于分钟的速率限制。 此外,您可以将任何您希望的值传递给速率限制的 by 方法;然而,该值最常用于按客户细分速率限制:
return Limit::perMinute(50)->by($job->user->id);定义了速率限制后,你就可以使用 Illuminate\Queue\Middleware\RateLimited 中间件将速率限制器附加到你的任务上。每当任务超出速率限制时,该中间件会根据速率限制的持续时间,以适当的延迟将任务释放回队列中:
use Illuminate\Queue\Middleware\RateLimited;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new RateLimited('backups')];
}将限速任务重新放回队列仍会增加该任务的总 attempts 次数。您可能需要相应地调整您任务类上的 tries 和 maxExceptions 属性。或者,您可能希望使用 retryUntil 方法 来定义该任务不再尝试的时间量。
使用 releaseAfter 方法,您还可以指定在尝试重新执行释放作业之前必须经过的秒数:
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new RateLimited('backups'))->releaseAfter(60)];
}如果您不希望某个任务在被限速时重试,您可以使用 dontRelease 方法:
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new RateLimited('backups'))->dontRelease()];
}[!NOTE]
如果您正在使用 Redis,您可以使用Illuminate\Queue\Middleware\RateLimitedWithRedis中间件,它针对 Redis 进行了优化,并且比基本的速率限制中间件更高效。
Laravel 包含一个允许你基于任意键来防止作业重叠的 Illuminate\Queue\Middleware\WithoutOverlapping 中间件。当一个排队的作业正在修改一个只应在任意给定时间被一个作业修改的资源时这会很有帮助。
例如,假设你有一个队列任务用于更新用户的信用评分,并且你希望防止对于相同的用户ID出现信用评分更新任务重叠。为此,你可以从你的任务的 middleware 方法中返回 WithoutOverlapping 中间件:
use Illuminate\Queue\Middleware\WithoutOverlapping;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new WithoutOverlapping($this->user->id)];
}将重叠的作业重新放回队列仍会增加该作业的总尝试次数。您可能希望相应地调整作业类上的 tries 和 maxExceptions 属性。例如,将 tries 属性保留为默认值 1 会阻止任何重叠的作业稍后被重试。
任何同类型的重叠作业都将被释放回队列。您还可以指定在被释放的作业再次尝试之前必须经过的秒数:
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];
}如果您希望立即删除任何重叠的作业,以便它们不会被重试,您可以使用dontRelease方法:
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->dontRelease()];
}WithoutOverlapping 中间件由 Laravel 的原子锁功能提供支持。有时,您的任务可能会意外失败或超时,以至于锁未被释放。因此,您可以明确使用 expireAfter 方法定义锁的过期时间。例如,下面的示例将指示 Laravel 在任务开始处理三分钟后释放 WithoutOverlapping 锁:
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];
}[!WARNING]
该WithoutOverlapping中间件需要一个支持 锁 的缓存驱动程序。目前,memcached、redis、dynamodb、database、file和array缓存驱动程序支持原子锁。
By default, the WithoutOverlapping middleware will only prevent overlapping jobs of the same class. So, although two different job classes may use the same lock key, they will not be prevented from overlapping. However, you can instruct Laravel to apply the key across job classes using the shared method:
WithoutOverlapping 中间件默认只会阻止同一类的任务重叠。因此,尽管两个不同的任务类可能使用了相同的锁定键,它们也不会被阻止重叠。但是,您可以使用 shared 方法指示 Laravel 将该键应用于所有任务类:
use Illuminate\Queue\Middleware\WithoutOverlapping;
class ProviderIsDown
{
// ...
public function middleware(): array
{
return [
(new WithoutOverlapping("status:{$this->provider}"))->shared(),
];
}
}
class ProviderIsUp
{
// ...
public function middleware(): array
{
return [
(new WithoutOverlapping("status:{$this->provider}"))->shared(),
];
}
}Laravel 包含一个 Illuminate\Queue\Middleware\ThrottlesExceptions 中间件,允许你对异常进行节流。一旦作业抛出给定数量的异常,所有后续执行该作业的尝试都将被延迟,直到指定的时间间隔过去。这个中间件对于与不稳定的第三方服务交互的作业特别有用。
例如,假设一个排队的任务与开始抛出异常的第三方 API 交互. 为了限制异常,你可以从任务的 middleware 方法中返回 ThrottlesExceptions 中间件. 通常情况下,此中间件应与实现 基于时间的重试 的任务搭配使用:
use DateTime;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new ThrottlesExceptions(10, 5 * 60)];
}
/**
* Determine the time at which the job should timeout.
*/
public function retryUntil(): DateTime
{
return now()->addMinutes(30);
}中间件接受的第一个构造函数参数是任务在被限流之前可以抛出的异常数量,而第二个构造函数参数是任务被限流后,在再次尝试之前应该经过的秒数。在上面的代码示例中,如果任务抛出 10 个连续异常,我们将在等待 5 分钟后再次尝试该任务,并受 30 分钟时间限制的约束。
当作业抛出异常但异常阈值尚未达到时,该作业通常会立即重试。但是,在将中间件附加到作业时,您可以通过调用 backoff 方法来指定此类作业应延迟的分钟数:
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 5 * 60))->backoff(5)];
}在内部,此中间件使用 Laravel 的缓存系统来实现速率限制,并且任务的类名被用作缓存的“键”。你可以在将中间件附加到你的任务时通过调用 by 方法来覆盖此键。这在你有多个任务与同一个第三方服务交互并且希望它们共享一个通用的节流“桶”以确保它们遵守一个单一的共享限制时可能很有用:
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10 * 60))->by('key')];
}默认情况下,此中间件将会对每个异常进行限流。您可以通过在将中间件附加到您的作业时调用 when 方法来修改此行为。那么,只有当提供给 when 方法的闭包返回 true 时,该异常才会被限流:
use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10 * 60))->when(
fn (Throwable $throwable) => $throwable instanceof HttpClientException
)];
}与 when 方法不同,该方法会将作业重新放回队列或抛出异常,deleteWhen 方法允许你在发生给定异常时彻底删除作业:
use App\Exceptions\CustomerDeletedException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(2, 10 * 60))->deleteWhen(CustomerDeletedException::class)];
}如果您希望将受限制的异常报告给您的应用程序的异常处理器,您可以通过在将中间件附加到您的作业时调用 report 方法来做到这一点。可选地,您可以向 report 方法提供一个闭包,并且只有当给定的闭包返回 true 时,异常才会被报告:
use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10 * 60))->report(
fn (Throwable $throwable) => $throwable instanceof HttpClientException
)];
}[!NOTE]
如果你正在使用 Redis,你可以使用Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis中间件,该中间件针对 Redis 进行了优化,并且比基本的异常节流中间件更高效。
Skip中间件允许你指定一个作业应该被跳过/删除,而无需修改作业的逻辑。Skip::when方法将在给定条件评估为true时删除该作业,而Skip::unless方法将在条件评估为false时删除该作业:
use Illuminate\Queue\Middleware\Skip;
/**
* Get the middleware the job should pass through.
*/
public function middleware(): array
{
return [
Skip::when($condition),
];
}您也可以传递一个 Closure 给 when 和 unless 方法,以进行更复杂的条件评估:
use Illuminate\Queue\Middleware\Skip;
/**
* Get the middleware the job should pass through.
*/
public function middleware(): array
{
return [
Skip::when(function (): bool {
return $this->shouldSkip();
}),
];
}编写好 job 类后,你可以使用 job 本身的 dispatch 方法来分发它。传递给 dispatch 方法的参数将被传递给 job 的构造函数:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// ...
ProcessPodcast::dispatch($podcast);
return redirect('/podcasts');
}
}如果您想有条件地调度一个任务,您可以使用 dispatchIf 和 dispatchUnless 方法:
ProcessPodcast::dispatchIf($accountActive, $podcast);
ProcessPodcast::dispatchUnless($accountSuspended, $podcast);在新的 Laravel 应用中,database 连接被定义为默认队列。你可以通过更改你应用的 .env 文件中的 QUEUE_CONNECTION 环境变量来指定一个不同的默认队列连接。
如果您希望指定某个任务不应立即被队列工作器处理,您可以在分发任务时使用 delay 方法。例如,我们可以指定一个任务在分发后10分钟才能被处理:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// ...
ProcessPodcast::dispatch($podcast)
->delay(now()->addMinutes(10));
return redirect('/podcasts');
}
}在某些情况下,任务可能配置了默认延迟。如果您需要绕过此延迟并立即处理任务,您可以使用 withoutDelay 方法:
ProcessPodcast::dispatch($podcast)->withoutDelay();[!警告]
亚马逊 SQS 队列服务的最大延迟时间为 15 分钟。
如果您想立即(同步地)调度一个任务,您可以使用 dispatchSync 方法。当使用此方法时,该任务将不会被排入队列,并会在当前进程中立即执行:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// Create podcast...
ProcessPodcast::dispatchSync($podcast);
return redirect('/podcasts');
}
}使用延迟同步分派,你可以分派一个作业在当前进程中处理,但在 HTTP 响应已发送给用户之后. 这使你能够同步处理 "已排队的" 作业,而不会降低你用户的应用程序体验. 要延迟同步作业的执行,请将作业分派到 deferred 连接:
RecordDelivery::dispatch($order)->onConnection('deferred');deferred 连接也用作默认的故障转移队列。
同样地,background 连接在 HTTP 响应已发送给用户之后处理任务;然而,该任务在一个单独生成的 PHP 进程中处理,使得 PHP-FPM / 应用程序工作进程可以处理另一个传入的 HTTP 请求:
RecordDelivery::dispatch($order)->onConnection('background');虽然在数据库事务中调度任务完全没问题,但您应该特别注意确保您的任务能够真正成功执行。当在事务中调度任务时,该任务有可能在父级事务提交之前就被工作进程处理了。当这种情况发生时,您在数据库事务期间对模型或数据库记录进行的任何更新可能尚未反映到数据库中。此外,在事务中创建的任何模型或数据库记录可能不存在于数据库中。
值得庆幸的是,Laravel 提供了几种解决此问题的方法。首先,你可以在你的队列连接的配置数组中设置 after_commit 连接选项:
'redis' => [
'driver' => 'redis',
// ...
'after_commit' => true,
],当 after_commit 选项为 true 时,你可以在数据库事务中分发作业;但 Laravel 会等到开放的父级数据库事务提交后,才会真正分发作业。当然,如果当前没有开放的数据库事务,作业将立即分发。
如果事务因在事务执行期间发生的异常而回滚,则在该事务期间调度的作业将被丢弃。
[!注意]
将after_commit配置选项设置为true也将导致所有排队的事件监听器、邮件、通知和广播事件在所有打开的数据库事务提交后被分发。
如果您没有将 after_commit 队列连接配置选项设置为 true,您仍然可以指示某个特定任务应该在所有开放数据库事务提交后被分派。为此,您可以将 afterCommit 方法链接到您的分派操作上:
use App\Jobs\ProcessPodcast;
ProcessPodcast::dispatch($podcast)->afterCommit();同样地, 如果 after_commit 配置选项设置为 true, 您可以指示某个特定任务应立即分派而无需等待任何打开的数据库事务提交:
ProcessPodcast::dispatch($podcast)->beforeCommit();作业链允许您指定一系列排队的作业,这些作业应在主作业成功执行后按顺序运行。如果序列中的某个作业失败,其余作业将不会运行。要执行排队的作业链,您可以使用由 Bus Facade 提供的 chain 方法。Laravel 的命令总线是一个底层组件,排队作业的分发功能就是基于它构建的:
use App\Jobs\OptimizePodcast;
use App\Jobs\ProcessPodcast;
use App\Jobs\ReleasePodcast;
use Illuminate\Support\Facades\Bus;
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->dispatch();除了链式调用作业类实例,你也可以链式调用闭包:
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
function () {
Podcast::update(/* ... */);
},
])->dispatch();[!警告]
在任务内部使用$this->delete()方法删除任务,不会阻止链式任务被处理。只有当链中的某个任务失败时,链才会停止执行。
如果您想指定链式作业应使用的连接和队列,您可以使用 onConnection 和 onQueue 方法。这些方法指定了应使用的队列连接和队列名称,除非队列作业被明确分配了不同的连接/队列:
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->onConnection('redis')->onQueue('podcasts')->dispatch();有时,你可能需要从作业链中的另一个作业内部,向现有作业链预置或追加一个作业。你可以通过使用 prependToChain 和 appendToChain 方法来完成此操作:
/**
* Execute the job.
*/
public function handle(): void
{
// ...
// Prepend to the current chain, run job immediately after current job...
$this->prependToChain(new TranscribePodcast);
// Append to the current chain, run job at end of chain...
$this->appendToChain(new TranscribePodcast);
}当链式调用作业时,你可以使用 catch 方法来指定一个闭包,如果链中的某个作业失败,该闭包应被调用。给定的回调将接收导致作业失败的 Throwable 实例:
use Illuminate\Support\Facades\Bus;
use Throwable;
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->catch(function (Throwable $e) {
// A job within the chain has failed...
})->dispatch();[!警告]
由于链式回调由 Laravel 队列在稍后时间序列化并执行,因此您不应在链式回调中使用$this变量。
通过将任务推送到不同队列, 您可以 "分类" 您的排队任务 甚至优先处理您分配给各种队列的工作器数量. 请记住, 这并不会将任务推送到不同的队列 "连接" 如由您的队列配置文件定义, 而只是推送到单个连接中的特定队列. 要指定队列, 使用 onQueue 方法 在调度任务时:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// Create podcast...
ProcessPodcast::dispatch($podcast)->onQueue('processing');
return redirect('/podcasts');
}
}或者,你可以在任务的构造函数中调用 onQueue 方法来指定任务的队列:
<?php
namespace App\Jobs;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* Create a new job instance.
*/
public function __construct()
{
$this->onQueue('processing');
}
}如果您的应用程序与多个队列连接进行交互,您可以使用onConnection方法指定要将作业推送到哪个连接:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// Create podcast...
ProcessPodcast::dispatch($podcast)->onConnection('sqs');
return redirect('/podcasts');
}
}你可以链式调用 onConnection 和 onQueue 方法,以指定作业的连接和队列:
ProcessPodcast::dispatch($podcast)
->onConnection('sqs')
->onQueue('processing');另一种方法是,您可以在任务的构造函数中调用 onConnection 方法来指定任务的连接:
<?php
namespace App\Jobs;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* Create a new job instance.
*/
public function __construct()
{
$this->onConnection('sqs');
}
}作业尝试是 Laravel 队列系统的核心概念,并为许多高级功能提供支持。尽管它们初看起来可能令人困惑,但在修改默认配置之前,了解它们的工作原理至关重要。
当一个任务被分派时,它被推入队列。一个工作进程接着拾取它并尝试执行它。这即是一次任务尝试。
然而,一次尝试并不一定意味着该任务的 handle 方法已被执行。尝试也可能以多种方式被“消耗”:
你可能不希望无限期地尝试某项作业。因此,Laravel 提供了多种方式来指定作业可以尝试的次数或时长。
[!NOTE]
默认情况下,Laravel 只会尝试一次任务。如果你的任务使用了中间件,例如WithoutOverlapping或RateLimited,或者你正在手动释放任务,你可能需要通过tries选项增加允许的尝试次数。
指定作业最大尝试次数的一种方法是通过 Artisan 命令行上的 --tries 开关。这适用于工作进程处理的所有作业,除非正在处理的作业指定了它可能被尝试的次数:
php artisan queue:work --tries=3如果一个任务超过了其最大尝试次数,它将被视为一个“失败”任务。有关处理失败任务的更多信息,请查阅失败任务文档。如果向 queue:work 命令提供 --tries=0,该任务将无限期地重试。
您可以采取更精细的方法,通过在作业类本身上定义作业的最大尝试次数。如果作业上指定了最大尝试次数,它将优先于在命令行中提供的 --tries 值:
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* The number of times the job may be attempted.
*
* @var int
*/
public $tries = 5;
}如果你需要动态控制某个特定作业的最大尝试次数,你可以在该作业上定义一个 tries 方法:
/**
* Determine number of times the job may be attempted.
*/
public function tries(): int
{
return 5;
}作为定义作业在失败前可尝试次数的替代方案,你可以定义一个作业不应再被尝试的时间点。这允许作业在给定时间范围内被尝试任意次数。要定义作业不应再被尝试的时间点,请将 retryUntil 方法添加到你的作业类中。此方法应返回一个 DateTime 实例:
use DateTime;
/**
* Determine the time at which the job should timeout.
*/
public function retryUntil(): DateTime
{
return now()->addMinutes(10);
}如果同时定义了 retryUntil 和 tries,Laravel 会优先使用 retryUntil 方法。
[!NOTE]
你还可以定义一个tries属性或retryUntil方法在你的 排队事件监听器 和 排队通知 中。
有时你可能希望指定一个任务可以被尝试多次,但如果重试是由给定数量的未处理异常触发的,则应失败(而不是通过 release 方法直接释放)。为此,你可以在任务类上定义一个 maxExceptions 属性:
<?php
namespace App\Jobs;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Illuminate\Support\Facades\Redis;
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* The number of times the job may be attempted.
*
* @var int
*/
public $tries = 25;
/**
* The maximum number of unhandled exceptions to allow before failing.
*
* @var int
*/
public $maxExceptions = 3;
/**
* Execute the job.
*/
public function handle(): void
{
Redis::throttle('key')->allow(10)->every(60)->then(function () {
// Lock obtained, process the podcast...
}, function () {
// Unable to obtain lock...
return $this->release(10);
});
}
}在此示例中,如果应用程序无法获取 Redis 锁,则该作业将释放十秒,并会继续重试多达 25 次。但是,如果作业抛出三个未处理的异常,则该作业将失败。
通常,您大致知道队列中的任务预计需要多长时间。因此,Laravel 允许您指定一个“timeout”值。默认情况下,timeout 值为 60 秒。如果一个任务处理时间超过 timeout 值所指定的秒数,处理该任务的 worker 将会以错误退出。通常,worker 将由 在您的服务器上配置的进程管理器 自动重启。
作业可运行的最大秒数,可以使用 Artisan 命令行上的 --timeout 开关来指定:
php artisan queue:work --timeout=30如果作业因持续超时而超出其最大尝试次数,它将被标记为失败。
您还可以在作业类本身上定义作业允许运行的最大秒数。如果在作业上指定了超时,它将优先于在命令行上指定的任何超时:
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* The number of seconds the job can run before timing out.
*
* @var int
*/
public $timeout = 120;
}有时,IO阻塞进程例如套接字或出站HTTP连接可能不遵守您指定的超时时间。因此,当使用这些特性时,您应始终尝试同样也使用其API指定超时时间。例如,当使用Guzzle时,您应始终指定连接和请求的超时值。
[!WARNING]
要指定作业超时,必须安装 PCNTL PHP 扩展。此外,作业的 "timeout" 值应始终小于其 "retry after" 值。否则,作业可能会在其实际执行完成或超时之前被重新尝试。
如果你想指明一个任务在超时时应被标记为失败,你可以在任务类上定义$failOnTimeout属性:
/**
* Indicate if the job should be marked as failed on timeout.
*
* @var bool
*/
public $failOnTimeout = true;[!注意]
默认情况下,当作业超时时,它会消耗一次尝试并被释放回队列(如果允许重试)。但是,如果您将作业配置为超时失败,它将不会被重试,无论为重试次数设置的值是多少。
Laravel 支持 Amazon SQS FIFO (先进先出) 队列,从而使您能够按照它们被发送的精确顺序处理作业同时通过消息去重来确保恰好一次处理。
FIFO 队列需要一个消息组 ID 来确定哪些作业可以并行处理。具有相同组 ID 的作业按顺序处理,而具有不同组 ID 的消息可以并发处理。
Laravel 提供了一个流畅的 onGroup 方法,用于在分发任务时指定消息组ID:
ProcessOrder::dispatch($order)
->onGroup("customer-{$order->customer_id}");SQS FIFO 队列支持消息去重,以确保精确一次处理。在您的作业类中实现一个 deduplicationId 方法,以提供一个自定义的去重 ID:
<?php
namespace App\Jobs;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
class ProcessSubscriptionRenewal implements ShouldQueue
{
use Queueable;
// ...
/**
* Get the job's deduplication ID.
*/
public function deduplicationId(): string
{
return "renewal-{$this->subscription->id}";
}
}当使用 FIFO 队列时,您还需要在监听器、邮件和通知上定义消息组。或者,您可以将这些对象的排队实例分派到非 FIFO 队列。
为了定义 队列事件监听器 的消息组,请在监听器上定义一个 messageGroup 方法。你也可以选择定义一个 deduplicationId 方法:
<?php
namespace App\Listeners;
class SendShipmentNotification
{
// ...
/**
* Get the job's message group.
*/
public function messageGroup(): string
{
return 'shipments';
}
/**
* Get the job's deduplication ID.
*/
public function deduplicationId(): string
{
return "shipment-notification-{$this->shipment->id}";
}
}当发送一个将要进入 FIFO 队列的 邮件消息 时,你应该在发送通知时调用 onGroup 方法,并可选地调用 withDeduplicator 方法:
use App\Mail\InvoicePaid;
use Illuminate\Support\Facades\Mail;
$invoicePaid = (new InvoicePaid($invoice))
->onGroup('invoices')
->withDeduplicator(fn () => 'invoices-'.$invoice->id);
Mail::to($request->user())->send($invoicePaid);当发送一个将要排队到先进先出(FIFO)队列的通知时,您应该在发送通知时调用 onGroup 方法,并可选地调用 withDeduplicator 方法:
use App\Notifications\InvoicePaid;
$invoicePaid = (new InvoicePaid($invoice))
->onGroup('invoices')
->withDeduplicator(fn () => 'invoices-'.$invoice->id);
$user->notify($invoicePaid);failover 队列驱动在将作业推送到队列时提供自动故障转移功能。如果主队列连接因任何原因失败,Laravel 将自动尝试将作业推送到列表中的下一个配置连接。这对于在队列可靠性至关重要的生产环境中确保高可用性特别有用。
要配置故障转移队列连接,请指定 failover 驱动,并提供一个按顺序尝试的连接名称数组。默认情况下,Laravel 在应用程序的 config/queue.php 配置文件中包含一个示例故障转移配置:
'failover' => [
'driver' => 'failover',
'connections' => [
'redis',
'database',
'sync',
],
],一旦你配置了一个使用 failover 驱动的连接,你可能需要将该 failover 连接设置为你应用的 .env 文件中的默认队列连接:
QUEUE_CONNECTION=failover接下来,为您的故障转移连接列表中的每个连接启动至少一个工作器:
php artisan queue:work redis
php artisan queue:work database[!注意]
您无需运行 worker 来处理使用sync、background或deferred队列驱动程序的连接,因为这些驱动程序在当前 PHP 进程中处理作业。
当队列连接操作失败并激活故障转移时,Laravel 将会分派 Illuminate\Queue\Events\QueueFailedOver 事件,允许你报告或记录队列连接已失败。
[!TIP]
如果您使用 Laravel Horizon,请记住 Horizon 只管理 Redis 队列。如果您的故障转移列表包含database,您应该运行一个常规的php artisan queue:work database进程与 Horizon 并行。
如果在处理任务时抛出异常,任务将自动释放回队列,以便可以再次尝试。任务将继续被释放,直到其尝试次数达到您的应用程序所允许的最大值。最大尝试次数由在 queue:work Artisan 命令上使用的 --tries 开关定义。或者,最大尝试次数可以在任务类本身上定义。有关运行队列工作进程的更多信息,可以在下方找到。
有时您可能希望手动将一个作业重新放回队列,以便稍后再次尝试。您可以通过调用 release 方法来完成此操作:
/**
* Execute the job.
*/
public function handle(): void
{
// ...
$this->release();
}By default, the release method will release the job back onto the queue for immediate processing. However, you may instruct the queue to not make the job available for processing until a given number of seconds has elapsed by passing an integer or date instance to the release method:
$this->release(10);
$this->release(now()->addSeconds(10));偶尔您可能需要手动将作业标记为“失败”。为此,您可以调用 fail 方法:
/**
* Execute the job.
*/
public function handle(): void
{
// ...
$this->fail();
}如果你希望因为捕获到了一个异常而将任务标记为失败,你可以将该异常传递给 fail 方法。或者,为了方便,你可以传递一个字符串错误消息,该消息将为你转换为一个异常:
$this->fail($exception);
$this->fail('Something went wrong.');[!NOTE]
有关失败作业的更多信息,请查阅处理作业失败的文档。
FailOnException 作业中间件 允许你在抛出特定异常时中断重试。这使得针对瞬时异常(例如外部 API 错误)可以重试,但对于持久性异常(例如用户的权限被撤销),作业将永久失败:
<?php
namespace App\Jobs;
use App\Models\User;
use Illuminate\Auth\Access\AuthorizationException;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Illuminate\Queue\Middleware\FailOnException;
use Illuminate\Support\Facades\Http;
class SyncChatHistory implements ShouldQueue
{
use Queueable;
public $tries = 3;
/**
* Create a new job instance.
*/
public function __construct(
public User $user,
) {}
/**
* Execute the job.
*/
public function handle(): void
{
$this->user->authorize('sync-chat-history');
$response = Http::throw()->get(
"https://chat.laravel.test/?user={$this->user->uuid}"
);
// ...
}
/**
* Get the middleware the job should pass through.
*/
public function middleware(): array
{
return [
new FailOnException([AuthorizationException::class])
];
}
}Laravel 的作业批处理功能让你可以轻松执行一批作业,然后在作业批处理完成后执行一些操作。在开始之前,你应该创建一个数据库迁移来构建一个表,该表将包含关于你的作业批处理的元信息,例如它们的完成百分比。这个迁移可以使用 make:queue-batches-table Artisan 命令生成:
php artisan make:queue-batches-table
php artisan migrate要定义一个可批处理作业,你应该像往常一样创建一个可队列作业;但是,你应该将Illuminate\Bus\Batchable trait添加到作业类。这个trait提供了访问一个batch 方法,该方法可用于检索作业当前正在执行的批次:
<?php
namespace App\Jobs;
use Illuminate\Bus\Batchable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
class ImportCsv implements ShouldQueue
{
use Batchable, Queueable;
/**
* Execute the job.
*/
public function handle(): void
{
if ($this->batch()->cancelled()) {
// Determine if the batch has been cancelled...
return;
}
// Import a portion of the CSV file...
}
}要分派一批作业,您应该使用 Bus 外观的 batch 方法。当然,批处理主要在与完成回调结合使用时才有用。因此,您可以利用 then 、 catch 和 finally 方法来为批处理定义完成回调。当它们被调用时,每个回调都将收到一个 Illuminate\Bus\Batch 实例。
当运行多个队列工作器时,批次中的作业将并行处理。因此,作业完成的顺序可能与它们添加到批次的顺序不同。请查阅我们关于作业链和批次的文档,了解如何按顺序运行一系列作业。
在此示例中,我们假设正在排队处理一批作业,每个作业都从一个 CSV 文件处理给定数量的行:
use App\Jobs\ImportCsv;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Throwable;
$batch = Bus::batch([
new ImportCsv(1, 100),
new ImportCsv(101, 200),
new ImportCsv(201, 300),
new ImportCsv(301, 400),
new ImportCsv(401, 500),
])->before(function (Batch $batch) {
// The batch has been created but no jobs have been added...
})->progress(function (Batch $batch) {
// A single job has completed successfully...
})->then(function (Batch $batch) {
// All jobs completed successfully...
})->catch(function (Batch $batch, Throwable $e) {
// First batch job failure detected...
})->finally(function (Batch $batch) {
// The batch has finished executing...
})->dispatch();
return $batch->id;批处理的 ID,可通过 $batch->id 属性访问,可用于 查询 Laravel 命令总线 以获取有关该批处理的信息,在它被分发之后。
[!WARNING]
由于批量回调会被 Laravel 队列序列化并在稍后执行,因此你不应该在回调中使用 $this 变量。此外,由于批量作业被包裹在数据库事务中,因此不应该在作业中执行会触发隐式提交的数据库语句。
某些工具例如 Laravel Horizon 和 Laravel Telescope 可能会在批处理被命名时为批处理提供更用户友好的调试信息。要为批处理分配一个任意名称,您可以在定义批处理时调用 name 方法:
$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->name('Import CSV')->dispatch();如果您想指定用于批处理作业的连接和队列,您可以使用 onConnection 和 onQueue 方法。所有批处理作业必须在相同的连接和队列中执行:
$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->onConnection('redis')->onQueue('imports')->dispatch();您可以在批处理中通过将链式作业放置在一个数组中定义一组链式作业。例如,我们可以并行执行两个作业链并在两个作业链完成处理时执行回调:
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
Bus::batch([
[
new ReleasePodcast(1),
new SendPodcastReleaseNotification(1),
],
[
new ReleasePodcast(2),
new SendPodcastReleaseNotification(2),
],
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->dispatch();反之,您可以通过在链中定义批次,在一个 链 中运行多批作业。例如,您可以首先运行一批作业来发布多个播客,然后运行一批作业来发送发布通知:
use App\Jobs\FlushPodcastCache;
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Support\Facades\Bus;
Bus::chain([
new FlushPodcastCache,
Bus::batch([
new ReleasePodcast(1),
new ReleasePodcast(2),
]),
Bus::batch([
new SendPodcastReleaseNotification(1),
new SendPodcastReleaseNotification(2),
]),
])->dispatch();有时,从一个已批处理的作业中向批处理添加更多作业可能会很有用。 当您需要批处理数千个作业,而这些作业在 Web 请求期间分派可能需要太长时间时,此模式会很有用。 因此,您可能希望分派一批初始的“加载器”作业,这些作业会用更多的作业来填充该批处理:
$batch = Bus::batch([
new LoadImportBatch,
new LoadImportBatch,
new LoadImportBatch,
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->name('Import Contacts')->dispatch();在本例中,我们将使用 LoadImportBatch 作业来用额外的作业填充批处理。为此,我们可以使用批处理实例上的 add 方法,该实例可通过作业的 batch 方法访问:
use App\Jobs\ImportContacts;
use Illuminate\Support\Collection;
/**
* Execute the job.
*/
public function handle(): void
{
if ($this->batch()->cancelled()) {
return;
}
$this->batch()->add(Collection::times(1000, function () {
return new ImportContacts;
}));
}[!WARNING]
你只能在属于同一批次的作业内部向该批次添加作业。
被提供给批处理完成回调的 Illuminate\Bus\Batch 实例具有多种属性和方法,以帮助你交互和检查给定的作业批次:
// The UUID of the batch...
$batch->id;
// The name of the batch (if applicable)...
$batch->name;
// The number of jobs assigned to the batch...
$batch->totalJobs;
// The number of jobs that have not been processed by the queue...
$batch->pendingJobs;
// The number of jobs that have failed...
$batch->failedJobs;
// The number of jobs that have been processed thus far...
$batch->processedJobs();
// The completion percentage of the batch (0-100)...
$batch->progress();
// Indicates if the batch has finished executing...
$batch->finished();
// Cancel the execution of the batch...
$batch->cancel();
// Indicates if the batch has been cancelled...
$batch->cancelled();所有 Illuminate\Bus\Batch 实例都是可 JSON 序列化的,这意味着你可以直接从应用程序的路由中返回它们,以检索包含批次信息(包括其完成进度)的 JSON 有效负载。这使得在应用程序的 UI 中显示批次完成进度信息变得方便。
要通过 ID 检索批处理,您可以使用 Bus facade 的 findBatch 方法:
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Route;
Route::get('/batch/{batchId}', function (string $batchId) {
return Bus::findBatch($batchId);
});有时你可能需要取消给定批处理的执行。这可以通过在 Illuminate\Bus\Batch 实例上调用 cancel 方法来实现:
/**
* Execute the job.
*/
public function handle(): void
{
if ($this->user->exceedsImportLimit()) {
$this->batch()->cancel();
return;
}
if ($this->batch()->cancelled()) {
return;
}
}正如您可能在之前的示例中注意到的那样,批处理作业通常应判断其对应的批次是否已被取消,然后再继续执行。然而,为了方便起见,您可以改为将 SkipIfBatchCancelled 中间件 分配给该作业。顾名思义,此中间件将指示 Laravel 在其对应的批次已被取消时,不处理该作业:
use Illuminate\Queue\Middleware\SkipIfBatchCancelled;
/**
* Get the middleware the job should pass through.
*/
public function middleware(): array
{
return [new SkipIfBatchCancelled];
}当批处理作业失败时,catch 回调(如果已分配)将被调用。此回调仅在批处理中第一个失败的作业时被调用。
当批处理中的某个作业失败时,Laravel 会自动将该批处理标记为“已取消”。如果你愿意,你可以禁用此行为,以防止作业失败时自动将批处理标记为已取消。这可以通过在调度批处理时调用 allowFailures 方法来实现:
$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->allowFailures()->dispatch();您可以选择性地提供一个闭包给 allowFailures 方法,该闭包将在每个作业失败时执行:
$batch = Bus::batch([
// ...
])->allowFailures(function (Batch $batch, $exception) {
// Handle individual job failures...
})->dispatch();为方便起见,Laravel 提供了一个 queue:retry-batch Artisan 命令,该命令允许你轻松重试给定批次中所有失败的任务。此命令接受应重试其失败任务的批次的 UUID:
php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5如果没有清理,该 job_batches 表会非常迅速地积累记录。为了缓解这个问题,您应该 调度 该 queue:prune-batches Artisan 命令每天运行:
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches')->daily();默认情况下,所有已完成且已超过 24 小时的批次都将被修剪。 调用命令时,您可以使用 hours 选项来确定批处理数据的保留时长。 例如,以下命令将删除所有已在 48 小时前完成的批次:
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches --hours=48')->daily();有时,您的 jobs_batches 表可能会累积从未成功完成的批次记录,例如作业失败但从未成功重试的批次。 您可以指示 queue:prune-batches 命令使用 unfinished 选项来修剪这些未完成的批次记录:
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches --hours=48 --unfinished=72')->daily();同样地,您的 jobs_batches 表也可能积累针对已取消批次的批次记录。您可以指示 queue:prune-batches 命令使用 cancelled 选项来剪除这些已取消的批次记录:
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches --hours=48 --cancelled=72')->daily();Laravel 也支持将批处理元信息存储到 DynamoDB 而不是关系型数据库。不过, 你需要手动创建一个 DynamoDB 表来存储所有批处理记录。
通常,此表应命名为job_batches,但您应根据您的应用程序的queue配置文件中queue.batching.table配置值来命名该表。
job_batches 表应具有名为 application 的字符串主分区键和名为 id 的字符串主排序键. 键的 application 部分将包含您的应用程序名称,该名称由您的应用程序的 app 配置文件中的 name 配置值定义. 由于应用程序名称是 DynamoDB 表键的一部分,您可以使用同一个表来存储多个 Laravel 应用程序的作业批次.
此外,您可以为您的表定义 ttl 属性如果您希望利用 自动批量修剪。
接下来,安装 AWS SDK,以便您的 Laravel 应用程序能够与 Amazon DynamoDB 通信:
composer require aws/aws-sdk-php接下来,将 queue.batching.driver 配置选项的值设置为 dynamodb。此外,您应该在 batching 配置数组中定义 key、secret 和 region 配置选项。这些选项将用于与 AWS 进行身份验证。当使用 dynamodb 驱动程序时,queue.batching.database 配置选项是不需要的:
'batching' => [
'driver' => env('QUEUE_BATCHING_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'job_batches',
],当利用 DynamoDB 存储作业批次信息时,用于清理存储在关系数据库中的批次的典型剪枝命令将不起作用。相反,您可以利用 DynamoDB 的原生 TTL 功能 自动删除旧批次的记录。
如果你使用 ttl 属性定义了 DynamoDB 表,你可以定义配置参数来指示 Laravel 如何清理批量记录。queue.batching.ttl_attribute 配置值定义了持有 TTL 的属性名称,而 queue.batching.ttl 配置值定义了批量记录可以从 DynamoDB 表中移除的秒数,该秒数是相对于记录上次更新的时间而言的:
'batching' => [
'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'job_batches',
'ttl_attribute' => 'ttl',
'ttl' => 60 * 60 * 24 * 7, // 7 days...
],除了将一个任务类分发到队列之外,你也可以分发一个闭包。这对于需要在当前请求周期之外执行的快速、简单任务非常有用。当将闭包分发到队列时,闭包的代码内容会进行加密签名,以便在传输过程中不会被修改:
use App\Models\Podcast;
$podcast = Podcast::find(1);
dispatch(function () use ($podcast) {
$podcast->publish();
});为了给队列闭包分配一个名称,该名称可用于队列报告仪表盘,并由 queue:work 命令显示,你可以使用 name 方法:
dispatch(function () {
// ...
})->name('Publish Podcast');使用 catch 方法,你可以提供一个如果排队的闭包在耗尽了队列的所有 配置的重试尝试次数后未能成功完成时应执行的闭包:
use Throwable;
dispatch(function () use ($podcast) {
$podcast->publish();
})->catch(function (Throwable $e) {
// This job has failed...
});[!WARNING]
由于catch回调会被 Laravel 队列序列化并在稍后执行,因此你不应该在catch回调中使用$this变量。
queue:work 命令Laravel 包含一个 Artisan 命令,它会启动一个队列工作器,并在新任务被推送到队列时处理它们。你可以使用 queue:work Artisan 命令来运行工作器。请注意,一旦 queue:work 命令启动,它将持续运行,直到它被手动停止或你关闭终端:
php artisan queue:work[!NOTE]
为了让queue:work进程在后台永久运行,你应该使用进程管理器,例如 Supervisor 以确保队列工作器不会停止运行。
您可以在调用 queue:work 命令时包含 -v 标志,如果您希望将已处理的作业 ID、连接名称和队列名称包含在命令的输出中:
php artisan queue:work -v请记住,队列工作器是长期运行的进程,并将已启动的应用程序状态存储在内存中。因此,它们在启动后将不会注意到代码库中的更改。所以,在部署过程中,请务必重启您的队列工作器。此外,请记住,由您的应用程序创建或修改的任何静态状态都不会在作业之间自动重置。
此外,你可以运行 queue:listen 命令。当使用 queue:listen 命令时,当你想重新加载更新的代码或重置应用程序状态时,你无需手动重启 worker;然而,这个命令的效率显著低于 queue:work 命令:
php artisan queue:listen将多个工作进程分配给队列并并发处理任务,您只需启动多个 queue:work 进程即可。这可以在本地通过终端的多个标签页完成,或者在生产环境中利用进程管理器的配置设置来完成。 当使用 Supervisor,您可以使用 numprocs 配置值。
您还可以指定工作器应使用哪个队列连接。传递给 work 命令的连接名称应与 config/queue.php 配置文件中定义的连接之一相对应:
php artisan queue:work redisBy default, the queue:work command only processes jobs for the default queue on a given connection. However, you may customize your queue worker even further by only processing particular queues for a given connection. For example, if all of your emails are processed in an emails queue on your redis queue connection, you may issue the following command to start a worker that only processes that queue:
php artisan queue:work redis --queue=emails该 --once 选项可用于指示工作进程仅处理队列中的一个作业:
php artisan queue:work --once--max-jobs 选项可用于指示 worker 处理给定数量的作业,然后退出。此选项在与 [Supervisor](#supervisor-configuration) 结合使用时可能会很有用,以便你的 worker 在处理完给定数量的作业后自动重启,从而释放它们可能已累积的任何内存:
php artisan queue:work --max-jobs=1000该 --stop-when-empty 选项可用于指示 worker 处理完所有任务后优雅地退出。当您希望在队列为空后关闭容器时,此选项在处理 Docker 容器内的 Laravel 队列时会很有用:
php artisan queue:work --stop-when-empty--max-time 选项可用于指示 worker 处理任务持续指定秒数,然后退出。此选项与 Supervisor 结合使用时可能很有用,以便你的 worker 在处理任务持续一段时间后自动重新启动,释放它们可能积累的任何内存:
# Process jobs for one hour and then exit...
php artisan queue:work --max-time=3600当队列中有可用任务时,工作进程将持续处理任务,任务之间没有延迟。然而,sleep 选项决定了如果没有可用任务,工作进程将“休眠”多少秒。当然,在休眠期间,工作进程不会处理任何新任务:
php artisan queue:work --sleep=3当您的应用程序处于维护模式时,任何排队的任务都将不会被处理。一旦应用程序退出维护模式,这些任务将继续照常被处理。
要强制你的队列工作程序处理任务,即使在维护模式下,你也可以使用 --force 选项:
php artisan queue:work --force守护进程队列工作器在处理每个任务之前不会“重启”框架。因此,您应该在每个任务完成后释放任何大量资源。例如,如果您正在使用 GD 库 进行图像处理,您应该在处理完图像后使用 imagedestroy 释放内存。
有时您可能希望优先处理队列。例如,在您的 config/queue.php 配置文件中,您可以将 redis 连接的默认 queue 设置为 low。但是,偶尔您可能希望将作业推送到 high 优先级的队列,如下所示:
dispatch((new Job)->onQueue('high'));要启动一个工作进程,该进程验证所有 high 队列中的任务都已处理完毕,然后才继续处理 low 队列, 请将一个逗号分隔的队列名称列表传递给 work 命令:
php artisan queue:work --queue=high,low由于队列工作器是长期运行的进程,它们在不重启的情况下不会注意到你代码的更改。因此,使用队列工作器部署应用程序最简单的方法是在部署过程中重启工作器。你可以通过执行 queue:restart 命令来优雅地重启所有工作器:
php artisan queue:restart此命令将指示所有队列工作器在完成处理当前任务后优雅地退出,以避免丢失任何现有任务。由于队列工作器将在 queue:restart 命令执行时退出,您应该运行一个进程管理器,例如 Supervisor,以自动重新启动队列工作器。
[!注意]
队列使用 缓存 来存储重启信号,因此你应该验证一个缓存驱动程序已为你的应用程序正确配置,在使用此功能前。
好的,这是一个专业的技术文档翻译任务。我会严格按照您的要求进行翻译。
[!WARNING]
唯一不包含retry_after值的队列连接是 Amazon SQS。SQS 将根据 默认可见性超时 重试作业,该超时在 AWS 控制台中进行管理。
queue:work Artisan 命令暴露了一个 --timeout 选项。默认情况下,--timeout 的值是 60 秒。如果一个任务处理时间超过了超时值指定的秒数,处理该任务的工作器将退出并报错。通常情况下,该工作器将由在你的服务器上配置的进程管理器自动重启:
php artisan queue:work --timeout=60retry_after 配置选项和 --timeout CLI 选项是不同的,但它们协同工作,以确保作业不会丢失,并且作业只会被成功处理一次。
[!WARNING]
--timeout值应始终比你的retry_after配置值至少短几秒钟。这将确保处理冻结作业的工作进程始终在该作业被重试之前终止。如果你的--timeout选项长于你的retry_after配置值,你的作业可能会被处理两次。
在生产环境中,你需要一种方法来保持你的 queue:work 进程运行。一个 queue:work 进程可能会因为各种原因停止运行,例如工作进程超时或 queue:restart 命令的执行。
因此,你需要配置一个进程监控器,它能够检测到你的 queue:work 进程何时退出并自动重新启动它们。此外,进程监控器可以让你指定希望同时运行多少个 queue:work 进程。Supervisor 是一种在 Linux 环境中常用的进程监控器,我们将在接下来的文档中讨论如何配置它。
Supervisor 是 Linux 操作系统的进程监视器,并会在您的 queue:work 进程失败时自动重启它们。要在 Ubuntu 上安装 Supervisor,您可以使用以下命令:
sudo apt-get install supervisor[!注意]
如果自行配置和管理 Supervisor 听起来令人不知所措,请考虑使用 Laravel Cloud,它提供了一个完全托管的平台,用于运行 Laravel 队列工作器。
Supervisor 配置文件通常存储在 /etc/supervisor/conf.d 目录下。在此目录下,您可以创建任意数量的配置文件,以指示 Supervisor 如何监控您的进程。例如,我们来创建一个 laravel-worker.conf 文件来启动和监控 queue:work 进程:
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
stopwaitsecs=3600在此示例中,numprocs 指令将指示 Supervisor 运行八个 queue:work 进程并监控所有这些,如果它们失败,则自动重启它们。您应该更改配置中的 command 指令以反映您所需的队列连接和工作进程选项。
[!WARNING]
您应确保的值stopwaitsecs` 大于您运行时间最长的作业所消耗的秒数。否则,Supervisor 可能会在该作业处理完成之前将其终止。
一旦配置文件创建完成,你可以更新 Supervisor 配置并使用以下命令启动进程:
sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start "laravel-worker:*"有关 Supervisor 的更多信息,请参阅 Supervisor 文档。
有时你的队列任务会失败。别担心,事情并非总能如愿以偿!Laravel 提供了一种方便的方式来 指定任务应尝试的最大次数。当异步任务超出此尝试次数后,它将被插入到 failed_jobs 数据库表中。同步分派的任务 失败后不会存储在此表中,它们的异常会立即由应用程序处理。
用于创建 failed_jobs 表的迁移通常已在新的 Laravel 应用程序中存在。但是,如果你的应用程序不包含此表的迁移,你可以使用 make:queue-failed-table 命令来创建迁移:
php artisan make:queue-failed-table
php artisan migrate当运行一个队列工作器进程时,你可以使用 ---tries 开关在 queue:work 命令上指定一个任务应被尝试的最大次数。如果你没有为 ---tries 选项指定值,任务将只尝试一次,或者按照任务类的 $tries 属性的指定次数进行尝试:
php artisan queue:work redis --tries=3使用 --backoff 选项,你可以指定 Laravel 在重试遇到异常的任务之前应该等待多少秒。默认情况下,任务会立即被释放回队列,以便可以再次尝试:
php artisan queue:work redis --tries=3 --backoff=3如果你想配置 Laravel 在重试一个遇到异常的任务之前,基于每个任务,应该等待多少秒,你可以通过在你的任务类上定义一个 backoff 属性来实现:
/**
* The number of seconds to wait before retrying the job.
*
* @var int
*/
public $backoff = 3;如果您需要更复杂的逻辑来确定任务的退避时间,您可以在您的任务类上定义一个 backoff 方法:
/**
* Calculate the number of seconds to wait before retrying the job.
*/
public function backoff(): int
{
return 3;
}您可以通过从 backoff 方法返回一个退避值数组来轻松配置“指数式”退避。在此示例中,重试延迟将是第一次重试为 1 秒,第二次重试为 5 秒,第三次重试为 10 秒并且每次后续重试都为 10 秒如果仍有更多尝试次数:
/**
* Calculate the number of seconds to wait before retrying the job.
*
* @return array<int, int>
*/
public function backoff(): array
{
return [1, 5, 10];
}当特定任务失败时,您可能希望向用户发送警报,或者回滚任务部分完成的任何操作。为了实现此目的,您可以在任务类上定义一个 failed 方法。导致任务失败的 Throwable 实例将被传递给 failed 方法:
<?php
namespace App\Jobs;
use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Throwable;
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* Create a new job instance.
*/
public function __construct(
public Podcast $podcast,
) {}
/**
* Execute the job.
*/
public function handle(AudioProcessor $processor): void
{
// Process uploaded podcast...
}
/**
* Handle a job failure.
*/
public function failed(?Throwable $exception): void
{
// Send user notification of failure, etc...
}
}[!WARNING]
在调用failed方法之前会实例化该任务的一个新实例;因此,在handle方法中可能发生的任何类属性修改都将会丢失。
失败的作业不一定遇到了未处理的异常。当作业耗尽了其所有允许的尝试次数时,它也可能被视为失败。这些尝试次数可以通过以下几种方式消耗:
如果最后一次尝试因作业执行期间抛出的异常而失败,该异常将传递给作业的 failed 方法。但是,如果作业因已达到最大允许尝试次数而失败,则 $exception 将是一个实例 的 Illuminate\Queue\MaxAttemptsExceededException。同样,如果作业因超出配置的超时时间而失败,则 $exception 将是一个实例 的 Illuminate\Queue\TimeoutExceededException。
要查看已插入到您的 failed_jobs 数据库表中的所有失败作业,您可以使用 queue:failed Artisan 命令:
php artisan queue:failedqueue:failed 命令将列出作业 ID, 连接, 队列, 失败时间, 以及关于作业的其他信息。作业 ID 可用于重试失败的作业。例如, 要重试 ID 为 ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 的失败作业, 请发出以下命令:
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece如有必要,您可以传入多个 ID 给该命令:
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d您也可以重试针对特定队列的所有失败作业:
php artisan queue:retry --queue=name要重试所有失败的作业,执行 queue:retry 命令并将 all 作为 ID 传递:
php artisan queue:retry all如果您想删除一个失败的任务,您可以使用 queue:forget 命令:
php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24d[!NOTE]
使用 Horizon 时,你应该使用horizon:forget命令来删除失败的任务,而不是queue:forget命令。
若要从你的 failed_jobs 表中删除所有失败任务,你可以使用 queue:flush 命令:
php artisan queue:flush该 queue:flush 命令会移除队列中所有失败的任务记录,无论这些失败的任务已存在多久。你可以使用 --hours 选项,只删除在特定小时数之前或更早失败的任务:
php artisan queue:flush --hours=48当将 Eloquent 模型注入到任务中时,模型会在被放入队列之前自动序列化,并在任务处理时从数据库中重新检索。然而,如果当任务正在等待工作程序处理时该模型已被删除,你的任务可能会因 ModelNotFoundException 错误而失败。
为了方便,您可以选择通过将作业的 deleteWhenMissingModels 属性设置为 true 来自动删除缺少模型的作业。当此属性设置为 true 时,Laravel 将悄无声息地丢弃该作业,而不会引发异常:
/**
* Delete the job if its models no longer exist.
*
* @var bool
*/
public $deleteWhenMissingModels = true;您可以调用 queue:prune-failed Artisan 命令修剪应用程序的 failed_jobs 表中的记录:
php artisan queue:prune-failed默认情况下,所有超过 24 小时的失败作业记录将被清理。如果您为命令提供 --hours 选项,则只有在过去 N 小时内插入的失败作业记录将被保留。例如,以下命令将删除所有在 48 小时前插入的失败作业记录:
php artisan queue:prune-failed --hours=48Laravel 还支持将失败的作业记录存储在 DynamoDB 而不是关系型数据库表中。然而,你必须手动创建一个 DynamoDB 表来存储所有失败的作业记录。通常,该表的名称应为 failed_jobs,但你应该根据 queue.failed.table 配置值来命名该表,该配置值位于你的应用的 queue 配置文件中。
该 failed_jobs 表应具有一个名为 application 的字符串主分区键以及一个名为 uuid 的字符串主排序键。键的 application 部分将包含你的应用程序的名称,该名称由应用程序的 app 配置文件中定义的 name 配置值确定。由于应用程序名称是 DynamoDB 表键的一部分,你可以使用同一个表来存储多个 Laravel 应用程序的失败作业。
此外,确保您安装 AWS SDK,以便您的 Laravel 应用程序可以与 Amazon DynamoDB 通信:
composer require aws/aws-sdk-php接下来,将 queue.failed.driver 配置选项的值设置为 dynamodb。此外,你还应该在失败作业配置数组中定义 key、secret 和 region 配置选项。这些选项将用于与 AWS 进行身份验证。当使用 dynamodb 驱动程序时,queue.failed.database 配置选项是不必要的:
'failed' => [
'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'failed_jobs',
],您可以通过将 queue.failed.driver 配置选项的值设置为 null,来指示 Laravel 丢弃失败的作业而不存储它们。通常,这可以通过 QUEUE_FAILED_DRIVER 环境变量来实现:
QUEUE_FAILED_DRIVER=null如果您想注册一个在作业失败时被调用的事件监听器,您可以使用 Queue 门面的 failing 方法。例如,我们可以从随 Laravel 提供的 AppServiceProvider 的 boot 方法中,将一个闭包附加到此事件:
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobFailed;
class AppServiceProvider extends ServiceProvider
{
/**
* Register any application services.
*/
public function register(): void
{
// ...
}
/**
* Bootstrap any application services.
*/
public function boot(): void
{
Queue::failing(function (JobFailed $event) {
// $event->connectionName
// $event->job
// $event->exception
});
}
}[!注意]
使用 Horizon 时,应使用horizon:clear命令来清除队列中的任务,而不是queue:clear命令。
如果您想从默认连接的默认队列中删除所有作业,您可以使用 queue:clear Artisan 命令来执行此操作:
php artisan queue:clear您还可以提供 connection 参数和 queue 选项,以从特定的连接和队列中删除任务:
php artisan queue:clear redis --queue=emails[!WARNING]
从队列中清除任务仅适用于 SQS、Redis 和数据库队列驱动程序。此外,SQS 消息删除过程需要长达 60 秒,因此在您清除队列后 60 秒内发送到 SQS 队列的任务也可能会被删除。
如果你的队列突然涌入大量任务,它可能会不堪重负,导致任务完成时间过长。如果你愿意,Laravel 可以在你的队列任务数量超过指定阈值时提醒你。
要开始,你应该调度 queue:monitor 命令来 每分钟运行一次。该命令接受你希望监控的队列的名称以及你期望的作业计数阈值:
php artisan queue:monitor redis:default,redis:deployments --max=100单独调度此命令不足以触发通知,提醒你队列已过载的状态。当此命令遇到一个作业计数超过你阈值的队列时,将会分发一个 Illuminate\Queue\Events\QueueBusy 事件。你可以在应用的 AppServiceProvider 中监听此事件,以便向你或你的开发团队发送通知:
use App\Notifications\QueueHasLongWaitTime;
use Illuminate\Queue\Events\QueueBusy;
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Notification;
/**
* Bootstrap any application services.
*/
public function boot(): void
{
Event::listen(function (QueueBusy $event) {
Notification::route('mail', 'dev@example.com')
->notify(new QueueHasLongWaitTime(
$event->connection,
$event->queue,
$event->size
));
});
}当测试分发任务的代码时,你可能希望指示 Laravel 不实际执行任务本身,因为任务的代码可以独立于分发它的代码直接进行测试。当然,为了测试任务本身,你可以在测试中直接实例化一个任务实例并调用 handle 方法。
`
<?php
use App\Jobs\AnotherJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;
test('orders can be shipped', function () {
Queue::fake();
// Perform order shipping...
// Assert that no jobs were pushed...
Queue::assertNothingPushed();
// Assert a job was pushed to a given queue...
Queue::assertPushedOn('queue-name', ShipOrder::class);
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
// Assert a job was not pushed...
Queue::assertNotPushed(AnotherJob::class);
// Assert that a closure was pushed to the queue...
Queue::assertClosurePushed();
// Assert that a closure was not pushed...
Queue::assertClosureNotPushed();
// Assert the total number of jobs that were pushed...
Queue::assertCount(3);
});<?php
namespace Tests\Feature;
use App\Jobs\AnotherJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;
use Tests\TestCase;
class ExampleTest extends TestCase
{
public function test_orders_can_be_shipped(): void
{
Queue::fake();
// Perform order shipping...
// Assert that no jobs were pushed...
Queue::assertNothingPushed();
// Assert a job was pushed to a given queue...
Queue::assertPushedOn('queue-name', ShipOrder::class);
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
// Assert a job was not pushed...
Queue::assertNotPushed(AnotherJob::class);
// Assert that a closure was pushed to the queue...
Queue::assertClosurePushed();
// Assert that a closure was not pushed...
Queue::assertClosureNotPushed();
// Assert the total number of jobs that were pushed...
Queue::assertCount(3);
}
}你可以向 assertPushed、assertNotPushed、assertClosurePushed 或 assertClosureNotPushed 方法传递一个闭包,以断言已推送的作业通过了给定的“真值测试”。如果至少有一个已推送的作业通过了给定的真值测试,那么该断言将成功:
use Illuminate\Queue\CallQueuedClosure;
Queue::assertPushed(function (ShipOrder $job) use ($order) {
return $job->order->id === $order->id;
});
Queue::assertClosurePushed(function (CallQueuedClosure $job) {
return $job->name === 'validate-order';
});如果你只需要伪造特定的作业,同时允许其他作业正常执行,你可以将需要伪造的作业的类名传递给 fake 方法:
test('orders can be shipped', function () {
Queue::fake([
ShipOrder::class,
]);
// Perform order shipping...
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
});public function test_orders_can_be_shipped(): void
{
Queue::fake([
ShipOrder::class,
]);
// Perform order shipping...
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
}您可以模拟除一组指定作业之外的所有作业 使用 except 方法:
Queue::fake()->except([
ShipOrder::class,
]);为了测试作业链,你需要利用 Bus 门面的伪造能力。Bus 门面的 assertChained 方法可用于断言一个 作业链 已被分派。assertChained 方法接受一个链式作业数组作为其第一个参数:
use App\Jobs\RecordShipment;
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Support\Facades\Bus;
Bus::fake();
// ...
Bus::assertChained([
ShipOrder::class,
RecordShipment::class,
UpdateInventory::class
]);如你在上述示例中所见,链式作业数组可以是作业的类名数组。然而,你也可以提供实际作业实例数组。这样做时,Laravel 将确保作业实例与你的应用程序分发的链式作业属于同一类并具有相同的属性值:
Bus::assertChained([
new ShipOrder,
new RecordShipment,
new UpdateInventory,
]);您可以使用 assertDispatchedWithoutChain 方法来断言一个任务被推送时没有任务链:
Bus::assertDispatchedWithoutChain(ShipOrder::class);如果一个链式作业 在现有链中前置或附加作业,你可以使用该作业的 assertHasChain 方法来断言该作业具有预期的剩余作业链:
$job = new ProcessPodcast;
$job->handle();
$job->assertHasChain([
new TranscribePodcast,
new OptimizePodcast,
new ReleasePodcast,
]);该 assertDoesntHaveChain 方法可用于断言任务的剩余链为空:
$job->assertDoesntHaveChain();如果您的作业链包含一批作业,您可以通过在链断言中插入一个Bus::chainedBatch 定义来断言该链式批处理符合您的预期:
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;
Bus::assertChained([
new ShipOrder,
Bus::chainedBatch(function (PendingBatch $batch) {
return $batch->jobs->count() === 3;
}),
new UpdateInventory,
]);The Bus facade's assertBatched method may be used to assert that a batch of jobs was dispatched. The closure given to the assertBatched method receives an instance of Illuminate\Bus\PendingBatch, which may be used to inspect the jobs within the batch:
use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;
Bus::fake();
// ...
Bus::assertBatched(function (PendingBatch $batch) {
return $batch->name == 'Import CSV' &&
$batch->jobs->count() === 10;
});您可以使用 assertBatchCount 方法来断言指定数量的批次已被分派:
Bus::assertBatchCount(3);您可以使用 assertNothingBatched 来断言没有批次被分发:
Bus::assertNothingBatched();此外,您可能偶尔需要测试单个作业与其底层批次之间的交互。例如,您可能需要测试作业是否取消了其批次的后续处理。为此,您需要通过 withFakeBatch 方法将一个模拟批次分配给该作业。withFakeBatch 方法返回一个元组,其中包含作业实例和模拟批次:
[$job, $batch] = (new ShipOrder)->withFakeBatch();
$job->handle();
$this->assertTrue($batch->cancelled());
$this->assertEmpty($batch->added);有时,你可能需要测试一个队列作业将其自身释放回队列。或者,你可能需要测试该作业是否已自行删除。你可能通过实例化该作业并调用 withFakeQueueInteractions 方法来测试这些队列交互。
一旦该作业的队列交互已被模拟,你可以在该作业上调用 handle 方法。调用该作业后,各种断言方法可用于验证该作业的队列交互:
use App\Exceptions\CorruptedAudioException;
use App\Jobs\ProcessPodcast;
$job = (new ProcessPodcast)->withFakeQueueInteractions();
$job->handle();
$job->assertReleased(delay: 30);
$job->assertDeleted();
$job->assertNotDeleted();
$job->assertFailed();
$job->assertFailedWith(CorruptedAudioException::class);
$job->assertNotFailed();使用 before 和 after 方法,在 Queue 外观上,你可以指定在队列任务处理之前或之后执行的回调。这些回调是进行额外日志记录或增加统计数据的绝佳机会,以用于仪表盘。通常,你应该从 boot 方法中调用这些方法,该方法属于 服务提供者。例如,我们可以使用 AppServiceProvider,它包含在 Laravel 中:
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
class AppServiceProvider extends ServiceProvider
{
/**
* Register any application services.
*/
public function register(): void
{
// ...
}
/**
* Bootstrap any application services.
*/
public function boot(): void
{
Queue::before(function (JobProcessing $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
Queue::after(function (JobProcessed $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
}
}使用 Queue 门面 上的 looping 方法,你可以指定在工作进程尝试从队列中获取任务之前执行的回调。例如,你可以注册一个闭包,来回滚任何由先前失败的任务遗留的未关闭的事务:
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Queue;
Queue::looping(function () {
while (DB::transactionLevel() > 0) {
DB::rollBack();
}
});