Skip to content

Queue 队列

DuxLite 队列系统的核心类定义和 API 规格说明。

Queue 类

命名空间: Core\Queue\Queue

方法

php
public function __construct(array $config = [])
  • 参数: $config - 队列配置数组
  • 说明: 构造函数
php
public function push(string $job, array $data = [], string $queue = 'default'): string
  • 参数:
    • $job - 任务类名
    • $data - 任务数据(可选)
    • $queue - 队列名称(可选)
  • 返回: string - 任务ID
  • 说明: 推送任务到队列
php
public function later(int $delay, string $job, array $data = [], string $queue = 'default'): string
  • 参数:
    • $delay - 延迟秒数
    • $job - 任务类名
    • $data - 任务数据(可选)
    • $queue - 队列名称(可选)
  • 返回: string - 任务ID
  • 说明: 延迟推送任务到队列
php
public function pop(string $queue = 'default'): ?QueueMessage
  • 参数: $queue - 队列名称(可选)
  • 返回: QueueMessage|null - 队列消息对象或null
  • 说明: 从队列中取出任务
php
public function size(string $queue = 'default'): int
  • 参数: $queue - 队列名称(可选)
  • 返回: int - 队列大小
  • 说明: 获取队列中任务数量
php
public function clear(string $queue = 'default'): void
  • 参数: $queue - 队列名称(可选)
  • 返回: void
  • 说明: 清空队列

QueueMessage 类

命名空间: Core\Queue\QueueMessage

属性

属性类型说明
$idstring消息ID
$jobstring任务类名
$dataarray任务数据
$queuestring队列名称
$attemptsint尝试次数
$createdAtint创建时间戳
$availableAtint可用时间戳

方法

php
public function __construct(string $id, string $job, array $data, string $queue = 'default')
  • 参数:
    • $id - 消息ID
    • $job - 任务类名
    • $data - 任务数据
    • $queue - 队列名称(可选)
php
public function getId(): string
  • 返回: string - 消息ID
php
public function getJob(): string
  • 返回: string - 任务类名
php
public function getData(): array
  • 返回: array - 任务数据
php
public function getQueue(): string
  • 返回: string - 队列名称
php
public function getAttempts(): int
  • 返回: int - 尝试次数
php
public function incrementAttempts(): void
  • 返回: void
  • 说明: 增加尝试次数
php
public function isReady(): bool
  • 返回: bool - 是否可以执行
  • 说明: 检查任务是否到达执行时间

QueueProcessor 类

命名空间: Core\Queue\QueueProcessor

方法

php
public function __construct(Queue $queue)
  • 参数: $queue - 队列实例
php
public function process(string $queue = 'default', int $maxJobs = 0, int $memory = 128): void
  • 参数:
    • $queue - 队列名称(可选)
    • $maxJobs - 最大处理任务数(0表示无限制)
    • $memory - 内存限制(MB)
  • 返回: void
  • 说明: 处理队列任务
php
public function processJob(QueueMessage $message): bool
  • 参数: $message - 队列消息
  • 返回: bool - 是否处理成功
  • 说明: 处理单个任务
php
public function failed(QueueMessage $message, \Throwable $exception): void
  • 参数:
    • $message - 队列消息
    • $exception - 异常对象
  • 返回: void
  • 说明: 处理失败任务

任务接口

QueueJobInterface

php
interface QueueJobInterface
{
    public function handle(array $data): void;
}

可重试任务接口

php
interface RetryableJobInterface extends QueueJobInterface
{
    public function getMaxRetries(): int;
    public function getRetryDelay(): int;
}

批量任务接口

php
interface BatchJobInterface extends QueueJobInterface
{
    public function getBatchSize(): int;
    public function processBatch(array $items): void;
}

队列配置结构

配置文件:config/queue.toml

基础配置

toml
[default]
driver = "database"
table = "queue_jobs"
connection = "default"

支持的驱动类型

驱动类型说明必需配置
database数据库队列table, connection
redisRedis队列connection, prefix
sync同步队列无额外配置

Redis 驱动配置

toml
[redis]
driver = "redis"
connection = "default"
prefix = "queue:"
retry_after = 60

多队列配置

toml
[high]
driver = "redis"
connection = "default"
prefix = "queue:high:"

[normal]
driver = "database"
table = "queue_jobs"
connection = "default"

[low]
driver = "database"
table = "queue_jobs_low"
connection = "default"

数据库表结构

队列任务表(queue_jobs):

字段类型说明
idbigint主键ID
queuevarchar(255)队列名称
payloadlongtext任务载荷
attemptstinyint尝试次数
reserved_atint预留时间戳
available_atint可用时间戳
created_atint创建时间戳

失败任务表(failed_jobs):

字段类型说明
idbigint主键ID
connectiontext连接名称
queuetext队列名称
payloadlongtext任务载荷
exceptionlongtext异常信息
failed_attimestamp失败时间

任务状态常量

状态说明
PENDING0待处理
PROCESSING1处理中
COMPLETED2已完成
FAILED3已失败
RETRYING4重试中

队列优先级

优先级数值说明
HIGH100高优先级
NORMAL50普通优先级
LOW10低优先级

命令行工具

QueueCommand 类

命名空间: Core\Queue\QueueCommand

可用命令

bash
# 启动队列处理器
php dux queue:work [queue] [--sleep=3] [--tries=3] [--memory=128] [--timeout=60]

# 处理单个任务
php dux queue:work --once

# 重启队列处理器
php dux queue:restart

# 查看队列状态
php dux queue:status

# 清空队列
php dux queue:clear [queue]

# 重试失败任务
php dux queue:retry [id]

# 查看失败任务
php dux queue:failed

命令参数

参数类型默认值说明
queuestringdefault队列名称
--sleepint3空队列时休眠秒数
--triesint3最大重试次数
--memoryint128内存限制(MB)
--timeoutint60任务超时时间(秒)
--onceboolfalse只处理一个任务

事件钩子

队列事件

事件名触发时机参数
queue.before任务执行前QueueMessage
queue.after任务执行后QueueMessage
queue.failed任务失败时QueueMessage, \Throwable
queue.retrying任务重试时QueueMessage

处理器事件

事件名触发时机参数
worker.starting处理器启动时string $queue
worker.stopping处理器停止时string $queue
worker.looping每次循环时string $queue

基于 MIT 许可证发布