RRuna

队列 Queue

后台任务队列、worker 与可靠消费

queue 管理队列、worker 和 typed job。默认使用内存驱动,适合开发和单进程任务;生产环境通常接入 Redis 或 AMQP 驱动。

安装

go get github.com/duxweb/runa/queue

可选驱动按需安装。业务代码如果直接创建 Redis 或 AMQP client,也需要 import 对应客户端库:

go get github.com/duxweb/runa/queue/redis
go get github.com/duxweb/runa/queue/amqp

接入应用

package main

import (
    "context"
    "fmt"

    "github.com/duxweb/runa"
    "github.com/duxweb/runa/provider"
    "github.com/duxweb/runa/queue"
)

type EmailJob struct {
    To string `json:"to"`
}

type appModule struct {
    provider.ModuleBase
}

func (appModule) Name() string { return "app" }

func (appModule) Register(ctx context.Context, app provider.Context) error {
    queues, err := provider.Invoke[*queue.Registry](app)
    if err != nil {
        return err
    }
    queues.Job[EmailJob]("mail.send", func(ctx context.Context, job *queue.Job[EmailJob]) error {
        fmt.Println("send email to", job.Payload.To)
        return nil
    })
    return nil
}

func main() {
    app := runa.New()
    app.Install(queue.Provider(
        queue.RegisterQueue("mail", queue.Workers("default")),
        queue.RegisterWorker("default", queue.Concurrency(2)),
    ))
    app.Module(appModule{})

    if err := app.Run(context.Background()); err != nil {
        panic(err)
    }
}

推送任务:

_, err := queue.Default().Push(context.Background(), "mail", "mail.send", EmailJob{To: "hello@runa.dev"})

启动 worker 进程时运行:

go run . queue:work default

独立 New 使用

registry := queue.New()
registry.Queue("mail", queue.Workers("default"))
registry.Worker("default", queue.Concurrency(1))
registry.Job[EmailJob]("mail.send", handler)

if err := registry.Freeze(); err != nil {
    panic(err)
}

独立使用时你需要自己创建 queue.NewUnit(registry, "default") 并交给自己的进程管理逻辑启动 worker。

配置

queue 读取 queue.queues.<name>queue.workers.<name>,只作用到已经注册的队列和 worker。

[queue.queues.default]
driver = "memory"
workers = ["default"]
retry = 3
retry_delay = "5s"
timeout = "30s"
retention = "24h"

[queue.workers.default]
concurrency = 4
poll_interval = "100ms"
lease = "30s"
stop_timeout = "30s"
作用域 常用键
queue.queues.<name> driverworkersretryretry_delaytimeoutretentionmeta
queue.workers.<name> concurrencypoll_intervalleasestop_timeoutmeta

retention 当前是保留配置字段,用于后续失败任务清理策略。现有驱动不会自动按 retention 删除 failed 任务,生产环境需要按驱动能力做单独清理。

Redis 驱动

import (
    goredis "github.com/redis/go-redis/v9"
    queueredis "github.com/duxweb/runa/queue/redis"
)

client := goredis.NewClient(&goredis.Options{Addr: "127.0.0.1:6379"})

app.Install(queue.Provider(
    queue.RegisterDriver("redis", queueredis.Driver(client, queueredis.Prefix("runa:queue:"))),
    queue.RegisterQueue("default", queue.Use("redis"), queue.Workers("default")),
    queue.RegisterWorker("default", queue.Concurrency(4)),
))

AMQP 驱动

import queueamqp "github.com/duxweb/runa/queue/amqp"

app.Install(queue.Provider(
    queue.RegisterDriver("amqp", queueamqp.Driver(conn, queueamqp.Exchange("runa"))),
    queue.RegisterQueue("default", queue.Use("amqp"), queue.Workers("default")),
))

AMQP 驱动适合只需要投递和消费的场景。它当前不支持 RenewDeleteCountList,所以 queue:list 的部分统计会受限,长任务和需要查看 failed 明细的场景更建议优先使用 Redis。

命令

安装 queue.Provider() 后会注册:

go run . queue:list
go run . queue:work default

queue:work 适合单独启动 worker 进程。普通 serve 只会启动 HTTP 等已注册 Host,不会自动替你执行所有 worker 命令。

让 Run 同时启动 worker

如果你希望 app.Run(...) 启动 HTTP 的同时也启动某个 worker,需要把 worker 显式注册成 Host:

func (appModule) Register(ctx context.Context, app provider.Context) error {
    queues, err := provider.Invoke[*queue.Registry](app)
    if err != nil {
        return err
    }
    return app.RegisterHost(queue.NewUnit(queues, "default"))
}

这样默认 serve 命令会同时启动 HTTP Host 和 queue:default Host。小项目可以这样部署;生产环境如果 HTTP 和 worker 需要独立扩缩容,建议继续用 queue:work default 单独启动。

任务超时与租约(Timeout 与 Lease)

两者作用域不同,容易混:

  • Timeout 是单个任务的queue.Timeout(d) 限制一个任务 handler 最多跑多久,超时后传给 handler 的 ctx 会被取消。可在推送时按任务设置,缺省回退到任务默认,再回退到队列默认。
  • Lease 是 worker 的queue.Lease(d) 是任务被取走后的“不可见时长”。worker 会在执行期间按 Lease / 2 自动续租,避免任务还在执行时被其它 worker 重新取走。

建议把 Lease 设成一个合理的续租周期,不要太短。memoryredis 驱动支持续租;amqp 驱动的 Renew 目前不支持,长任务应更谨慎地设置 Timeout,并让 handler 正确响应 ctx.Done()

queue.RegisterQueue("default",
    queue.Use("redis"),
    queue.Timeout(30*time.Second),    // 队列默认:单任务最长 30s
    queue.Workers("worker"),
)
queue.RegisterWorker("worker",
    queue.Concurrency(20),
    queue.Lease(2*time.Minute),       // 执行期间自动续租
    queue.StopTimeout(30*time.Second),
)

// 也可对单个任务覆盖 Timeout
queue.Default().Push(ctx, "default", "report.build", payload, queue.Timeout(2*time.Minute))

任务选项:延迟、去重和元数据

queue.Push 可以按单次任务覆盖一部分行为:

_, err := queue.Default().Push(
    ctx,
    "default",
    "mail.send",
    EmailJob{To: "hello@runa.dev"},
    queue.Delay(10*time.Minute),          // 10 分钟后才可被消费
    queue.Unique("mail:hello@runa.dev"),  // 同一队列 + 同一任务名 + 同一 key 只保留一份
    queue.Retry(3),                       // 首次失败后最多再重试 3 次
    queue.RetryDelay(5*time.Second),
    queue.Timeout(30*time.Second),
    queue.Meta("source", "signup"),
)

常用规则:

  • Delay(d) 会把任务放到 delayed 状态,到时间后才会被 worker 取走。
  • Unique(key) 用于防重复投递,同一队列、同一任务名、同一 key 再次 Push 会返回已有任务 ID。
  • Retry(n) 表示首次执行失败后最多再重试 n 次,不是总执行次数。
  • Meta(key, value) 不进入 payload,会合并队列、任务和本次推送的元数据,handler 里通过 job.Meta 读取。
  • 队列是“至少一次执行”模型,业务 handler 仍然要做幂等。进程崩溃、租约过期、重试都可能让同一业务任务再次执行。

高并发与生产建议

  • 默认 Concurrency = 1:开箱每个 worker 一次只跑一个任务,安全但串行。要并发必须显式 queue.Concurrency(N);IO 密集型任务可以设置到几十或上百,但要看下游服务承压能力。
  • 生产用 Redis / AMQP,不要 memory:memory 驱动是同进程、无持久化,只适合开发;多实例或重启不丢任务要用外部驱动。
  • 失败自动重试queue.Retry(n) + queue.RetryDelay(d)。失败会按延迟重投该任务,超过次数后进入失败状态。
  • 背压是内建的:worker 用有界信号量按 Concurrency 限并发,在途任务数受限。大量任务积压不会撑爆 worker 内存;积压容量主要取决于 Redis / AMQP 等外部驱动。
  • 优雅关闭queue.StopTimeout(d) 限制关闭时等待在途任务的时间,超时返回错误,避免进程无限等待。

Worker 拓扑与队列拆分

生产环境通常不要把所有任务都塞进一个队列。可以按业务优先级和耗时拆成多个队列,再用不同 worker 承接:

app.Install(queue.Provider(
    queue.RegisterDriver("redis", queueredis.Driver(client)),

    queue.RegisterQueue("critical", queue.Use("redis"), queue.Workers("critical")),
    queue.RegisterQueue("bulk", queue.Use("redis"), queue.Workers("bulk"), queue.Timeout(5*time.Minute)),

    queue.RegisterWorker("critical", queue.Concurrency(8), queue.Lease(time.Minute)),
    queue.RegisterWorker("bulk", queue.Concurrency(50), queue.Lease(2*time.Minute)),
))

然后按进程拆开启动:

go run . queue:work critical
go run . queue:work bulk

这样紧急任务不会被大批量任务堵住。小项目可以把 worker 注册成 Host 跟 HTTP 一起启动;需要独立扩缩容时,建议拆成 servequeue:work criticalqueue:work bulk 等多个进程。

失败观测

queue:list 会显示队列的 pending、delayed、reserved、failed 数量,也会显示 worker 的 processed、succeeded、failed、retried 统计:

go run . queue:list

代码里可以通过 queue.Default().QueueInfo(ctx)queue.Default().WorkerInfo(ctx)queue.Default().JobInfo() 做监控集成。注意 AMQP 驱动不支持 CountList,这类统计在 AMQP 下不会像 Redis 一样完整。

海量任务:分页 / 批量采集

要处理几百万条数据,例如定时从第三方采集,通常不要一条记录推一个任务。原因是目前公开 API 只有单条 Push,没有批量入队 API;几百万条逐条入队很慢,百万级小任务的调度开销也大。

更推荐按页或批次切分,一个任务处理一批数据。比如每批 1000 条,百万数据只需要几千个任务,而不是几百万个任务。用游标自我续投,可以把入队压力摊平:

type CollectPage struct {
    Cursor string
    Size   int
    Shard  string
}

queue.Default().Job[CollectPage]("collect.page", func(ctx context.Context, job *queue.Job[CollectPage]) error {
    page := job.Payload
    rows, next, err := fetchFromVendor(ctx, page.Cursor, page.Size, page.Shard)
    if err != nil {
        return err
    }
    if err := saveRows(ctx, rows); err != nil {
        return err
    }
    if next != "" {
        _, err := queue.Default().Push(ctx, "default", "collect.page", CollectPage{
            Cursor: next,
            Size:   page.Size,
            Shard:  page.Shard,
        })
        return err
    }
    return nil
})

// 用 schedule 定时触发每个分片的第一页,后续页由任务自我续投
// 每天 02:00 推多个 CollectPage{Cursor: "", Size: 1000, Shard: "..."}

要点:

  • 每个任务采一批,例如 1000 到 5000 条,失败重试只针对这一批。
  • 单个游标链路是串行的。如果要并行,需要按租户、日期、ID 范围或业务分片启动多条链路,再用 Concurrency 同时处理多个分片。
  • Timeout 控制单页最长耗时,配 RetryRetryDelay 控制失败重试节奏。
  • 配合 schedule 定时触发,Runa 的 schedule 可以通过 queue 派发任务。
  • 对定时采集入口建议加 Unique,避免同一个周期重复启动同一分片。

常见错误

只注册 Job,没有注册队列和 worker

Job 只是任务处理函数。要真正消费,还需要 RegisterQueue(...)RegisterWorker(...),并运行 queue:work workerName

开发环境能跑,生产环境任务丢失

默认 memory 驱动只适合同进程开发。多实例或生产环境应使用 Redis、AMQP 这类外部驱动。

handler 里做太久的事情

HTTP handler 里不要直接做耗时任务。把任务推到 queue,再由 worker 消费。

API 速查

  • queue.New() 创建独立注册表
  • queue.Provider(...) 接入框架生命周期
  • queue.Default() 从默认 DI 取 *queue.Registry
  • queue.RegisterDriver(name, driver) 注册驱动
  • queue.RegisterQueue(name, options...) 注册队列
  • queue.RegisterWorker(name, options...) 注册 worker
  • registry.Job[T](name, handler, options...) 注册 typed job
  • registry.Push(ctx, queue, job, payload, options...) 推送任务
  • queue.NewUnit(registry, name) 创建 worker Host 单元
编辑此页