FluxaORM v2: Code-Generation-Based Go ORM for MySQL and Redis

Event Broker

FluxaORM provides an event broker built on top of Redis Streams. It lets you publish events to named streams and consume them with single or multiple parallel consumers, all with built-in consumer groups, acknowledgment, and statistics.

Registering a Redis Stream

Before you can publish or consume events, each stream must be registered with a Redis pool during registry setup:

import "github.com/latolukasz/fluxaorm/v2"

registry := fluxaorm.NewRegistry()
registry.RegisterRedis("localhost:6379", 0, "default", nil)

// Register streams on the "default" Redis pool
registry.RegisterRedisStream("user-events", "default")
registry.RegisterRedisStream("order-events", "default")

engine, err := registry.Validate()

RegisterRedisStream(name, redisPool) associates the stream name with a Redis pool. Every stream you plan to publish to or consume from must be registered this way before calling Validate().

Publishing Events

There are two ways to publish events: directly through the EventBroker, or in batches through an EventFlusher.

Direct Publish

Use Publish on the EventBroker when you need the stream message ID back immediately:

ctx := engine.NewContext(context.Background())
broker := ctx.GetEventBroker()

type OrderEvent struct {
    UserID    uint64
    ProductID uint64
    Quantity  int
}

event := OrderEvent{UserID: 1, ProductID: 42, Quantity: 3}

// Publish a struct (serialized automatically via msgpack)
id, err := broker.Publish("order-events", event)

// Publish with additional metadata tags (key-value pairs)
id, err = broker.Publish("order-events", event, "action", "created", "priority", "high")

The body argument can be any value that is serializable with msgpack. It will be automatically serialized when published and can be deserialized on the consumer side with event.Unserialize().

The optional meta variadic arguments are string key-value pairs added as extra fields on the Redis Stream message. They can be read on the consumer side using event.Tag(key).
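Conceptually, the variadic meta arguments map one-to-one onto extra field/value entries on the stream message. The sketch below illustrates that flattening with a local helper; metaToFields is an illustrative name, not part of the fluxaorm API:

```go
package main

import "fmt"

// metaToFields converts variadic key-value pairs, as accepted by
// Publish(stream, body, meta...), into the extra fields attached
// to the Redis Stream message. A trailing key without a value is ignored.
func metaToFields(meta ...string) map[string]string {
	fields := make(map[string]string, len(meta)/2)
	for i := 0; i+1 < len(meta); i += 2 {
		fields[meta[i]] = meta[i+1]
	}
	return fields
}

func main() {
	fields := metaToFields("action", "created", "priority", "high")
	fmt.Println(fields["action"], fields["priority"]) // created high
}
```

On the consumer side, event.Tag("action") would then return "created".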

Batch Publish with EventFlusher

Use NewFlusher() when you want to batch multiple events and send them all at once using a Redis pipeline, which is more efficient for high-throughput scenarios:

broker := ctx.GetEventBroker()
flusher := broker.NewFlusher()

for i := 0; i < 100; i++ {
    err := flusher.Publish("user-events", UserEvent{Action: "login", UserID: uint64(i)})
    if err != nil {
        // serialization error
        log.Fatal(err)
    }
}

// Send all buffered events to Redis in one pipeline call
err := flusher.Flush()

Flush() groups events by their target Redis pool and sends each group via a Redis pipeline for optimal performance. After flushing, the internal buffer is cleared so the flusher can be reused.
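The grouping step can be pictured as follows. This is a hedged sketch with locally defined stand-in types (bufferedEvent and groupByPool are illustrative names, not fluxaorm internals):

```go
package main

import "fmt"

// bufferedEvent is a stand-in for an event buffered by the flusher.
type bufferedEvent struct {
	Stream string
	Body   []byte
}

// groupByPool buckets buffered events by the Redis pool their stream
// is registered on, so each bucket can be sent in a single pipeline call.
func groupByPool(events []bufferedEvent, streamPool map[string]string) map[string][]bufferedEvent {
	groups := make(map[string][]bufferedEvent)
	for _, e := range events {
		pool := streamPool[e.Stream]
		groups[pool] = append(groups[pool], e)
	}
	return groups
}

func main() {
	pools := map[string]string{"user-events": "default", "order-events": "default"}
	events := []bufferedEvent{
		{Stream: "user-events"}, {Stream: "order-events"}, {Stream: "user-events"},
	}
	groups := groupByPool(events, pools)
	fmt.Println(len(groups["default"])) // 3
}
```

Because both streams live on the "default" pool here, all three events end up in one bucket and would be sent with a single pipeline round trip.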

Consuming Events

Consumers read events from a stream using Redis consumer groups. FluxaORM automatically creates the consumer group (consumer_group) when the consumer first reads from the stream.

Single Consumer

Use ConsumerSingle when you need exactly one consumer processing messages in order:

broker := ctx.GetEventBroker()

consumer, err := broker.ConsumerSingle(ctx, "order-events")
if err != nil {
    log.Fatal(err)
}
defer consumer.Cleanup()

err = consumer.Consume(10, 5*time.Second, func(events []fluxaorm.Event) error {
    for _, event := range events {
        var order OrderEvent
        err := event.Unserialize(&order)
        if err != nil {
            return err
        }
        fmt.Printf("Order from user %d for product %d\n", order.UserID, order.ProductID)
    }
    return nil
})

ConsumerSingle creates a consumer with the fixed name consumer-single. This guarantees that only one consumer reads the stream, and events are processed from oldest to newest.

The Consume method takes three arguments:

Parameter   Type                  Description
count       int                   Maximum number of messages to read per call
blockTime   time.Duration         How long to block waiting for new messages
handler     func([]Event) error   Callback that processes the batch of events

Multiple Consumers

Use ConsumerMany when you need parallel processing across several consumers:

broker := ctx.GetEventBroker()

for i := 0; i < 3; i++ {
    go func() {
        consumer, err := broker.ConsumerMany(ctx, "user-events")
        if err != nil {
            log.Fatal(err)
        }
        defer consumer.Cleanup()

        err = consumer.Consume(5, 10*time.Second, func(events []fluxaorm.Event) error {
            for _, event := range events {
                var u UserEvent
                err := event.Unserialize(&u)
                if err != nil {
                    return err
                }
                fmt.Printf("[%s] processed user event: %+v\n", consumer.Name(), u)
            }
            return nil
        })
        if err != nil {
            log.Println(err)
        }
    }()
}

Each ConsumerMany consumer receives an auto-generated unique name that includes the current timestamp and a random number (e.g., consumer-2026_02_28_14_30_00-8734529182374). You can retrieve the name with consumer.Name(). Event order is not guaranteed across consumers.

Acknowledgment

When your handler returns nil, all events that were not explicitly acknowledged inside the handler are automatically acknowledged and deleted from the stream after the handler completes.

You can also acknowledge events manually inside the handler using event.Ack():

err = consumer.Consume(10, 5*time.Second, func(events []fluxaorm.Event) error {
    for _, event := range events {
        err := processEvent(event)
        if err != nil {
            // Skip this event — it will still be auto-acked
            // when the handler returns nil
            continue
        }
        // Acknowledge immediately
        err = event.Ack()
        if err != nil {
            return err
        }
    }
    return nil
})

If the handler returns an error, no automatic acknowledgment happens. The unacknowledged events remain pending in the consumer group and can be reclaimed later.

Auto-Claiming Pending Events

When running multiple consumers, some consumers may crash or disconnect before acknowledging their messages. Use AutoClaim to reclaim those pending messages:

consumer, err := broker.ConsumerMany(ctx, "order-events")
if err != nil {
    log.Fatal(err)
}
defer consumer.Cleanup()

// Claim pending messages that have been idle for more than 10 minutes,
// up to 1000 messages per iteration
err = consumer.AutoClaim(1000, 10*time.Minute, func(events []fluxaorm.Event) error {
    for _, event := range events {
        var order OrderEvent
        err := event.Unserialize(&order)
        if err != nil {
            return err
        }
        // reprocess the event
    }
    return nil
})

AutoClaim iterates through all pending messages that have been unacknowledged for longer than minIdle. It processes them in batches and continues until there are no more eligible pending messages.

Cleanup

Always call consumer.Cleanup() when a consumer is done (typically via defer). This removes the consumer from the Redis consumer group:

consumer, err := broker.ConsumerSingle(ctx, "order-events")
if err != nil {
    log.Fatal(err)
}
defer consumer.Cleanup()

Event Interface

Each event passed to the consumer handler implements the Event interface:

type Event interface {
    Ack() error
    ID() string
    Tag(key string) (value string)
    Unserialize(val interface{}) error
}

Method             Description
Ack()              Manually acknowledge and delete the event from the stream. Safe to call multiple times (no-op after the first call).
ID()               Returns the Redis Stream message ID (e.g., 1709136000000-0).
Tag(key)           Returns the value of a metadata tag set during publishing, or an empty string if the key does not exist.
Unserialize(val)   Deserializes the event body (msgpack) into the provided pointer.
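Because Tag returns an empty string for missing keys, tags work well for routing inside a handler before deserializing the body. An illustrative routing sketch (routeByAction and the action values are assumptions, not part of the fluxaorm API):

```go
package main

import "fmt"

// routeByAction dispatches based on a metadata tag value, falling back
// to a default branch when the tag is absent (Tag returns "" in that case).
func routeByAction(tag string, handlers map[string]func() string) string {
	if h, ok := handlers[tag]; ok {
		return h()
	}
	return "unhandled"
}

func main() {
	handlers := map[string]func() string{
		"created": func() string { return "handled create" },
		"deleted": func() string { return "handled delete" },
	}
	fmt.Println(routeByAction("created", handlers)) // handled create
	fmt.Println(routeByAction("", handlers))        // unhandled
}
```

In a real handler, the tag would come from event.Tag("action"), matching the metadata set at publish time.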

Stream Statistics

The event broker provides methods to inspect stream health and consumer group status:

broker := ctx.GetEventBroker()

// Get statistics for a single stream
stats, err := broker.GetStreamStatistics("order-events")
if err != nil {
    log.Fatal(err)
}
if stats != nil {
    fmt.Printf("Stream: %s\n", stats.Stream)
    fmt.Printf("Redis pool: %s\n", stats.RedisPool)
    fmt.Printf("Length: %d\n", stats.Len)
    fmt.Printf("Oldest pending event age: %d seconds\n", stats.OldestEventSeconds)
}

// Get statistics for multiple streams at once
allStats, err := broker.GetStreamsStatistics("order-events", "user-events")

// Get statistics for all registered streams (no arguments)
allStats, err = broker.GetStreamsStatistics()

RedisStreamStatistics

Field                Type                          Description
Stream               string                        Stream name
RedisPool            string                        Redis pool code the stream is registered on
Len                  uint64                        Total number of messages in the stream
OldestEventSeconds   int                           Age in seconds of the oldest pending (unacknowledged) event
Group                *RedisStreamGroupStatistics   Consumer group statistics (nil if no group exists)
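These fields lend themselves to a simple health check, for example alerting when a stream's backlog grows or its oldest pending event gets stale. A sketch with a locally defined struct mirroring the documented fields (the threshold values are arbitrary, and streamStats is a local stand-in, not the library type):

```go
package main

import "fmt"

// streamStats mirrors the RedisStreamStatistics fields relevant to alerting.
type streamStats struct {
	Stream             string
	Len                uint64
	OldestEventSeconds int
}

// isHealthy flags streams whose backlog or oldest pending event age
// exceeds the given thresholds.
func isHealthy(s streamStats, maxLen uint64, maxAgeSeconds int) bool {
	return s.Len <= maxLen && s.OldestEventSeconds <= maxAgeSeconds
}

func main() {
	s := streamStats{Stream: "order-events", Len: 1500, OldestEventSeconds: 45}
	fmt.Println(isHealthy(s, 10000, 300)) // true
	fmt.Println(isHealthy(s, 1000, 300))  // false: backlog over limit
}
```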

RedisStreamGroupStatistics

Field                   Type                               Description
Group                   string                             Consumer group name
Lag                     int64                              Number of entries in the stream not yet delivered to the group
Pending                 uint64                             Number of pending (delivered but unacknowledged) messages
LastDeliveredID         string                             ID of the last message delivered to the group
LastDeliveredDuration   time.Duration                      Time elapsed since the last message was delivered
LowerID                 string                             ID of the oldest pending message
LowerDuration           time.Duration                      Time elapsed since the oldest pending message was delivered
Consumers               []*RedisStreamConsumerStatistics   Per-consumer statistics

RedisStreamConsumerStatistics

Field     Type     Description
Name      string   Consumer name
Pending   uint64   Number of pending messages for this consumer
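Per-consumer pending counts are useful for spotting a stuck consumer that stopped acknowledging. An illustrative sketch (consumerStats mirrors the documented fields locally; maxPending is an assumed helper, not a library function):

```go
package main

import "fmt"

// consumerStats mirrors the RedisStreamConsumerStatistics fields.
type consumerStats struct {
	Name    string
	Pending uint64
}

// maxPending returns the name of the consumer with the largest pending
// backlog, or "" when no consumer has pending messages.
func maxPending(consumers []consumerStats) string {
	var name string
	var max uint64
	for _, c := range consumers {
		if c.Pending > 0 && c.Pending >= max {
			name, max = c.Name, c.Pending
		}
	}
	return name
}

func main() {
	consumers := []consumerStats{
		{Name: "consumer-a", Pending: 2},
		{Name: "consumer-b", Pending: 9},
	}
	fmt.Println(maxPending(consumers)) // consumer-b
}
```

A consumer that tops this list for a long time is a candidate for AutoClaim reclamation.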