FluxaORM v2: Code-Generation-Based Go ORM for MySQL, Redis, and ClickHouse
Guide
GitHub
Guide
GitHub
    • Introduction
    • Registry
    • Data Pools
    • Entities
    • Entity Fields
    • MySQL Indexes
    • Code Generation
    • Engine
    • Context
    • Entity Schema
    • Schema Update
    • CRUD Operations
    • Async Flush
    • Search
    • Redis Search
    • MySQL Queries
    • ClickHouse Queries
    • ClickHouse Schema Management
    • Kafka
    • Debezium CDC
    • Local Cache
    • Context Cache
    • Fake Delete
    • Entity Lifecycle Callbacks
    • Metrics
    • Redis Operations
    • Distributed Lock
    • Queries Log
    • Testing

Debezium CDC

FluxaORM supports Debezium Change Data Capture (CDC) integration. This allows automatic streaming of MySQL row changes to Kafka topics via Debezium connectors, enabling real-time event-driven architectures without modifying application write paths.

Debezium captures row-level changes from MySQL's binary log and publishes them to Kafka topics. FluxaORM manages the Debezium connector configuration automatically based on your entity definitions.

Prerequisites

Debezium CDC requires the following infrastructure:

  • MySQL with binary logging enabled
  • Kafka broker(s)
  • Debezium Connect container (Kafka Connect with the Debezium MySQL connector)

MySQL Configuration

MySQL must have binary logging enabled with row-level format. Add the following flags to your MySQL server:

--server-id=1 --log-bin=mysql-bin --binlog-format=ROW --binlog-row-image=FULL

Docker Setup

A typical Docker Compose setup includes the Debezium Connect container alongside MySQL and Kafka:

services:
  mysql:
    image: mysql:8.0
    command: --server-id=1 --log-bin=mysql-bin --binlog-format=ROW --binlog-row-image=FULL
    ports:
      - "3306:3306"

  kafka:
    image: bitnami/kafka:latest
    ports:
      - "9092:9092"

  debezium-connect:
    image: debezium/connect:latest
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses

Enabling Debezium on an Entity

To enable Debezium CDC for an entity, add the debezium tag to the ID field:

type UserEntity struct {
    ID    uint64 `orm:"debezium"`
    Name  string `orm:"required;length=255"`
    Email string `orm:"required;length=255"`
}

By default, the debezium tag uses the default Kafka pool. To specify a different Kafka pool, use debezium=poolCode:

type OrderEntity struct {
    ID     uint64 `orm:"debezium=events"`
    Amount float64
}

Tips

The debezium tag can be combined with other ID field tags like redisCache, localCache, and mysql:

type UserEntity struct {
    ID uint64 `orm:"redisCache;debezium;mysql=users"`
}

Registry Configuration

Register the Debezium Connect REST API URL for a Kafka pool using RegisterDebeziumConnectURL:

registry := fluxaorm.NewRegistry()

// Register MySQL and Kafka pools
registry.RegisterMySQL("user:pass@tcp(localhost:3306)/mydb", fluxaorm.DefaultPoolCode, nil)
registry.RegisterKafka([]string{"localhost:9092"}, "kafka", nil)

// Register the Debezium Connect URL for the Kafka pool
registry.RegisterDebeziumConnectURL("http://localhost:8083", "kafka")

// Register entities with debezium tag
registry.RegisterEntity(&UserEntity{})

engine, err := registry.Validate()
if err != nil {
    panic(err)
}

The RegisterDebeziumConnectURL method takes two arguments:

ParameterTypeDescription
urlstringThe Kafka Connect REST API base URL (e.g., http://localhost:8083)
kafkaPoolstringThe Kafka pool code that Debezium topics will be published to

Connector Management

FluxaORM creates one Debezium connector per MySQL pool. The connector automatically manages all entities tagged with debezium in that pool.

Connector Naming

Connectors are named using the pattern fluxa_{mysqlPoolCode}. For example, if your MySQL pool code is default, the connector name is fluxa_default.

GetDebeziumAlters

Use GetDebeziumAlters() to compare the desired Debezium connector state (based on registered entities) with the actual state in Kafka Connect. It returns operations to create, update, or delete connectors:

ctx := engine.NewContext(context.Background())

alters, err := fluxaorm.GetDebeziumAlters(ctx)
if err != nil {
    panic(err)
}
for _, alter := range alters {
    fmt.Println(alter.Description)
    err = alter.Exec(ctx)
    if err != nil {
        panic(err)
    }
}

GetDebeziumAlters() generates the following types of operations:

ScenarioOperation
Connector does not existCreate connector with configuration for all debezium-tagged tables
Connector exists but table list changedUpdate connector configuration
Connector exists but no entities use debeziumDelete connector

Tips

Call GetDebeziumAlters() alongside GetAlters(), GetKafkaAlters(), GetRedisSearchAlters(), and GetClickhouseAlters() in your migration flow to keep all data stores in sync.

Topic Naming

Debezium publishes change events to Kafka topics using the naming pattern:

fluxa_{mysqlPoolCode}.{database}.{table}

For example, if your MySQL pool code is default, the database is test, and the entity struct is UserEntity, the topic name is:

fluxa_default.test.UserEntity

Use the DebeziumTopicName() method on the entity's Provider to get the topic name:

topicName := UserEntityProvider.DebeziumTopicName(ctx)
// Returns e.g. "fluxa_default.test.UserEntity"

This method is only available on Providers for entities that have the debezium tag configured.

Consuming CDC Events

Registering a Consumer Group

Use DebeziumEntities() on a consumer group builder to automatically subscribe to the correct Debezium topics for your entities. Topic names are resolved automatically during Validate():

registry.RegisterKafkaConsumerGroup(
    fluxaorm.NewKafkaConsumerGroup("my_cdc_consumer", "kafka").
        DebeziumEntities(&UserEntity{}, &OrderEntity{}),
)

You can also combine DebeziumEntities() with regular Topics() on the same consumer group:

registry.RegisterKafkaConsumerGroup(
    fluxaorm.NewKafkaConsumerGroup("mixed_consumer", "kafka").
        DebeziumEntities(&UserEntity{}).
        Topics("custom-events"),
)

Tips

If you prefer to specify topic names manually, you can still use Topics() with Provider.DebeziumTopicName(ctx):

registry.RegisterKafkaConsumerGroup(
    fluxaorm.NewKafkaConsumerGroup("my_cdc_consumer", "kafka").
        Topics(UserEntityProvider.DebeziumTopicName(ctx)),
)

EachDebeziumEvent

Use EachDebeziumEvent() on KafkaFetches to iterate over parsed Debezium events. Each record is automatically deserialized into an entity ID and DebeziumEvent. Tombstone records (used for Kafka log compaction) are skipped silently:

kafka := engine.Kafka("kafka")
cg := kafka.MustConsumerGroup("my_cdc_consumer")
defer cg.Close()

fetches := cg.PollFetches(ctx)
err := fetches.EachDebeziumEvent(func(entityID uint64, event *fluxaorm.DebeziumEvent) error {
    switch event.Op {
    case fluxaorm.DebeziumCreate:
        fmt.Printf("New entity %d: %v\n", entityID, event.After)
    case fluxaorm.DebeziumUpdate:
        fmt.Printf("Updated entity %d: %v -> %v\n", entityID, event.Before, event.After)
    case fluxaorm.DebeziumDelete:
        fmt.Printf("Deleted entity %d\n", entityID)
    }
    return nil
})
if err != nil {
    fmt.Printf("error processing events: %v\n", err)
}

If the handler returns a non-nil error, iteration stops and that error is returned. Parse errors also stop iteration.

Low-Level Parsing

For more control, you can parse records manually using ParseDebeziumEvent and ParseDebeziumKey:

fetches.EachRecord(func(record *fluxaorm.KafkaRecord) {
    event, err := fluxaorm.ParseDebeziumEvent(record)
    if err != nil {
        // handle parse error
        return
    }
    entityID, err := fluxaorm.ParseDebeziumKey(record)
    if err != nil {
        // handle key parse error
        return
    }
    // process event...
})

DebeziumEvent

The DebeziumEvent struct contains the full change event data:

FieldTypeDescription
Beforemap[string]interface{}Field values before the change (nil for inserts)
Aftermap[string]interface{}Field values after the change (nil for deletes)
OpstringOperation type (see below)
SourceDebeziumSourceEvent metadata (database, table, position, timestamp)
TsMsint64Event timestamp in milliseconds

Operation Types

The Op field indicates the type of change:

ConstantValueDescription
DebeziumCreate"c"A new row was inserted
DebeziumUpdate"u"An existing row was updated
DebeziumDelete"d"A row was deleted
DebeziumRead"r"A snapshot read (initial load)

Full Example

package main

import (
    "context"
    "fmt"

    "github.com/latolukasz/fluxaorm/v2"
)

type UserEntity struct {
    ID    uint64 `orm:"debezium"`
    Name  string `orm:"required;length=255"`
    Email string `orm:"required;length=255"`
}

func main() {
    registry := fluxaorm.NewRegistry()
    registry.RegisterMySQL("user:pass@tcp(localhost:3306)/mydb", fluxaorm.DefaultPoolCode, nil)
    registry.RegisterKafka([]string{"localhost:9092"}, "kafka", nil)
    registry.RegisterDebeziumConnectURL("http://localhost:8083", "kafka")
    registry.RegisterEntity(&UserEntity{})

    // Register consumer group with automatic topic resolution
    registry.RegisterKafkaConsumerGroup(
        fluxaorm.NewKafkaConsumerGroup("my_cdc_consumer", "kafka").
            DebeziumEntities(&UserEntity{}),
    )

    engine, err := registry.Validate()
    if err != nil {
        panic(err)
    }

    ctx := engine.NewContext(context.Background())

    // Sync Debezium connectors
    alters, err := fluxaorm.GetDebeziumAlters(ctx)
    if err != nil {
        panic(err)
    }
    for _, alter := range alters {
        fmt.Println(alter.Description)
        err = alter.Exec(ctx)
        if err != nil {
            panic(err)
        }
    }

    // Consume CDC events
    kafka := engine.Kafka("kafka")
    cg := kafka.MustConsumerGroup("my_cdc_consumer")
    defer cg.Close()

    fetches := cg.PollFetches(ctx)
    err = fetches.EachDebeziumEvent(func(entityID uint64, event *fluxaorm.DebeziumEvent) error {
        switch event.Op {
        case fluxaorm.DebeziumCreate:
            fmt.Printf("New entity %d: %v\n", entityID, event.After)
        case fluxaorm.DebeziumUpdate:
            fmt.Printf("Updated entity %d: %v -> %v\n", entityID, event.Before, event.After)
        case fluxaorm.DebeziumDelete:
            fmt.Printf("Deleted entity %d\n", entityID)
        }
        return nil
    })
    if err != nil {
        fmt.Printf("error: %v\n", err)
    }
}

Warning

Debezium CDC events are delivered asynchronously. There may be a short delay between a MySQL write and the corresponding event appearing on the Kafka topic. Do not rely on CDC events for synchronous consistency checks.

Edit this page
Last Updated: 4/1/26, 8:37 AM
Prev
Kafka
Next
Local Cache