Runa

Message

Pub/sub messages and multi-driver brokers

message provides a publish/subscribe model. By default it uses an in-memory broker, which is suitable for broadcasts within a single process. Production deployments can add the Redis, NATS, AMQP, or MQTT driver as needed.

Install

go get github.com/duxweb/runa/message

Optional drivers:

go get github.com/duxweb/runa/message/redis
go get github.com/duxweb/runa/message/nats
go get github.com/duxweb/runa/message/amqp
go get github.com/duxweb/runa/message/mqtt

Connect to an application

package main

import (
    "context"
    "fmt"

    "github.com/duxweb/runa"
    "github.com/duxweb/runa/message"
    "github.com/duxweb/runa/provider"
)

type Notice struct {
    Text string `json:"text"`
}

type appModule struct {
    provider.ModuleBase
}

func (appModule) Name() string { return "app" }

func (appModule) Register(ctx context.Context, app provider.Context) error {
    messages, err := provider.Invoke[*message.Registry](app)
    if err != nil {
        return err
    }
    messages.Subscribe("default", "notice.created", func(ctx context.Context, msg *message.MessageOf[Notice]) error {
        fmt.Println(msg.Payload.Text)
        return nil
    })
    return nil
}

func main() {
    app := runa.New()
    app.Install(message.Provider(
        message.RegisterBroker("default"),
    ))
    app.Module(appModule{})

    if err := app.Freeze(context.Background()); err != nil {
        panic(err)
    }

    _ = message.Default().Publish(context.Background(), "default", "notice.created", Notice{Text: "hello"})
}

Standalone New usage

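handler := func(ctx context.Context, msg *message.MessageOf[Notice]) error {
    fmt.Println(msg.Payload.Text)
    return nil
}
registry := message.New()
registry.Broker("default")
registry.Subscribe("default", "notice.created", handler)
_ = registry.Publish(context.Background(), "default", "notice.created", Notice{Text: "hello"})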

Config

message reads message.brokers.<name> and applies config only to brokers that have already been registered. In the example below, the audit section has no effect unless a broker named audit is registered.

[message.brokers.default]
driver = "memory"

[message.brokers.audit.meta]
role = "audit"

Key     Type    Description
driver  string  Broker driver; defaults to memory.
meta    table   Custom metadata.

Driver integration

Redis example:

import messageredis "github.com/duxweb/runa/message/redis"

app.Install(message.Provider(
    message.RegisterDriver("redis", messageredis.Driver(client, messageredis.Prefix("runa:msg:"))),
    message.RegisterBroker("default", message.Use("redis")),
))

NATS, AMQP, and MQTT drivers all connect through message.RegisterDriver(name, driver). Driver package factories are consistently named Driver(...) and return the message.Driver interface.

Topic matching

ok := message.MatchTopic("user.*", "user.created")
_ = ok

Subscriptions can use concrete topics, and drivers may support wildcard topics.
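To make the wildcard idea concrete, here is a minimal standard-library sketch of one possible matching rule. The matchTopic function is a hypothetical stand-in, not the library's implementation: it assumes dot-separated segments where "*" matches exactly one segment. The real message.MatchTopic and the individual drivers may treat wildcards differently, so check the behavior of the driver you use.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic is a hypothetical stand-in for message.MatchTopic.
// Assumption: topics are dot-separated and "*" matches exactly one segment.
func matchTopic(pattern, topic string) bool {
	p := strings.Split(pattern, ".")
	t := strings.Split(topic, ".")
	if len(p) != len(t) {
		return false // different segment counts never match under this rule
	}
	for i := range p {
		if p[i] != "*" && p[i] != t[i] {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(matchTopic("user.*", "user.created"))       // true
	fmt.Println(matchTopic("user.*", "order.created"))      // false
	fmt.Println(matchTopic("user.*", "user.profile.saved")) // false under the one-segment assumption
}
```

Under this assumed rule a pattern never matches a topic with a different number of segments, which is why the last call returns false.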

When to use message

Use message for pub/sub style notification between parts of the system. Use queue when work must be processed reliably with retry and worker control.

Typical message use cases include cache invalidation, lightweight broadcasts, and event fan-out where losing a message is acceptable or handled elsewhere.

Common mistakes

Using the default memory broker for multi-instance broadcast

The memory broker only works inside the current process. Use Redis, NATS, AMQP, or MQTT for cross-instance messages.
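The following sketch illustrates why an in-process broker cannot reach other instances. The memoryBroker type is a hypothetical, simplified model written with the standard library only; it is not the runa memory broker. Two broker values stand in for two application instances, each holding its own subscriber table.

```go
package main

import (
	"fmt"
	"sync"
)

// memoryBroker is a hypothetical, simplified in-process broker.
// Its subscriber table lives in this process's memory only.
type memoryBroker struct {
	mu       sync.RWMutex
	handlers map[string][]func(payload string)
}

func newMemoryBroker() *memoryBroker {
	return &memoryBroker{handlers: make(map[string][]func(string))}
}

// Subscribe registers a handler for a concrete topic.
func (b *memoryBroker) Subscribe(topic string, h func(string)) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.handlers[topic] = append(b.handlers[topic], h)
}

// Publish calls every handler registered on this broker value
// and returns how many handlers were called.
func (b *memoryBroker) Publish(topic, payload string) int {
	b.mu.RLock()
	hs := append([]func(string){}, b.handlers[topic]...)
	b.mu.RUnlock()
	for _, h := range hs {
		h(payload)
	}
	return len(hs)
}

func main() {
	// Two broker values model two separate application instances.
	instanceA := newMemoryBroker()
	instanceB := newMemoryBroker()

	instanceB.Subscribe("cache.invalidate", func(p string) {
		fmt.Println("B received", p)
	})

	// Publishing on A reaches no one: B's subscriber is in another table.
	fmt.Println("delivered from A:", instanceA.Publish("cache.invalidate", "user:1"))
	// Publishing on B reaches the local subscriber.
	fmt.Println("delivered from B:", instanceB.Publish("cache.invalidate", "user:1"))
}
```

A networked driver avoids this by routing messages through a shared external server instead of a per-process table.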

Subscribing without a long-running process

Subscriptions need a running process. Make sure the application starts the relevant Host or command that keeps subscriptions alive.

API quick reference

  • message.New() creates a standalone registry.
  • message.Provider(...) connects to the framework lifecycle.
  • message.Default() returns the *message.Registry from the default DI container.
  • message.RegisterDriver(name, driver) registers a broker driver.
  • message.RegisterBroker(name, options...) registers a broker.
  • registry.Subscribe[T](broker, topic, handler, options...) subscribes to messages.
  • registry.Publish(ctx, broker, topic, payload, options...) publishes a message.