Message
Pub/sub messages and multi-driver brokers
The message package provides a publish/subscribe model. By default it uses an in-memory broker, which is suitable for broadcasts within a single process. For production deployments, install the Redis, NATS, AMQP, or MQTT driver as needed.
Install
go get github.com/duxweb/runa/message
Optional drivers:
go get github.com/duxweb/runa/message/redis
go get github.com/duxweb/runa/message/nats
go get github.com/duxweb/runa/message/amqp
go get github.com/duxweb/runa/message/mqtt
Connect to an application
package main

import (
	"context"
	"fmt"

	"github.com/duxweb/runa"
	"github.com/duxweb/runa/message"
	"github.com/duxweb/runa/provider"
)

type Notice struct {
	Text string `json:"text"`
}

type appModule struct {
	provider.ModuleBase
}

func (appModule) Name() string { return "app" }

func (appModule) Register(ctx context.Context, app provider.Context) error {
	messages, err := provider.Invoke[*message.Registry](app)
	if err != nil {
		return err
	}
	messages.Subscribe("default", "notice.created", func(ctx context.Context, msg *message.MessageOf[Notice]) error {
		fmt.Println(msg.Payload.Text)
		return nil
	})
	return nil
}

func main() {
	app := runa.New()
	app.Install(message.Provider(
		message.RegisterBroker("default"),
	))
	app.Module(appModule{})
	if err := app.Freeze(context.Background()); err != nil {
		panic(err)
	}
	_ = message.Default().Publish(context.Background(), "default", "notice.created", Notice{Text: "hello"})
}
Standalone New usage
registry := message.New()
registry.Broker("default")
registry.Subscribe("default", "notice.created", handler)
_ = registry.Publish(context.Background(), "default", "notice.created", Notice{Text: "hello"})
Config
The message package reads config from message.brokers.<name> and applies it only to brokers that are already registered. Config sections for brokers that have not been registered are not applied.
[message.brokers.default]
driver = "memory"
[message.brokers.audit.meta]
role = "audit"
| Key | Type | Description |
|---|---|---|
| driver | string | Broker driver; defaults to memory |
| meta | table | Custom metadata |
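The rule that config only reaches already-registered brokers can be pictured with a small stand-alone sketch. The `brokerConfig` type and `applyConfig` function below are hypothetical and are not part of runa's API. They only illustrate the idea of skipping config sections whose broker name was never registered.

```go
package main

import (
	"fmt"
	"sort"
)

// brokerConfig is a hypothetical stand-in for one message.brokers.<name> section.
type brokerConfig struct {
	Driver string
	Meta   map[string]string
}

// applyConfig copies config onto brokers that are already registered and
// returns the sorted names of the config sections it skipped.
func applyConfig(registered map[string]*brokerConfig, cfg map[string]brokerConfig) []string {
	var skipped []string
	for name, c := range cfg {
		b, ok := registered[name]
		if !ok {
			skipped = append(skipped, name) // no such broker: section is not applied
			continue
		}
		if c.Driver != "" {
			b.Driver = c.Driver
		}
		if c.Meta != nil {
			b.Meta = c.Meta
		}
	}
	sort.Strings(skipped)
	return skipped
}

func main() {
	// In this sketch only "default" has been registered.
	registered := map[string]*brokerConfig{"default": {Driver: "memory"}}
	cfg := map[string]brokerConfig{
		"default": {Driver: "memory"},
		"audit":   {Meta: map[string]string{"role": "audit"}},
	}
	skipped := applyConfig(registered, cfg)
	fmt.Println(registered["default"].Driver, skipped)
}
```

In this sketch the audit section is skipped because no broker named audit was registered first. In a real application, that would mean registering the broker in code before expecting its config to take effect.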
Driver integration
Redis example:
import messageredis "github.com/duxweb/runa/message/redis"
app.Install(message.Provider(
message.RegisterDriver("redis", messageredis.Driver(client, messageredis.Prefix("runa:msg:"))),
message.RegisterBroker("default", message.Use("redis")),
))
NATS, AMQP, and MQTT drivers all connect through message.RegisterDriver(name, driver). Driver package factories are consistently named Driver(...) and return the message.Driver interface.
Topic matching
ok := message.MatchTopic("user.*", "user.created")
_ = ok
Subscriptions can use concrete topics, and drivers may support wildcard topics.
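This page does not spell out the wildcard rules, and they may differ per driver. As a rough mental model only, the sketch below assumes that "." separates segments and that "*" matches exactly one segment. The `matchSegments` function is hypothetical and is not the implementation of message.MatchTopic.

```go
package main

import (
	"fmt"
	"strings"
)

// matchSegments reports whether topic matches pattern, assuming "." separates
// segments and "*" matches exactly one segment. This is an illustrative rule,
// not necessarily the one message.MatchTopic or any driver implements.
func matchSegments(pattern, topic string) bool {
	p := strings.Split(pattern, ".")
	t := strings.Split(topic, ".")
	if len(p) != len(t) {
		return false
	}
	for i := range p {
		if p[i] != "*" && p[i] != t[i] {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(matchSegments("user.*", "user.created"))
	fmt.Println(matchSegments("user.*", "order.created"))
	fmt.Println(matchSegments("user.*", "user.profile.updated"))
}
```

Under these assumed rules only the first call matches. Check the behavior of the driver you actually use before relying on wildcard subscriptions.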
When to use message
Use message for pub/sub style notification between parts of the system. Use queue when work must be processed reliably with retry and worker control.
Typical message use cases include cache invalidation, lightweight broadcasts, and event fan-out where losing a message is acceptable or handled elsewhere.
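To make "losing a message is acceptable" concrete, here is a minimal sketch of lossy fan-out built on plain Go channels. It does not show how runa dispatches messages. The `fanOut` function and the topic strings are hypothetical and only illustrate delivery without retry.

```go
package main

import "fmt"

// fanOut offers event to every subscriber channel without blocking and
// returns how many subscribers accepted it. A subscriber whose buffer is
// full misses the event; nothing is retried.
func fanOut(subscribers []chan string, event string) int {
	delivered := 0
	for _, ch := range subscribers {
		select {
		case ch <- event:
			delivered++
		default: // subscriber is busy: drop the event for it
		}
	}
	return delivered
}

func main() {
	fast := make(chan string, 2) // room for two pending events
	slow := make(chan string, 1) // room for one pending event
	subs := []chan string{fast, slow}

	fmt.Println(fanOut(subs, "cache.invalidate:user:1"))
	fmt.Println(fanOut(subs, "cache.invalidate:user:2"))
}
```

The second event never reaches the slow subscriber. That is tolerable for cache invalidation hints, but it is exactly the case where a queue with retry is the better fit.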
Common mistakes
Using the default memory broker for multi-instance broadcast
The memory broker only works inside the current process. Use Redis, NATS, AMQP, or MQTT for cross-instance messages.
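The reason an in-memory broker cannot reach other instances is that its subscriptions live in the memory of one process. The sketch below uses a hypothetical `memoryBroker` type, which is not runa's implementation, and lets two separate values stand in for two application instances.

```go
package main

import (
	"fmt"
	"sync"
)

// memoryBroker is a hypothetical in-process broker: subscriptions live in a
// map owned by this one value, so nothing outside the process can see them.
type memoryBroker struct {
	mu       sync.RWMutex
	handlers map[string][]func(payload string)
}

func newMemoryBroker() *memoryBroker {
	return &memoryBroker{handlers: make(map[string][]func(string))}
}

// Subscribe registers handler for an exact topic.
func (b *memoryBroker) Subscribe(topic string, handler func(payload string)) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.handlers[topic] = append(b.handlers[topic], handler)
}

// Publish calls every handler registered on this broker for topic and
// returns how many handlers ran.
func (b *memoryBroker) Publish(topic, payload string) int {
	b.mu.RLock()
	hs := append([]func(string){}, b.handlers[topic]...) // copy under the lock
	b.mu.RUnlock()
	for _, h := range hs {
		h(payload)
	}
	return len(hs)
}

func main() {
	// Two broker values stand in for two application instances.
	instanceA := newMemoryBroker()
	instanceB := newMemoryBroker()

	instanceA.Subscribe("notice.created", func(p string) { fmt.Println("A got:", p) })

	fmt.Println(instanceA.Publish("notice.created", "hello")) // handled inside A
	fmt.Println(instanceB.Publish("notice.created", "hello")) // B cannot see A's subscribers
}
```

A networked driver removes this limit by moving the subscription list out of the process and into a shared server.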
Subscribing without a long-running process
Subscriptions need a running process. Make sure the application starts the relevant Host or command that keeps subscriptions alive.
API quick reference
- message.New() creates a standalone registry.
- message.Provider(...) connects to the framework lifecycle.
- message.Default() reads *message.Registry from default DI.
- message.RegisterDriver(name, driver) registers a broker driver.
- message.RegisterBroker(name, options...) registers a broker.
- registry.Subscribe[T](broker, topic, handler, options...) subscribes to messages.
- registry.Publish(ctx, broker, topic, payload, options...) publishes a message.