RRuna

消息 Message

发布订阅消息与多驱动 Broker

message 提供发布订阅模型。默认使用内存 broker,适合同进程消息广播;生产环境可以按需接入 Redis、NATS、AMQP 或 MQTT。

它和 event 的侧重点不同:event 更像业务领域事件,message 更像通用 pub/sub 通道。

安装

go get github.com/duxweb/runa/message

可选驱动:

go get github.com/duxweb/runa/message/redis
go get github.com/duxweb/runa/message/nats
go get github.com/duxweb/runa/message/amqp
go get github.com/duxweb/runa/message/mqtt

接入应用

package main

import (
    "context"
    "fmt"

    "github.com/duxweb/runa"
    "github.com/duxweb/runa/message"
    "github.com/duxweb/runa/provider"
)

type Notice struct {
    Text string `json:"text"`
}

type appModule struct {
    provider.ModuleBase
}

func (appModule) Name() string { return "app" }

func (appModule) Register(ctx context.Context, app provider.Context) error {
    messages, err := provider.Invoke[*message.Registry](app)
    if err != nil {
        return err
    }
    messages.Subscribe("default", "notice.created", func(ctx context.Context, msg *message.MessageOf[Notice]) error {
        fmt.Println(msg.Payload.Text)
        return nil
    })
    return nil
}

func main() {
    app := runa.New()
    app.Install(message.Provider(
        message.RegisterBroker("default"),
    ))
    app.Module(appModule{})

    if err := app.Freeze(context.Background()); err != nil {
        panic(err)
    }

    _ = message.Default().Publish(context.Background(), "default", "notice.created", Notice{Text: "hello"})
}

独立 New 使用

registry := message.New()
registry.Broker("default")
registry.Subscribe("default", "notice.created", handler)
_ = registry.Publish(context.Background(), "default", "notice.created", Notice{Text: "hello"})

配置

message 读取 message.brokers.<name>,只作用到已经注册的 broker。

[message.brokers.default]
driver = "memory"

[message.brokers.audit.meta]
role = "audit"
类型 说明
driver string broker 驱动,默认 memory
meta table 自定义元数据

驱动接入

Redis 示例:

import messageredis "github.com/duxweb/runa/message/redis"

app.Install(message.Provider(
    message.RegisterDriver("redis", messageredis.Driver(client, messageredis.Prefix("runa:msg:"))),
    message.RegisterBroker("default", message.Use("redis")),
))

NATS、AMQP、MQTT 驱动也都通过 message.RegisterDriver(name, driver) 接入,驱动包的工厂统一叫 Driver(...),返回 message.Driver 接口。

什么时候用 message

  • 多个模块之间需要发布订阅消息
  • WebSocket、控制台、监控等需要广播运行时消息
  • 需要接入 Redis、NATS、AMQP、MQTT 这类消息系统

如果只是业务内部“用户已创建”这类领域事件,优先看 事件 Event

主题匹配

ok := message.MatchTopic("user.*", "user.created")
_ = ok

订阅时可以使用具体 topic,也可以由驱动支持通配主题。

常见错误

默认 memory broker 用于多实例广播

memory broker 只在当前进程内广播。多实例部署应使用 Redis、NATS、AMQP 或 MQTT 驱动。

订阅后没有长期运行进程

消息订阅需要进程持续运行。命令执行完就退出的程序不适合做长期订阅者。

API 速查

  • message.New() 创建独立注册表
  • message.Provider(...) 接入框架生命周期
  • message.Default() 从默认 DI 取 *message.Registry
  • message.RegisterDriver(name, driver) 注册 broker 驱动
  • message.RegisterBroker(name, options...) 注册 broker
  • registry.Subscribe[T](broker, topic, handler, options...) 订阅消息
  • registry.Publish(ctx, broker, topic, payload, options...) 发布消息
编辑此页