FluxaORM v2: Code-Generation-Based Go ORM for MySQL, Redis, and ClickHouse

Kafka

FluxaORM provides first-class support for Apache Kafka as a data pool type. Under the hood it uses franz-go, a high-performance pure-Go Kafka client.

A Kafka pool represents a connection to a set of brokers and can contain multiple consumer groups, each with its own set of topics and configuration.

Pool Registration

Register a Kafka pool using the RegisterKafka method:

import "github.com/latolukasz/fluxaorm/v2"

registry := fluxaorm.NewRegistry()

// Kafka pool named "events":
registry.RegisterKafka([]string{"localhost:9092"}, "events", &fluxaorm.KafkaPoolOptions{
    ClientID: "my-service",
})

// Pool with multiple brokers:
registry.RegisterKafka([]string{"localhost:9092", "localhost:9093"}, "events", &fluxaorm.KafkaPoolOptions{
    ClientID: "my-service",
})

// Pool with default options (nil):
registry.RegisterKafka([]string{"localhost:9092"}, "events", nil)

Consumer Group Registration

Consumer groups are registered separately using the RegisterKafkaConsumerGroup method and the KafkaConsumerGroupBuilder fluent API:

// Register a single consumer group:
registry.RegisterKafkaConsumerGroup(
    fluxaorm.NewKafkaConsumerGroup("my-group", "events").Topics("orders", "payments"),
)

// Register multiple consumer groups:
registry.RegisterKafkaConsumerGroup(
    fluxaorm.NewKafkaConsumerGroup("order-group", "events").Topics("orders"),
)
registry.RegisterKafkaConsumerGroup(
    fluxaorm.NewKafkaConsumerGroup("payment-group", "events").Topics("payments"),
)

// Consumer group with all options:
registry.RegisterKafkaConsumerGroup(
    fluxaorm.NewKafkaConsumerGroup("my-group", "events").
        Topics("orders", "payments").
        SessionTimeout(30 * time.Second).
        RebalanceTimeout(60 * time.Second).
        FetchMaxBytes(10485760).
        AutoCommitInterval(5 * time.Second),
)

Equivalent YAML configuration:

events:
  kafka:
    brokers:
      - localhost:9092
      - localhost:9093
    clientID: my-service
    consumerGroups:
      - name: order-group
        topics:
          - orders
      - name: payment-group
        topics:
          - payments

KafkaPoolOptions

The KafkaPoolOptions struct configures pool-level settings shared by all consumer groups:

type KafkaPoolOptions struct {
    ClientID              string
    RequiredAcks          int
    ProducerLinger        time.Duration
    MaxBufferedRecords    int
    SASL                  *KafkaSASLConfig
    IgnoredTopics         []string
    IgnoredConsumerGroups []string
}
  • ClientID (string, default ""): Client identifier sent to the broker
  • RequiredAcks (int, default -1): Broker acknowledgment level: 0 = none, 1 = leader only, -1 = all in-sync replicas
  • ProducerLinger (time.Duration, default 0): How long to wait before flushing a produce batch
  • MaxBufferedRecords (int, default 0): Maximum number of records buffered in the producer
  • SASL (*KafkaSASLConfig, default nil): SASL authentication configuration
  • IgnoredTopics ([]string, default nil): Topics to exclude from schema management (see Topic Registration)
  • IgnoredConsumerGroups ([]string, default nil): Consumer groups to exclude from deletion by GetKafkaAlters() (see Consumer Group Management)

KafkaConsumerGroupBuilder

Consumer groups are configured using the KafkaConsumerGroupBuilder fluent API. Create a new builder with NewKafkaConsumerGroup(name, poolCode):

cg := fluxaorm.NewKafkaConsumerGroup("my-group", "events").
    Topics("orders", "payments").
    SessionTimeout(30 * time.Second).
    FetchMaxBytes(10485760)

All builder methods return *KafkaConsumerGroupBuilder for fluent chaining:

  • Topics(topics ...string): Topics to consume from (required)
  • SessionTimeout(d time.Duration): Consumer group session timeout
  • RebalanceTimeout(d time.Duration): Consumer group rebalance timeout
  • FetchMaxBytes(n int32): Maximum bytes per fetch response
  • AutoCommitInterval(d time.Duration): Interval for automatic offset commits; 0 means manual commit

Config Structs

When loading configuration from a file, FluxaORM uses the ConfigKafka, ConfigKafkaConsumerGroup, and ConfigKafkaTopic structs:

type ConfigKafkaConsumerGroup struct {
    Name                 string   `yaml:"name"`
    Topics               []string `yaml:"topics"`
    SessionTimeoutMs     int      `yaml:"sessionTimeoutMs"`
    RebalanceTimeoutMs   int      `yaml:"rebalanceTimeoutMs"`
    FetchMaxBytes        int      `yaml:"fetchMaxBytes"`
    AutoCommitIntervalMs int      `yaml:"autoCommitIntervalMs"`
}

type ConfigKafkaTopic struct {
    Name              string            `yaml:"name"`
    Partitions        int32             `yaml:"partitions"`
    ReplicationFactor int16             `yaml:"replicationFactor"`
    Configs           map[string]string `yaml:"configs"`
}

type ConfigKafka struct {
    Code                  string                     `yaml:"code"`
    Brokers               []string                   `yaml:"brokers"`
    ClientID              string                     `yaml:"clientID"`
    RequiredAcks          int                        `yaml:"requiredAcks"`
    ProducerLingerMs      int                        `yaml:"producerLingerMs"`
    MaxBufferedRecords    int                        `yaml:"maxBufferedRecords"`
    SASLMechanism         string                     `yaml:"saslMechanism"`
    SASLUser              string                     `yaml:"saslUser"`
    SASLPassword          string                     `yaml:"saslPassword"`
    ConsumerGroups        []ConfigKafkaConsumerGroup `yaml:"consumerGroups"`
    IgnoredTopics         []string                   `yaml:"ignoredTopics"`
    IgnoredConsumerGroups []string                   `yaml:"ignoredConsumerGroups"`
    Topics                []ConfigKafkaTopic         `yaml:"topics"`
}

Engine Accessor

Once the engine is created, access a Kafka pool via:

pool := engine.Kafka("events")

This returns a Kafka interface representing the pool. You can produce records directly via the pool, and obtain consumer groups from the pool for consuming records.

Consumer Groups

Each Kafka pool can have one or more consumer groups. Use the pool-level methods to access them:

ConsumerGroup

ConsumerGroup returns a KafkaConsumerGroup and an error. Each call creates a new kgo.Client connected to the brokers — it is a factory method, not a lookup. The caller is responsible for calling Close() on the returned consumer group when done:

cg, err := pool.ConsumerGroup("my-group")
if err != nil {
    // name not registered or connection failed
}
defer cg.Close()

MustConsumerGroup

MustConsumerGroup returns a KafkaConsumerGroup for the given name and panics if the group is not registered or if the connection fails. Use this when you are certain the group was registered and you want to fail fast on errors:

cg := pool.MustConsumerGroup("my-group")
defer cg.Close()

ConsumerGroupNames

ConsumerGroupNames returns the list of registered consumer group names in the pool:

for _, name := range pool.ConsumerGroupNames() {
    fmt.Println(name)
}

Kafka (Pool) Interface

The Kafka interface exposes pool-level methods:

  • GetCode() string: Returns the pool code (e.g. "events")
  • GetBrokers() []string: Returns the list of broker addresses
  • GetPoolOptions() *KafkaPoolOptions: Returns the pool-level options
  • ProduceSync(ctx Context, records ...*KafkaRecord) error: Produces records synchronously and blocks until acknowledged
  • Produce(ctx Context, record *KafkaRecord, callback func(*KafkaRecord, error)): Produces a record asynchronously with an optional callback
  • ConsumerGroup(name string) (KafkaConsumerGroup, error): Creates a new consumer group client by name; returns an error if not registered or the connection fails
  • MustConsumerGroup(name string) KafkaConsumerGroup: Creates a new consumer group client by name; panics if not registered or the connection fails
  • ConsumerGroupNames() []string: Returns the list of registered consumer group names
  • Close(): Closes the pool-level producer client; consumer groups must be closed individually

KafkaConsumerGroup Interface

The KafkaConsumerGroup interface exposes per-group methods:

  • GetName() string: Returns the consumer group name
  • GetSettings() *KafkaConsumerGroupSettings: Returns the consumer group settings
  • GetKgoClient() *kgo.Client: Returns the underlying franz-go client
  • PollFetches(ctx Context) KafkaFetches: Polls for new records
  • CommitUncommittedOffsets(ctx Context) error: Commits offsets for consumed records
  • Close(): Closes this consumer group's client

KafkaRecord

KafkaRecord represents a single Kafka record (message):

type KafkaRecord struct {
    Topic     string
    Key       []byte
    Value     []byte
    Headers   []KafkaRecordHeader
    Partition int32
    Offset    int64
    Timestamp time.Time
}

KafkaRecordHeader

KafkaRecordHeader represents a key-value header attached to a record:

type KafkaRecordHeader struct {
    Key   string
    Value []byte
}

KafkaFetches

KafkaFetches is returned by PollFetches and provides helper methods for iterating over fetched records:

  • Records() []*KafkaRecord: Returns all records from the fetch as a flat slice
  • EachRecord(func(*KafkaRecord)): Iterates over each record, calling the provided function
  • EachError(func(string, int32, error)): Iterates over per-partition errors (topic, partition, error)
  • IsEmpty() bool: Returns true if the fetch contains no records

Producing Records

All produce operations are performed through the pool directly. You do not need a consumer group to produce records.

Synchronous Produce

ProduceSync sends one or more records to Kafka and blocks until the broker acknowledges them:

pool := engine.Kafka("events")

record := &fluxaorm.KafkaRecord{
    Topic: "orders",
    Key:   []byte("order-123"),
    Value: []byte(`{"status":"created"}`),
    Headers: []fluxaorm.KafkaRecordHeader{
        {Key: "source", Value: []byte("api")},
    },
}

err := pool.ProduceSync(ctx, record)
if err != nil {
    // handle error
}

Asynchronous Produce

Produce sends a record asynchronously. The call returns immediately and the record is buffered for delivery. An optional callback is invoked when the broker acknowledges or rejects the record:

pool := engine.Kafka("events")

record := &fluxaorm.KafkaRecord{
    Topic: "orders",
    Key:   []byte("order-456"),
    Value: []byte(`{"status":"shipped"}`),
}

pool.Produce(ctx, record, func(r *fluxaorm.KafkaRecord, err error) {
    if err != nil {
        log.Printf("produce failed: %v", err)
    }
})
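Because Produce returns before delivery, a clean shutdown has to wait for outstanding callbacks. One common pattern wraps each callback in a sync.WaitGroup. The sketch below is self-contained: produceAsync is a hypothetical stand-in for pool.Produce, not FluxaORM API:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// produceAsync is a stand-in for pool.Produce: it "delivers" the
// record on a background goroutine and then invokes the callback.
func produceAsync(value string, cb func(error)) {
	go func() {
		time.Sleep(10 * time.Millisecond) // simulated broker round trip
		cb(nil)
	}()
}

// produceAll sends every record and blocks until all callbacks have
// fired, returning the number of successful acknowledgments.
func produceAll(values []string) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	acked := 0
	for _, v := range values {
		wg.Add(1) // one outstanding record
		produceAsync(v, func(err error) {
			defer wg.Done()
			if err == nil {
				mu.Lock()
				acked++
				mu.Unlock()
			}
		})
	}
	wg.Wait() // safe to shut down: no in-flight records remain
	return acked
}

func main() {
	n := produceAll([]string{"order-1", "order-2", "order-3"})
	fmt.Println("acknowledged:", n) // acknowledged: 3
}
```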

Consuming Records

PollFetches

PollFetches polls the broker for new records. It returns a KafkaFetches value:

pool := engine.Kafka("events")
cg := pool.MustConsumerGroup("my-group")
defer cg.Close()

fetches := cg.PollFetches(ctx)
if fetches.IsEmpty() {
    return
}

fetches.EachError(func(topic string, partition int32, err error) {
    log.Printf("fetch error topic=%s partition=%d: %v", topic, partition, err)
})

fetches.EachRecord(func(record *fluxaorm.KafkaRecord) {
    fmt.Printf("topic=%s key=%s value=%s\n", record.Topic, record.Key, record.Value)
})

CommitUncommittedOffsets

After processing records, commit the offsets so they are not re-delivered:

err := cg.CommitUncommittedOffsets(ctx)
if err != nil {
    // handle error
}

Tips

When AutoCommitInterval is set to 0 (the default), you must call CommitUncommittedOffsets manually after processing each batch. Set a positive AutoCommitInterval via the KafkaConsumerGroupBuilder to enable automatic periodic commits.
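The poll/process/commit cycle is easiest to see as a loop. To keep the example self-contained and runnable, the consumer group is abstracted behind plain function values; poll, process, and commit below are stand-ins for cg.PollFetches, your record handler, and cg.CommitUncommittedOffsets:

```go
package main

import "fmt"

// consumeLoop polls batches until stop is closed. Offsets are only
// committed after the whole batch has been processed, mirroring the
// manual-commit contract: an uncommitted batch is re-delivered
// after a restart.
func consumeLoop(stop <-chan struct{}, poll func() []string, process func(string) error, commit func() error) error {
	for {
		select {
		case <-stop:
			return nil
		default:
		}
		batch := poll()
		if len(batch) == 0 {
			continue
		}
		for _, rec := range batch {
			if err := process(rec); err != nil {
				// Do not commit: the batch will be re-delivered.
				return fmt.Errorf("processing %q: %w", rec, err)
			}
		}
		if err := commit(); err != nil {
			return fmt.Errorf("commit: %w", err)
		}
	}
}

func main() {
	stop := make(chan struct{})
	batches := [][]string{{"order-1", "order-2"}, {"payment-1"}}
	poll := func() []string {
		if len(batches) == 0 {
			close(stop) // no more data in this demo: ask the loop to exit
			return nil
		}
		b := batches[0]
		batches = batches[1:]
		return b
	}
	err := consumeLoop(stop,
		poll,
		func(rec string) error { fmt.Println("processing", rec); return nil },
		func() error { fmt.Println("committing offsets"); return nil },
	)
	if err != nil {
		panic(err)
	}
}
```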

Advanced Usage

If you need direct access to the underlying *kgo.Client from franz-go, use GetKgoClient on a consumer group:

cg := pool.MustConsumerGroup("my-group")
defer cg.Close()
client := cg.GetKgoClient()
// use the franz-go client directly for advanced operations

SASL Authentication

To connect to a Kafka cluster that requires SASL authentication, configure the SASL field in KafkaPoolOptions. SASL is a pool-level setting shared by all consumer groups:

registry.RegisterKafka([]string{"kafka-broker:9093"}, "secure-events", &fluxaorm.KafkaPoolOptions{
    ClientID: "my-service",
    SASL: &fluxaorm.KafkaSASLConfig{
        Mechanism: "SCRAM-SHA-256",
        User:      "kafka-user",
        Password:  "kafka-password",
    },
})
registry.RegisterKafkaConsumerGroup(
    fluxaorm.NewKafkaConsumerGroup("my-group", "secure-events").Topics("orders"),
)

KafkaSASLConfig supports the following mechanisms:

  • PLAIN: Simple username/password authentication (not recommended for production without TLS)
  • SCRAM-SHA-256: Challenge-response authentication using SHA-256
  • SCRAM-SHA-512: Challenge-response authentication using SHA-512

Equivalent YAML:

secure-events:
  kafka:
    brokers:
      - kafka-broker:9093
    clientID: my-service
    saslMechanism: SCRAM-SHA-256
    saslUser: kafka-user
    saslPassword: kafka-password
    consumerGroups:
      - name: my-group
        topics:
          - orders

Topic Registration

FluxaORM provides schema management for Kafka topics, similar to the ClickHouse schema management feature. Define your Kafka topics using a fluent builder API, register them with the registry, and use GetKafkaAlters() to compare them with the broker state and apply changes.

Defining a Kafka Topic

Use NewKafkaTopic() to create a topic definition with a fluent builder:

import fluxaorm "github.com/latolukasz/fluxaorm/v2"

topic := fluxaorm.NewKafkaTopic("orders", "default").
    Partitions(6).
    ReplicationFactor(3).
    RetentionMs(86400000).
    CleanupPolicy("delete").
    MinInsyncReplicas(2)

Registering Topics

Register Kafka topic definitions with the registry before calling Validate():

registry := fluxaorm.NewRegistry()
registry.RegisterKafka([]string{"localhost:9092"}, "default", nil)

registry.RegisterKafkaTopic(
    fluxaorm.NewKafkaTopic("orders", "default").
        Partitions(6).
        ReplicationFactor(3).
        RetentionMs(86400000).
        CleanupPolicy("delete").
        MinInsyncReplicas(2),
)

registry.RegisterKafkaTopic(
    fluxaorm.NewKafkaTopic("payments", "default").
        Partitions(3).
        ReplicationFactor(3).
        RetentionBytes(1073741824).
        MaxMessageBytes(1048576),
)

engine, err := registry.Validate()

Builder API Reference

All builder methods return *KafkaTopicBuilder for fluent chaining:

  • Partitions(n int32): Number of partitions for the topic
  • ReplicationFactor(n int16): Replication factor for the topic
  • RetentionMs(ms int64): Message retention time in milliseconds (retention.ms)
  • RetentionBytes(bytes int64): Maximum retained bytes per partition (retention.bytes)
  • MaxMessageBytes(bytes int32): Maximum message size in bytes (max.message.bytes)
  • CleanupPolicy(policy string): Cleanup policy: "delete", "compact", or "delete,compact" (cleanup.policy)
  • MinInsyncReplicas(n int): Minimum number of in-sync replicas (min.insync.replicas)
  • CompressionType(ct string): Compression type: "none", "gzip", "snappy", "lz4", "zstd" (compression.type)
  • Config(key, value string): Set any arbitrary topic configuration key-value pair

The convenience methods (RetentionMs, CleanupPolicy, etc.) are shorthand for common Config() calls.

Getting Topic Alters

Use GetKafkaAlters() to compare registered topic definitions with the actual Kafka broker state and get the pending operations:

ctx := engine.NewContext(context.Background())

alters, err := fluxaorm.GetKafkaAlters(ctx)
if err != nil {
    panic(err)
}
for _, alter := range alters {
    fmt.Println(alter.Description) // e.g. "Create topic 'orders' with 6 partitions"
    fmt.Println(alter.Pool)        // e.g. "default"
}

Each fluxaorm.KafkaAlter has the following fields:

  • Description (string): Human-readable description of the pending operation
  • Pool (string): The Kafka pool code this alter belongs to

To execute all alters:

for _, alter := range alters {
    err = alter.Exec(ctx)
    if err != nil {
        panic(err)
    }
}

What Gets Compared

GetKafkaAlters() generates the following types of operations:

  • Topic not on broker: Create topic with registered partitions, replication factor, and configs
  • Registered partitions > current partitions: Increase partition count
  • Registered partitions < current partitions: Warning (partitions cannot be decreased)
  • Replication factor differs: Warning (replication factor cannot be changed via admin API)
  • Topic config differs from registered values: Alter topic configuration
  • Topic on broker but not registered: Delete topic
  • Consumer group on broker but not registered: Delete consumer group

Warning

Kafka does not support decreasing the number of partitions or changing the replication factor of an existing topic. If these differ, GetKafkaAlters() returns a warning description but no executable operation for these changes.

Warning

FluxaORM deletes all topics on the broker that are not registered, excluding internal topics (those starting with __) and topics listed in IgnoredTopics. See Ignored Topics for how to protect topics from deletion.

Warning

FluxaORM also deletes all consumer groups on the broker that are not registered, excluding internal consumer groups (those starting with __) and consumer groups listed in IgnoredConsumerGroups. See Ignored Consumer Groups for how to protect consumer groups from deletion.

Ignored Topics

By default, GetKafkaAlters() will attempt to delete topics on the broker that are not registered via RegisterKafkaTopic(). Internal Kafka topics (those starting with __, such as __consumer_offsets) are always excluded automatically.

To protect additional topics from deletion, list them in the IgnoredTopics field of KafkaPoolOptions:

registry.RegisterKafka([]string{"localhost:9092"}, "default", &fluxaorm.KafkaPoolOptions{
    IgnoredTopics: []string{"legacy-topic", "external-service-topic"},
})

Equivalent YAML:

default:
  kafka:
    brokers:
      - localhost:9092
    ignoredTopics:
      - legacy-topic
      - external-service-topic

Consumer Group Management

GetKafkaAlters() also manages consumer groups. It lists all consumer groups on the broker and generates DELETE alters for orphaned consumer groups: those that exist on the broker but are not registered via RegisterKafkaConsumerGroup().

The following consumer groups are excluded from deletion:

  • Consumer groups that are registered in the application
  • Internal consumer groups (those starting with __)
  • Consumer groups listed in IgnoredConsumerGroups

Ignored Consumer Groups

By default, GetKafkaAlters() will attempt to delete consumer groups on the broker that are not registered. Internal consumer groups (those whose names start with __) are always excluded automatically.

To protect additional consumer groups from deletion, list them in the IgnoredConsumerGroups field of KafkaPoolOptions:

registry.RegisterKafka([]string{"localhost:9092"}, "default", &fluxaorm.KafkaPoolOptions{
    IgnoredConsumerGroups: []string{"legacy-consumer", "external-service-consumer"},
})

Equivalent YAML:

default:
  kafka:
    brokers:
      - localhost:9092
    ignoredConsumerGroups:
      - legacy-consumer
      - external-service-consumer

Backward Compatibility

Topic schema management is opt-in and fully backward compatible:

  • If no topics are registered for a Kafka pool via RegisterKafkaTopic(), auto-topic creation remains enabled and GetKafkaAlters() will not manage topics for that pool.
  • Once you register at least one topic for a pool, GetKafkaAlters() takes over topic management for that pool, including deleting unregistered topics.
  • In a pool with at least one registered topic, every non-internal, non-ignored topic that is not registered is flagged for deletion by GetKafkaAlters().

YAML Configuration

Topics can also be defined in YAML configuration under the topics key within a Kafka pool:

default:
  kafka:
    brokers:
      - "localhost:9092"
    ignoredTopics:
      - "legacy-topic"
    ignoredConsumerGroups:
      - "legacy-consumer"
    topics:
      - name: "orders"
        partitions: 6
        replicationFactor: 3
        configs:
          retention.ms: "86400000"
          cleanup.policy: "delete"
          min.insync.replicas: "2"
      - name: "payments"
        partitions: 3
        replicationFactor: 3
        configs:
          retention.bytes: "1073741824"

Struct Configuration

Topics can be defined using the ConfigKafkaTopic struct within ConfigKafka:

config := &fluxaorm.Config{
    KafkaPools: []fluxaorm.ConfigKafka{
        {
            Code:                  "default",
            Brokers:               []string{"localhost:9092"},
            IgnoredTopics:         []string{"legacy-topic"},
            IgnoredConsumerGroups: []string{"legacy-consumer"},
            Topics: []fluxaorm.ConfigKafkaTopic{
                {
                    Name:              "orders",
                    Partitions:        6,
                    ReplicationFactor: 3,
                    Configs: map[string]string{
                        "retention.ms":         "86400000",
                        "cleanup.policy":       "delete",
                        "min.insync.replicas":  "2",
                    },
                },
            },
        },
    },
}

err := registry.InitByConfig(config)

Full Example

package main

import (
    "context"
    "fmt"

    "github.com/latolukasz/fluxaorm/v2"
)

func main() {
    registry := fluxaorm.NewRegistry()
    registry.RegisterKafka([]string{"localhost:9092"}, "default", &fluxaorm.KafkaPoolOptions{
        IgnoredTopics:         []string{"legacy-topic"},
        IgnoredConsumerGroups: []string{"legacy-consumer"},
    })

    // Define consumer groups
    registry.RegisterKafkaConsumerGroup(
        fluxaorm.NewKafkaConsumerGroup("order-group", "default").Topics("orders"),
    )
    registry.RegisterKafkaConsumerGroup(
        fluxaorm.NewKafkaConsumerGroup("payment-group", "default").Topics("payments"),
    )

    // Define topics
    registry.RegisterKafkaTopic(
        fluxaorm.NewKafkaTopic("orders", "default").
            Partitions(6).
            ReplicationFactor(3).
            RetentionMs(86400000).
            CleanupPolicy("delete").
            MinInsyncReplicas(2),
    )

    registry.RegisterKafkaTopic(
        fluxaorm.NewKafkaTopic("payments", "default").
            Partitions(3).
            ReplicationFactor(3).
            RetentionBytes(1073741824).
            CompressionType("snappy"),
    )

    engine, err := registry.Validate()
    if err != nil {
        panic(err)
    }

    ctx := engine.NewContext(context.Background())

    // Get and apply alters
    alters, err := fluxaorm.GetKafkaAlters(ctx)
    if err != nil {
        panic(err)
    }
    for _, alter := range alters {
        fmt.Println(alter.Description)
        err = alter.Exec(ctx)
        if err != nil {
            panic(err)
        }
    }
}

Tips

Call GetKafkaAlters() alongside GetAlters(), GetRedisSearchAlters(), GetClickhouseAlters(), and GetDebeziumAlters() in your migration flow to keep all data stores in sync.

Async Flush via Kafka

FluxaORM uses Kafka as the transport for asynchronous SQL flushing. When you call RegisterAsyncFlush() on the registry, FluxaORM registers the internal Kafka topic _fluxa_async_sql (and a dead-letter topic _fluxa_async_sql_failed) on the specified Kafka pool.

registry.RegisterKafka([]string{"localhost:9092"}, "events", nil)
registry.RegisterAsyncFlush("events", &fluxaorm.AsyncFlushOptions{
    TopicPartitions: 6,
})

When ctx.FlushAsync(true) or ctx.FlushAsync(false) is called, SQL operations are produced to the _fluxa_async_sql topic with the entity table name as the record key. This ensures that queries for the same table are routed to the same partition, preserving per-table ordering.
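The ordering guarantee rests on Kafka's keyed partitioning: a partition is consumed in order, and records with equal keys are always routed to the same partition. The Java client's default partitioner hashes keys with murmur2; the sketch below substitutes stdlib FNV-1a purely to illustrate the hash-modulo-partitions idea (partitionFor is a hypothetical helper, not FluxaORM or franz-go API):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a record key onto one of n partitions.
// Equal keys always land on the same partition, so all async SQL
// keyed by one table name stays ordered relative to itself.
func partitionFor(key []byte, n int32) int32 {
	h := fnv.New32a()
	h.Write(key)
	return int32(h.Sum32() % uint32(n))
}

func main() {
	p1 := partitionFor([]byte("orders"), 6)
	p2 := partitionFor([]byte("orders"), 6)
	fmt.Println("same key, same partition:", p1 == p2) // true
	fmt.Println("users ->", partitionFor([]byte("users"), 6))
}
```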

The AsyncSQLConsumer uses a Kafka consumer group internally, enabling horizontal scaling across multiple consumer instances. See the Async Flush page for full details on consuming, error handling, and cache update behavior.

The topic names are available as constants:

fluxaorm.AsyncSQLTopicName           // "_fluxa_async_sql"
fluxaorm.AsyncSQLDeadLetterTopicName // "_fluxa_async_sql_failed"

Tips

When using GetKafkaAlters(), the async flush topics and consumer group are included automatically if RegisterAsyncFlush() has been called. No manual topic registration is needed for async flush.

Closing

The pool and consumer groups have separate lifecycles and must be closed independently.

Closing Consumer Groups

Each call to ConsumerGroup or MustConsumerGroup creates a new kgo.Client. The caller is responsible for closing it when done:

cg := pool.MustConsumerGroup("my-group")
defer cg.Close()
// ... consume records ...

Closing the Pool

Calling Close on the pool closes only the pool-level producer client. It does not close any consumer groups — those must be closed individually by the caller:

pool.Close()

Tips

If you are using FluxaORM's engine lifecycle, the engine will close all Kafka pools when it shuts down. However, consumer groups you created must still be closed by your own code.
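One way to keep that contract straight is to collect cleanup functions as resources are acquired and run them in reverse order on shutdown, so consumer groups opened after the pool are closed before it. A self-contained sketch (the closers helper is hypothetical, not FluxaORM API):

```go
package main

import "fmt"

// closers collects cleanup functions and runs them in reverse
// (LIFO) order, so resources acquired last are released first.
type closers []func()

func (c *closers) add(f func()) { *c = append(*c, f) }

func (c *closers) closeAll() {
	for i := len(*c) - 1; i >= 0; i-- {
		(*c)[i]()
	}
}

func main() {
	var cs closers
	cs.add(func() { fmt.Println("pool.Close()") })               // registered first, runs last
	cs.add(func() { fmt.Println("cg.Close() for my-group") })    // registered last, runs first
	defer cs.closeAll()
	// ... produce and consume ...
}
```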

Last Updated: 3/24/26, 11:38 AM