队列 Queue
后台任务队列、worker 与可靠消费
queue 管理队列、worker 和 typed job。默认使用内存驱动,适合开发和单进程任务;生产环境通常接入 Redis 或 AMQP 驱动。
安装
go get github.com/duxweb/runa/queue
可选驱动按需安装。业务代码如果直接创建 Redis 或 AMQP client,也需要 import 对应客户端库:
go get github.com/duxweb/runa/queue/redis
go get github.com/duxweb/runa/queue/amqp
接入应用
package main
import (
"context"
"fmt"
"github.com/duxweb/runa"
"github.com/duxweb/runa/provider"
"github.com/duxweb/runa/queue"
)
type EmailJob struct {
To string `json:"to"`
}
type appModule struct {
provider.ModuleBase
}
func (appModule) Name() string { return "app" }
func (appModule) Register(ctx context.Context, app provider.Context) error {
queues, err := provider.Invoke[*queue.Registry](app)
if err != nil {
return err
}
queues.Job[EmailJob]("mail.send", func(ctx context.Context, job *queue.Job[EmailJob]) error {
fmt.Println("send email to", job.Payload.To)
return nil
})
return nil
}
func main() {
app := runa.New()
app.Install(queue.Provider(
queue.RegisterQueue("mail", queue.Workers("default")),
queue.RegisterWorker("default", queue.Concurrency(2)),
))
app.Module(appModule{})
if err := app.Run(context.Background()); err != nil {
panic(err)
}
}
推送任务:
_, err := queue.Default().Push(context.Background(), "mail", "mail.send", EmailJob{To: "hello@runa.dev"})
启动 worker 进程时运行:
go run . queue:work default
独立 New 使用
registry := queue.New()
registry.Queue("mail", queue.Workers("default"))
registry.Worker("default", queue.Concurrency(1))
registry.Job[EmailJob]("mail.send", handler)
if err := registry.Freeze(); err != nil {
panic(err)
}
独立使用时你需要自己创建 queue.NewUnit(registry, "default") 并交给自己的进程管理逻辑启动 worker。
配置
queue 读取 queue.queues.<name> 和 queue.workers.<name>,只作用到已经注册的队列和 worker。
[queue.queues.default]
driver = "memory"
workers = ["default"]
retry = 3
retry_delay = "5s"
timeout = "30s"
retention = "24h"
[queue.workers.default]
concurrency = 4
poll_interval = "100ms"
lease = "30s"
stop_timeout = "30s"
| 作用域 | 常用键 |
|---|---|
queue.queues.<name> |
driver、workers、retry、retry_delay、timeout、retention、meta |
queue.workers.<name> |
concurrency、poll_interval、lease、stop_timeout、meta |
retention 当前是保留配置字段,用于后续失败任务清理策略。现有驱动不会自动按 retention 删除 failed 任务,生产环境需要按驱动能力做单独清理。
Redis 驱动
import (
goredis "github.com/redis/go-redis/v9"
queueredis "github.com/duxweb/runa/queue/redis"
)
client := goredis.NewClient(&goredis.Options{Addr: "127.0.0.1:6379"})
app.Install(queue.Provider(
queue.RegisterDriver("redis", queueredis.Driver(client, queueredis.Prefix("runa:queue:"))),
queue.RegisterQueue("default", queue.Use("redis"), queue.Workers("default")),
queue.RegisterWorker("default", queue.Concurrency(4)),
))
AMQP 驱动
import queueamqp "github.com/duxweb/runa/queue/amqp"
app.Install(queue.Provider(
queue.RegisterDriver("amqp", queueamqp.Driver(conn, queueamqp.Exchange("runa"))),
queue.RegisterQueue("default", queue.Use("amqp"), queue.Workers("default")),
))
AMQP 驱动适合只需要投递和消费的场景。它当前不支持 Renew、Delete、Count、List,所以 queue:list 的部分统计会受限,长任务和需要查看 failed 明细的场景更建议优先使用 Redis。
命令
安装 queue.Provider() 后会注册:
go run . queue:list
go run . queue:work default
queue:work 适合单独启动 worker 进程。普通 serve 只会启动 HTTP 等已注册 Host,不会自动替你执行所有 worker 命令。
让 Run 同时启动 worker
如果你希望 app.Run(...) 启动 HTTP 的同时也启动某个 worker,需要把 worker 显式注册成 Host:
func (appModule) Register(ctx context.Context, app provider.Context) error {
queues, err := provider.Invoke[*queue.Registry](app)
if err != nil {
return err
}
return app.RegisterHost(queue.NewUnit(queues, "default"))
}
这样默认 serve 命令会同时启动 HTTP Host 和 queue:default Host。小项目可以这样部署;生产环境如果 HTTP 和 worker 需要独立扩缩容,建议继续用 queue:work default 单独启动。
任务超时与租约(Timeout 与 Lease)
两者作用域不同,容易混:
- Timeout 是单个任务的:
queue.Timeout(d)限制一个任务 handler 最多跑多久,超时后传给 handler 的ctx会被取消。可在推送时按任务设置,缺省回退到任务默认,再回退到队列默认。 - Lease 是 worker 的:
queue.Lease(d)是任务被取走后的“不可见时长”。worker 会在执行期间按Lease / 2自动续租,避免任务还在执行时被其它 worker 重新取走。
建议把 Lease 设成一个合理的续租周期,不要太短。memory 和 redis 驱动支持续租;amqp 驱动的 Renew 目前不支持,长任务应更谨慎地设置 Timeout,并让 handler 正确响应 ctx.Done()。
queue.RegisterQueue("default",
queue.Use("redis"),
queue.Timeout(30*time.Second), // 队列默认:单任务最长 30s
queue.Workers("worker"),
)
queue.RegisterWorker("worker",
queue.Concurrency(20),
queue.Lease(2*time.Minute), // 执行期间自动续租
queue.StopTimeout(30*time.Second),
)
// 也可对单个任务覆盖 Timeout
queue.Default().Push(ctx, "default", "report.build", payload, queue.Timeout(2*time.Minute))
任务选项:延迟、去重和元数据
queue.Push 可以按单次任务覆盖一部分行为:
_, err := queue.Default().Push(
ctx,
"default",
"mail.send",
EmailJob{To: "hello@runa.dev"},
queue.Delay(10*time.Minute), // 10 分钟后才可被消费
queue.Unique("mail:hello@runa.dev"), // 同一队列 + 同一任务名 + 同一 key 只保留一份
queue.Retry(3), // 首次失败后最多再重试 3 次
queue.RetryDelay(5*time.Second),
queue.Timeout(30*time.Second),
queue.Meta("source", "signup"),
)
常用规则:
Delay(d)会把任务放到 delayed 状态,到时间后才会被 worker 取走。Unique(key)用于防重复投递,同一队列、同一任务名、同一 key 再次 Push 会返回已有任务 ID。Retry(n)表示首次执行失败后最多再重试n次,不是总执行次数。Meta(key, value)不进入 payload,会合并队列、任务和本次推送的元数据,handler 里通过job.Meta读取。- 队列是“至少一次执行”模型,业务 handler 仍然要做幂等。进程崩溃、租约过期、重试都可能让同一业务任务再次执行。
高并发与生产建议
- 默认
Concurrency = 1:开箱每个 worker 一次只跑一个任务,安全但串行。要并发必须显式queue.Concurrency(N);IO 密集型任务可以设置到几十或上百,但要看下游服务承压能力。 - 生产用 Redis / AMQP,不要 memory:memory 驱动是同进程、无持久化,只适合开发;多实例或重启不丢任务要用外部驱动。
- 失败自动重试:
queue.Retry(n)+queue.RetryDelay(d)。失败会按延迟重投该任务,超过次数后进入失败状态。 - 背压是内建的:worker 用有界信号量按
Concurrency限并发,在途任务数受限。大量任务积压不会撑爆 worker 内存;积压容量主要取决于 Redis / AMQP 等外部驱动。 - 优雅关闭:
queue.StopTimeout(d)限制关闭时等待在途任务的时间,超时返回错误,避免进程无限等待。
Worker 拓扑与队列拆分
生产环境通常不要把所有任务都塞进一个队列。可以按业务优先级和耗时拆成多个队列,再用不同 worker 承接:
app.Install(queue.Provider(
queue.RegisterDriver("redis", queueredis.Driver(client)),
queue.RegisterQueue("critical", queue.Use("redis"), queue.Workers("critical")),
queue.RegisterQueue("bulk", queue.Use("redis"), queue.Workers("bulk"), queue.Timeout(5*time.Minute)),
queue.RegisterWorker("critical", queue.Concurrency(8), queue.Lease(time.Minute)),
queue.RegisterWorker("bulk", queue.Concurrency(50), queue.Lease(2*time.Minute)),
))
然后按进程拆开启动:
go run . queue:work critical
go run . queue:work bulk
这样紧急任务不会被大批量任务堵住。小项目可以把 worker 注册成 Host 跟 HTTP 一起启动;需要独立扩缩容时,建议拆成 serve、queue:work critical、queue:work bulk 等多个进程。
失败观测
queue:list 会显示队列的 pending、delayed、reserved、failed 数量,也会显示 worker 的 processed、succeeded、failed、retried 统计:
go run . queue:list
代码里可以通过 queue.Default().QueueInfo(ctx)、queue.Default().WorkerInfo(ctx) 和 queue.Default().JobInfo() 做监控集成。注意 AMQP 驱动不支持 Count 和 List,这类统计在 AMQP 下不会像 Redis 一样完整。
海量任务:分页 / 批量采集
要处理几百万条数据,例如定时从第三方采集,通常不要一条记录推一个任务。原因是目前公开 API 只有单条 Push,没有批量入队 API;几百万条逐条入队很慢,百万级小任务的调度开销也大。
更推荐按页或批次切分,一个任务处理一批数据。比如每批 1000 条,百万数据只需要几千个任务,而不是几百万个任务。用游标自我续投,可以把入队压力摊平:
type CollectPage struct {
Cursor string
Size int
Shard string
}
queue.Default().Job[CollectPage]("collect.page", func(ctx context.Context, job *queue.Job[CollectPage]) error {
page := job.Payload
rows, next, err := fetchFromVendor(ctx, page.Cursor, page.Size, page.Shard)
if err != nil {
return err
}
if err := saveRows(ctx, rows); err != nil {
return err
}
if next != "" {
_, err := queue.Default().Push(ctx, "default", "collect.page", CollectPage{
Cursor: next,
Size: page.Size,
Shard: page.Shard,
})
return err
}
return nil
})
// 用 schedule 定时触发每个分片的第一页,后续页由任务自我续投
// 每天 02:00 推多个 CollectPage{Cursor: "", Size: 1000, Shard: "..."}
要点:
- 每个任务采一批,例如 1000 到 5000 条,失败重试只针对这一批。
- 单个游标链路是串行的。如果要并行,需要按租户、日期、ID 范围或业务分片启动多条链路,再用
Concurrency同时处理多个分片。 - 配
Timeout控制单页最长耗时,配Retry和RetryDelay控制失败重试节奏。 - 配合
schedule定时触发,Runa 的schedule可以通过queue派发任务。 - 对定时采集入口建议加
Unique,避免同一个周期重复启动同一分片。
常见错误
只注册 Job,没有注册队列和 worker
Job 只是任务处理函数。要真正消费,还需要 RegisterQueue(...) 和 RegisterWorker(...),并运行 queue:work workerName。
开发环境能跑,生产环境任务丢失
默认 memory 驱动只适合同进程开发。多实例或生产环境应使用 Redis、AMQP 这类外部驱动。
handler 里做太久的事情
HTTP handler 里不要直接做耗时任务。把任务推到 queue,再由 worker 消费。
API 速查
queue.New()创建独立注册表queue.Provider(...)接入框架生命周期queue.Default()从默认 DI 取*queue.Registryqueue.RegisterDriver(name, driver)注册驱动queue.RegisterQueue(name, options...)注册队列queue.RegisterWorker(name, options...)注册 workerregistry.Job[T](name, handler, options...)注册 typed jobregistry.Push(ctx, queue, job, payload, options...)推送任务queue.NewUnit(registry, name)创建 worker Host 单元