消息 Message
发布订阅消息与多驱动 Broker
message 提供发布订阅模型。默认使用内存 broker,适合同进程消息广播;生产环境可以按需接入 Redis、NATS、AMQP 或 MQTT。
它和 event 的侧重点不同:event 更像业务领域事件,message 更像通用 pub/sub 通道。
安装
go get github.com/duxweb/runa/message
可选驱动:
go get github.com/duxweb/runa/message/redis
go get github.com/duxweb/runa/message/nats
go get github.com/duxweb/runa/message/amqp
go get github.com/duxweb/runa/message/mqtt
接入应用
package main
import (
"context"
"fmt"
"github.com/duxweb/runa"
"github.com/duxweb/runa/message"
"github.com/duxweb/runa/provider"
)
type Notice struct {
Text string `json:"text"`
}
type appModule struct {
provider.ModuleBase
}
func (appModule) Name() string { return "app" }
func (appModule) Register(ctx context.Context, app provider.Context) error {
messages, err := provider.Invoke[*message.Registry](app)
if err != nil {
return err
}
messages.Subscribe("default", "notice.created", func(ctx context.Context, msg *message.MessageOf[Notice]) error {
fmt.Println(msg.Payload.Text)
return nil
})
return nil
}
func main() {
app := runa.New()
app.Install(message.Provider(
message.RegisterBroker("default"),
))
app.Module(appModule{})
if err := app.Freeze(context.Background()); err != nil {
panic(err)
}
_ = message.Default().Publish(context.Background(), "default", "notice.created", Notice{Text: "hello"})
}
独立 New 使用
registry := message.New()
registry.Broker("default")
registry.Subscribe("default", "notice.created", handler)
_ = registry.Publish(context.Background(), "default", "notice.created", Notice{Text: "hello"})
配置
message 读取 message.brokers.<name>,只作用到已经注册的 broker。
[message.brokers.default]
driver = "memory"
[message.brokers.audit.meta]
role = "audit"
| 键 | 类型 | 说明 |
|---|---|---|
driver |
string | broker 驱动,默认 memory |
meta |
table | 自定义元数据 |
驱动接入
Redis 示例:
import messageredis "github.com/duxweb/runa/message/redis"
app.Install(message.Provider(
message.RegisterDriver("redis", messageredis.Driver(client, messageredis.Prefix("runa:msg:"))),
message.RegisterBroker("default", message.Use("redis")),
))
NATS、AMQP、MQTT 驱动也都通过 message.RegisterDriver(name, driver) 接入,驱动包的工厂统一叫 Driver(...),返回 message.Driver 接口。
什么时候用 message
- 多个模块之间需要发布订阅消息
- WebSocket、控制台、监控等需要广播运行时消息
- 需要接入 Redis、NATS、AMQP、MQTT 这类消息系统
如果只是业务内部“用户已创建”这类领域事件,优先看 事件 Event。
主题匹配
ok := message.MatchTopic("user.*", "user.created")
_ = ok
订阅时可以使用具体 topic,也可以由驱动支持通配主题。
常见错误
默认 memory broker 用于多实例广播
memory broker 只在当前进程内广播。多实例部署应使用 Redis、NATS、AMQP 或 MQTT 驱动。
订阅后没有长期运行进程
消息订阅需要进程持续运行。命令执行完就退出的程序不适合做长期订阅者。
API 速查
message.New()创建独立注册表message.Provider(...)接入框架生命周期message.Default()从默认 DI 取*message.Registrymessage.RegisterDriver(name, driver)注册 broker 驱动message.RegisterBroker(name, options...)注册 brokerregistry.Subscribe[T](broker, topic, handler, options...)订阅消息registry.Publish(ctx, broker, topic, payload, options...)发布消息