Queue
Background queues, workers, and reliable consumption
queue manages queues, workers, and typed jobs. It uses an in-memory driver by default, suitable for development and single-process tasks. Production deployments usually install Redis or AMQP drivers.
Install
go get github.com/duxweb/runa/queue
Optional drivers:
go get github.com/duxweb/runa/queue/redis
go get github.com/duxweb/runa/queue/amqp
Connect to an application
package main
import (
"context"
"fmt"
"github.com/duxweb/runa"
"github.com/duxweb/runa/provider"
"github.com/duxweb/runa/queue"
)
type EmailJob struct {
To string `json:"to"`
}
type appModule struct {
provider.ModuleBase
}
func (appModule) Name() string { return "app" }
func (appModule) Register(ctx context.Context, app provider.Context) error {
queues, err := provider.Invoke[*queue.Registry](app)
if err != nil {
return err
}
queues.Job[EmailJob]("mail.send", func(ctx context.Context, job *queue.Job[EmailJob]) error {
fmt.Println("send email to", job.Payload.To)
return nil
})
return nil
}
func main() {
app := runa.New()
app.Install(queue.Provider(
queue.RegisterQueue("mail", queue.Workers("default")),
queue.RegisterWorker("default", queue.Concurrency(2)),
))
app.Module(appModule{})
if err := app.Run(context.Background()); err != nil {
panic(err)
}
}
Push a job:
_, err := queue.Default().Push(context.Background(), "mail", "mail.send", EmailJob{To: "hello@runa.dev"})
Run a worker process:
go run . queue:work default
Standalone New usage
registry := queue.New()
registry.Queue("mail", queue.Workers("default"))
registry.Worker("default", queue.Concurrency(1))
registry.Job[EmailJob]("mail.send", handler)
if err := registry.Freeze(); err != nil {
panic(err)
}
Standalone usage requires you to create queue.NewUnit(registry, "default") and start the worker with your own process management logic.
Config
queue reads queue.queues.<name> and queue.workers.<name>, and only applies config to queues and workers that have already been registered.
[queue.queues.default]
driver = "memory"
workers = ["default"]
retry = 3
retry_delay = "5s"
timeout = "30s"
retention = "24h"
[queue.workers.default]
concurrency = 4
poll_interval = "100ms"
lease = "30s"
stop_timeout = "30s"
| Scope | Common keys |
|---|---|
queue.queues.<name> |
driver, workers, retry, retry_delay, timeout, retention, meta |
queue.workers.<name> |
concurrency, poll_interval, lease, stop_timeout, meta |
retention is currently a reserved config field for future failed-job cleanup policy. Existing drivers do not automatically delete failed jobs based on retention, so production cleanup should be handled with driver-specific operations.
Redis driver
import (
goredis "github.com/redis/go-redis/v9"
queueredis "github.com/duxweb/runa/queue/redis"
)
client := goredis.NewClient(&goredis.Options{Addr: "127.0.0.1:6379"})
app.Install(queue.Provider(
queue.RegisterDriver("redis", queueredis.Driver(client, queueredis.Prefix("runa:queue:"))),
queue.RegisterQueue("default", queue.Use("redis"), queue.Workers("default")),
queue.RegisterWorker("default", queue.Concurrency(4)),
))
AMQP driver
import queueamqp "github.com/duxweb/runa/queue/amqp"
app.Install(queue.Provider(
queue.RegisterDriver("amqp", queueamqp.Driver(conn, queueamqp.Exchange("runa"))),
queue.RegisterQueue("default", queue.Use("amqp"), queue.Workers("default")),
))
The AMQP driver fits cases that only need publish and consume. It currently does not support Renew, Delete, Count, or List, so parts of queue:list are limited. Prefer Redis for long-running jobs or for workloads that need complete failed-job inspection.
Commands
After installing queue.Provider(), these commands are registered:
go run . queue:list
go run . queue:work default
queue:work is suitable for starting a standalone worker process. A normal serve command only starts registered Host units and does not automatically run every worker command.
Start a worker with Run
If you want app.Run(...) to start HTTP and a worker in the same process, register the worker explicitly as a Host:
func (appModule) Register(ctx context.Context, app provider.Context) error {
queues, err := provider.Invoke[*queue.Registry](app)
if err != nil {
return err
}
return app.RegisterHost(queue.NewUnit(queues, "default"))
}
Now the default serve command starts both the HTTP Host and the queue:default Host. This is convenient for small projects. In production, use a separate queue:work default process when HTTP and workers need independent scaling.
Timeout and lease
These two settings work at different levels:
- Timeout is per job:
queue.Timeout(d)limits how long one job handler can run. When it expires, thectxpassed to the handler is canceled. It can be set when pushing a job, otherwise it falls back to the registered job default, then to the queue default. - Lease is per worker:
queue.Lease(d)is the visibility timeout after a worker reserves a job. While the handler runs, the worker renews the lease everyLease / 2so another worker does not pick the same job too early.
Use a reasonable lease interval and avoid extremely small leases. The memory and redis drivers support renewal. The amqp driver’s Renew is currently unsupported, so long-running AMQP jobs should set Timeout carefully and handlers should respect ctx.Done().
queue.RegisterQueue("default",
queue.Use("redis"),
queue.Timeout(30*time.Second), // queue default: one job can run for 30s
queue.Workers("worker"),
)
queue.RegisterWorker("worker",
queue.Concurrency(20),
queue.Lease(2*time.Minute), // renewed while the job is running
queue.StopTimeout(30*time.Second),
)
// A single push can override Timeout
queue.Default().Push(ctx, "default", "report.build", payload, queue.Timeout(2*time.Minute))
Job options: delay, uniqueness, and metadata
queue.Push can override behavior for one job:
_, err := queue.Default().Push(
ctx,
"default",
"mail.send",
EmailJob{To: "hello@runa.dev"},
queue.Delay(10*time.Minute), // can be consumed after 10 minutes
queue.Unique("mail:hello@runa.dev"), // one job for the same queue + job name + key
queue.Retry(3), // up to 3 retries after the first failure
queue.RetryDelay(5*time.Second),
queue.Timeout(30*time.Second),
queue.Meta("source", "signup"),
)
Common rules:
Delay(d)moves the job to delayed state until it becomes due.Unique(key)prevents duplicate pushes for the same queue, job name, and key. A duplicate Push returns the existing job ID.Retry(n)means up tonretries after the first failed execution. It is not the total execution count.Meta(key, value)is not part of the payload. Queue, job, and push metadata are merged and are available asjob.Meta.- The queue model is at-least-once. Business handlers should still be idempotent because crashes, lease expiry, and retries can run the same business job again.
High concurrency and production guidance
- Default
Concurrency = 1: each worker runs one job at a time by default. Increase it explicitly withqueue.Concurrency(N). IO-heavy workloads can often use higher values, but the downstream service must be able to handle the load. - Use Redis or AMQP in production: the memory driver is in-process and non-durable. It is suitable for development, not multi-instance production.
- Retries are built in: use
queue.Retry(n)andqueue.RetryDelay(d). Failed jobs are released back to the queue until attempts are exhausted, then moved to failed state. - Backpressure is built in: the worker uses a bounded semaphore based on
Concurrency, so in-flight jobs are limited. A large backlog should live in Redis or AMQP, not in worker memory. - Graceful shutdown is bounded:
queue.StopTimeout(d)controls how long shutdown waits for in-flight jobs before returning an error.
Worker topology and queue splitting
In production, avoid putting every task into one queue. Split queues by priority and runtime profile, then attach different workers:
app.Install(queue.Provider(
queue.RegisterDriver("redis", queueredis.Driver(client)),
queue.RegisterQueue("critical", queue.Use("redis"), queue.Workers("critical")),
queue.RegisterQueue("bulk", queue.Use("redis"), queue.Workers("bulk"), queue.Timeout(5*time.Minute)),
queue.RegisterWorker("critical", queue.Concurrency(8), queue.Lease(time.Minute)),
queue.RegisterWorker("bulk", queue.Concurrency(50), queue.Lease(2*time.Minute)),
))
Start them as separate processes:
go run . queue:work critical
go run . queue:work bulk
This keeps urgent jobs from being blocked by large batch workloads. Small projects can register the worker as a Host and start it with HTTP. When workers need independent scaling, split them into serve, queue:work critical, queue:work bulk, and similar processes.
Failure visibility
queue:list shows pending, delayed, reserved, and failed counts for queues. It also shows processed, succeeded, failed, and retried counts for workers:
go run . queue:list
Programmatic integrations can use queue.Default().QueueInfo(ctx), queue.Default().WorkerInfo(ctx), and queue.Default().JobInfo(). The AMQP driver does not support Count or List, so these statistics are not as complete as Redis-backed statistics.
Large workloads: page or batch collection
For millions of records, such as scheduled collection from a third-party API, avoid pushing one job per record. The public API currently exposes single-job Push, not a bulk enqueue API. Pushing millions of tiny jobs one by one is slow and adds large scheduling overhead.
Prefer page or batch jobs. One job processes one batch of records. For example, with 1000 records per page, one million records become about 1000 jobs instead of one million jobs. A cursor-based job can push the next page after finishing the current page:
type CollectPage struct {
Cursor string
Size int
Shard string
}
queue.Default().Job[CollectPage]("collect.page", func(ctx context.Context, job *queue.Job[CollectPage]) error {
page := job.Payload
rows, next, err := fetchFromVendor(ctx, page.Cursor, page.Size, page.Shard)
if err != nil {
return err
}
if err := saveRows(ctx, rows); err != nil {
return err
}
if next != "" {
_, err := queue.Default().Push(ctx, "default", "collect.page", CollectPage{
Cursor: next,
Size: page.Size,
Shard: page.Shard,
})
return err
}
return nil
})
// Use schedule to push the first page for each shard.
// Following pages are pushed by the job itself.
// For example, every day at 02:00 push several CollectPage{Cursor: "", Size: 1000, Shard: "..."} jobs.
Key points:
- Each job handles one batch, such as 1000 to 5000 records, so retry affects only that batch.
- A single cursor chain is sequential. To run in parallel, split by tenant, date, ID range, or another business shard, then use
Concurrencyto process multiple shards at the same time. - Use
Timeoutto bound one page andRetry/RetryDelayto control retry behavior. schedulecan trigger the first page and dispatch it throughqueue.- Add
Uniqueto scheduled entry jobs when the same period and shard must not start twice.
Common mistakes
Registering a Job without a queue and worker
A job handler only defines how to run a job. You still need a queue and at least one worker connected to it.
Using the memory driver in production and losing jobs
The memory driver is for development and single-process tasks. Use Redis or AMQP when jobs must survive process restarts or run across instances.
Doing long work without respecting context
Job handlers receive context.Context. Pass it to database, cache, and external calls so shutdown and timeouts can stop work cleanly.
API quick reference
queue.New()creates a standalone registry.queue.Provider(...)connects to the framework lifecycle.queue.Default()reads*queue.Registryfrom default DI.queue.RegisterDriver(name, driver)registers a driver.queue.RegisterQueue(name, options...)registers a queue.queue.RegisterWorker(name, options...)registers a worker.registry.Job[T](name, handler, options...)registers a typed job.registry.Push(ctx, queue, job, payload, options...)pushes a job.queue.NewUnit(registry, name)creates a worker Host unit.