队列系统
DuxLite 提供了基于 Enqueue 的强大队列系统,支持 Redis 和 AMQP 两种队列后端,用于处理异步任务和提升应用性能。
🚀 快速开始
什么是队列?
队列让你可以将耗时的任务(如发送邮件、图片处理、数据导出等)放到后台异步执行,而不阻塞用户请求。
基础使用
基于 src/Queue/Queue.php
的实际代码:
php
use Core\App;
// 获取队列实例
$queue = App::queue();
// 添加任务到队列
$message = $queue->add(
class: 'App\Jobs\EmailJob',
method: 'send',
params: ['user@example.com', '欢迎注册', '感谢您的注册...'],
name: 'queue' // 队列名称(可选,默认为 'queue')
);
// 立即发送任务
$message->send();
// 延迟执行任务(5分钟后)
$message = $queue->add('App\Jobs\ProcessImageJob', 'process', ['/uploads/image.jpg']);
$message->delay(300)->send();
📋 队列配置
配置文件设置
队列配置分为两个文件:queue.toml
和 database.toml
。
1. 队列服务配置 (config/queue.toml
)
toml
# 队列服务类型:redis 或 amqp
type = "redis"
# 驱动器名称(对应 database.toml 中的配置)
driver = "default"
2. 队列后端配置 (config/database.toml
)
Redis 队列配置:
toml
# Redis 队列后端
[redis.drivers.default]
host = "localhost"
port = 6379
auth = "" # Redis 密码
database = 0
persistent = false
optPrefix = "queue_" # 队列前缀
# 专用队列 Redis
[redis.drivers.queue]
host = "localhost"
port = 6379
auth = ""
database = 2
persistent = false
optPrefix = "dux_queue_"
AMQP 队列配置:
toml
# RabbitMQ / AMQP 配置
[amqp.drivers.default]
host = "localhost"
port = 5672
vhost = "/"
username = "guest"
password = "guest"
persisted = false
prefix = "dux_"
获取队列实例
php
use Core\App;
// 获取默认队列(从 queue.toml 读取类型)
$queue = App::queue();
// 获取指定类型的队列
$redisQueue = App::queue('redis');
$amqpQueue = App::queue('amqp');
🔧 创建队列任务
任务类定义
基于 docs/core/queues.md
的实际代码,创建处理任务的类:
php
<?php
namespace App\Jobs;
class EmailJob
{
public function send(string $to, string $subject, string $body): void
{
// 邮件发送逻辑
$this->sendEmail($to, $subject, $body);
// 记录日志
error_log("邮件已发送到: {$to}");
}
public function sendWelcome(int $userId): void
{
// 发送欢迎邮件
$user = User::find($userId);
if ($user) {
$this->send(
$user->email,
'欢迎注册',
"欢迎 {$user->name} 注册我们的网站!"
);
}
}
private function sendEmail(string $to, string $subject, string $body): void
{
// 实际的邮件发送实现
// 可以使用 PHPMailer、SwiftMailer 等
mail($to, $subject, $body);
}
}
任务类特点
- 无需继承:普通的 PHP 类即可,无需继承特定基类
- 方法调用:通过类名和方法名调用具体任务
- 参数传递:支持任意数量和类型的参数
图片处理任务示例
php
<?php
namespace App\Jobs;
class ImageJob
{
public function resize(string $imagePath, int $width, int $height): void
{
// 调整图片尺寸
$image = imagecreatefromjpeg($imagePath);
$resized = imagescale($image, $width, $height);
$resizedPath = str_replace('.jpg', "_resized_{$width}x{$height}.jpg", $imagePath);
imagejpeg($resized, $resizedPath);
imagedestroy($image);
imagedestroy($resized);
error_log("图片已调整尺寸: {$resizedPath}");
}
public function addWatermark(string $imagePath, string $watermarkPath): void
{
// 添加水印
$image = imagecreatefromjpeg($imagePath);
$watermark = imagecreatefrompng($watermarkPath);
// 水印位置计算
$imageWidth = imagesx($image);
$imageHeight = imagesy($image);
$watermarkWidth = imagesx($watermark);
$watermarkHeight = imagesy($watermark);
// 右下角位置
$x = $imageWidth - $watermarkWidth - 10;
$y = $imageHeight - $watermarkHeight - 10;
// 合并图片
imagecopy($image, $watermark, $x, $y, 0, 0, $watermarkWidth, $watermarkHeight);
// 保存结果
$watermarkedPath = str_replace('.jpg', '_watermarked.jpg', $imagePath);
imagejpeg($image, $watermarkedPath);
imagedestroy($image);
imagedestroy($watermark);
error_log("水印已添加: {$watermarkedPath}");
}
public function compress(string $imagePath, int $quality = 80): void
{
// 压缩图片
$image = imagecreatefromjpeg($imagePath);
$compressedPath = str_replace('.jpg', '_compressed.jpg', $imagePath);
imagejpeg($image, $compressedPath, $quality);
imagedestroy($image);
error_log("图片已压缩: {$compressedPath}");
}
}
🎯 实际应用示例
在控制器中使用队列
php
<?php
namespace App\System\Admin;
use Core\App;
use Core\Resources\Action\Resources;
class User extends Resources
{
// 用户注册后发送欢迎邮件
public function createAfter(Data $data, mixed $info): void
{
// 异步发送欢迎邮件
App::queue()->add(
'App\Jobs\EmailJob',
'send',
[$data->email, '欢迎注册', "欢迎 {$data->nickname} 注册我们的系统!"]
)->send();
}
// 用户上传头像后处理图片
public function uploadAvatar(ServerRequestInterface $request, ResponseInterface $response): ResponseInterface
{
$uploadedFile = $request->getUploadedFiles()['avatar'];
$imagePath = $this->saveUploadedFile($uploadedFile);
// 异步处理图片
$auth = $request->getAttribute('auth');
App::queue()->add(
'App\Jobs\ImageJob',
'resize',
[$imagePath, 200, 200]
)->send();
return send($response, '头像上传成功,正在处理中...');
}
}
数据导出任务
php
<?php
namespace App\Jobs;
use Core\App;
use App\System\Models\SystemUser;
class ExportJob
{
public function exportUsers(array $filters, string $email): void
{
// 查询用户数据
$query = SystemUser::query();
if (!empty($filters['dept_id'])) {
$query->where('dept_id', $filters['dept_id']);
}
if (!empty($filters['status'])) {
$query->where('status', $filters['status']);
}
$users = $query->get();
// 生成 Excel 文件
$filename = 'users_export_' . date('Y-m-d_H-i-s') . '.xlsx';
$filepath = storage_path('exports/' . $filename);
$this->generateExcel($users, $filepath);
// 发送下载链接邮件
App::queue()->add(
'App\Jobs\EmailJob',
'send',
[$email, '用户数据导出完成', "您的用户数据导出已完成,下载链接:" . url('download/' . $filename)]
)->send();
error_log("用户数据导出完成: {$filename}");
}
private function generateExcel($users, $filepath): void
{
// Excel 生成逻辑
// 使用 PhpSpreadsheet 或其他库
$data = [];
$data[] = ['ID', '用户名', '邮箱', '状态', '创建时间']; // 表头
foreach ($users as $user) {
$data[] = [
$user->id,
$user->username,
$user->email,
$user->status ? '启用' : '禁用',
$user->created_at->format('Y-m-d H:i:s')
];
}
// 这里应该使用实际的 Excel 生成库
file_put_contents($filepath, json_encode($data, JSON_UNESCAPED_UNICODE));
}
}
🚀 队列工作进程
启动队列工作进程
bash
# 启动队列工作进程
php artisan queue:work
# 指定队列名称
php artisan queue:work --queue=emails,images
# 设置超时时间
php artisan queue:work --timeout=60
# 设置内存限制
php artisan queue:work --memory=512
# 后台运行
nohup php artisan queue:work > /dev/null 2>&1 &
进程管理配置
使用 Supervisor 管理队列进程:
ini
[program:dux-queue-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /path/to/project/artisan queue:work --sleep=3 --tries=3
autostart=true
autorestart=true
user=www-data
numprocs=4
redirect_stderr=true
stdout_logfile=/path/to/project/storage/logs/worker.log
💡 最佳实践
1. 任务设计原则
php
// ✅ 好的做法 - 任务职责单一
class SendEmailJob extends Job
{
public function handle(): void
{
// 只负责发送邮件
$this->sendEmail();
}
}
class ProcessImageJob extends Job
{
public function handle(): void
{
// 只负责图片处理
$this->processImage();
}
}
// ❌ 避免 - 任务职责过多
class UserRegistrationJob extends Job
{
public function handle(): void
{
$this->sendEmail(); // 发送邮件
$this->processImage(); // 处理头像
$this->updateStats(); // 更新统计
$this->sendSms(); // 发送短信
}
}
2. 错误处理和重试
php
class SendEmailJob extends Job
{
public int $tries = 3; // 最大重试次数
public int $timeout = 60; // 超时时间(秒)
public function handle(): void
{
try {
$this->sendEmail();
} catch (\Exception $e) {
// 记录错误日志
logger()->error('邮件发送失败', [
'error' => $e->getMessage(),
'data' => $this->data
]);
// 重新抛出异常,触发重试
throw $e;
}
}
public function failed(\Throwable $exception): void
{
// 所有重试都失败后的处理
logger()->critical('邮件发送彻底失败', [
'error' => $exception->getMessage(),
'data' => $this->data
]);
// 可以发送告警通知
$this->sendAlert($exception);
}
}
3. 队列监控
php
// 队列状态监控
class QueueMonitor
{
public static function getStats(): array
{
return [
'pending_jobs' => Queue::size(),
'failed_jobs' => Queue::failedCount(),
'processed_jobs' => Queue::processedCount(),
'workers_count' => Queue::workersCount(),
];
}
public static function clearFailedJobs(): void
{
Queue::clearFailedJobs();
}
public static function retryFailedJobs(): void
{
Queue::retryFailedJobs();
}
}
4. 队列优先级
php
// 指定队列名称处理不同优先级任务
$highPriorityQueue = App::queue();
$highPriorityQueue->add('App\Jobs\SmsJob', 'send', [$data], 'high_priority')->send();
$normalQueue = App::queue();
$normalQueue->add('App\Jobs\EmailJob', 'send', [$data], 'normal')->send();
$lowPriorityQueue = App::queue();
$lowPriorityQueue->add('App\Jobs\ExportJob', 'export', [$data], 'low_priority')->send();
🔗 与计划任务集成
使用 Scheduler 注解
基于 src/Scheduler/Attribute/Scheduler.php
的实际代码,可以使用 #[Scheduler]
注解创建定时队列任务:
php
<?php
namespace App\Tasks;
use Core\Scheduler\Attribute\Scheduler;
use Core\App;
class ScheduledQueueJobs
{
#[Scheduler('0 2 * * *')] // 每天凌晨 2 点
public function dailyCleanup(): void
{
// 清理过期数据
App::queue()->add(
'App\Jobs\CleanupJob',
'cleanExpiredData',
[]
)->send();
// 生成日报
App::queue()->add(
'App\Jobs\ReportJob',
'generateDailyReport',
[date('Y-m-d')]
)->send();
}
#[Scheduler('*/30 * * * *')] // 每30分钟
public function healthCheck(): void
{
// 检查队列健康状态
App::queue()->add(
'App\Jobs\MonitorJob',
'checkQueueHealth',
[]
)->send();
}
#[Scheduler('0 0 * * 0')] // 每周日午夜
public function weeklyReport(): void
{
// 生成周报
App::queue()->add(
'App\Jobs\ReportJob',
'generateWeeklyReport',
[date('Y-W')]
)->send();
}
}
启动计划任务服务
bash
# 启动计划任务服务
php dux scheduler
与事件系统集成
php
<?php
use Core\Event\Attribute\Listener;
class QueueEventListener
{
#[Listener('user.registered')]
public function handleUserRegistered($user): void
{
// 异步发送欢迎邮件
App::queue()->add(
'App\Jobs\EmailJob',
'sendWelcome',
[$user->id]
)->send();
// 异步生成用户报告(延迟5分钟)
App::queue()->add(
'App\Jobs\ReportJob',
'generateUserReport',
[$user->id]
)->delay(300)->send();
}
#[Listener('order.completed')]
public function handleOrderCompleted($order): void
{
// 异步发送订单确认邮件
App::queue()->add(
'App\Jobs\EmailJob',
'sendOrderConfirmation',
[$order->id]
)->send();
// 异步更新库存
App::queue()->add(
'App\Jobs\InventoryJob',
'updateStock',
[$order->items]
)->send();
}
}
🎉 总结
DuxLite 队列系统的特点:
- 🚀 基于 Enqueue:使用成熟的 Enqueue 库,性能稳定
- 🔧 多种后端:支持 Redis 和 AMQP 两种队列后端
- 📝 简单易用:普通 PHP 类即可,无需继承特定基类
- ⏰ 延迟执行:支持任务延迟执行
- 🔗 系统集成:与计划任务、事件系统深度集成
- 📊 灵活配置:支持多队列、优先级控制
通过合理使用队列系统,可以显著提升应用的响应性能和用户体验!
🎉 总结
DuxLite 队列系统的特点:
- 🚀 简单易用:简洁的 API,快速上手
- 🔧 多驱动支持:支持 Redis、数据库等多种驱动
- ⚡ 高性能:异步处理,提升应用响应速度
- 🛡️ 可靠性:失败重试机制,确保任务执行
- 📊 监控管理:完整的队列监控和管理功能
合理使用队列系统可以显著提升应用性能和用户体验!