thambaru/debezium_to_kafka

Ballerina library, v0.2.0

A Ballerina library package that bridges Debezium CDC events from MySQL, PostgreSQL, and SQL Server to Apache Kafka in a single, easy-to-configure listener. All change events are normalized into a typed CdcEvent envelope, serialized (JSON, Avro, or custom), routed to the right Kafka topic, and published — with optional filter/transform hooks, exponential-backoff retry, and dead-letter queue support.


Architecture

┌──────────────────────┐     CDC events      ┌─────────────────────────────────────────┐
│  MySQL / PostgreSQL  │ ──────────────────► │         CdcKafkaListener                │
│  / SQL Server        │                     │                                         │
└──────────────────────┘                     │  ┌─────────────────────────────────┐    │
                                             │  │ Internal cdc:Service (per table)│    │
                                             │  │  1. EventFilter   (optional)    │    │
                                             │  │  2. EventTransformer (optional) │    │
                                             │  │  3. User hook service (optional)│    │
                                             │  │  4. Serializer (JSON/Avro/CUST) │    │
                                             │  │  5. TopicResolver               │    │
                                             │  │  6. kafka:Producer + retry      │    │
                                             │  └──────────────┬──────────────────┘    │
                                             └─────────────────┼───────────────────────┘
                                                               │
                                                               ▼
                                                    ┌────────────────────┐
                                                    │   Apache Kafka     │
                                                    │  cdc.db.table_a    │
                                                    │  cdc.db.table_b    │
                                                    │  …                 │
                                                    │  [DLQ topic]       │
                                                    └────────────────────┘

Quick start — MySQL CDC → Kafka in ~15 lines

import thambaru/debezium_to_kafka as cdcKafka;
import ballerinax/mysql.cdc.driver as _;   // pull in the Debezium MySQL connector

listener cdcKafka:CdcKafkaListener bridge = new ({
    database: {
        dbType: cdcKafka:MYSQL,
        hostname: "localhost",
        username: "root",
        password: "password",
        includedDatabases: ["inventory"],
        includedTables: ["inventory.products", "inventory.orders"]
    },
    kafkaBootstrapServers: "localhost:9092",
    topicRouting: {strategy: cdcKafka:TABLE_NAME, topicPrefix: "cdc"},
    serialization: {format: cdcKafka:JSON}
});

// Zero-code forwarding — all events auto-publish to:
//   cdc.inventory.products
//   cdc.inventory.orders

Configuration reference

CdcKafkaConfig

| Field | Type | Default | Description |
|---|---|---|---|
| `database` | `MySqlCdcConfig\|PostgresCdcConfig\|MsSqlCdcConfig` | (required) | Database CDC source config |
| `kafkaBootstrapServers` | `string\|string[]` | (required) | Kafka broker(s) |
| `kafkaProducerConfig` | `kafka:ProducerConfiguration` | `{acks: ACKS_ALL, enableIdempotence: true}` | Kafka producer settings |
| `topicRouting` | `TopicRoutingConfig` | `{strategy: TABLE_NAME, topicPrefix: "cdc"}` | Topic routing strategy |
| `serialization` | `SerializationConfig` | `{format: JSON}` | Serialization format |
| `retryConfig` | `RetryConfig` | see below | Retry/backoff policy |
| `deadLetterTopic` | `string?` | `()` | DLQ topic for failed events |

MySQL — MySqlCdcConfig

database: {
    dbType: cdcKafka:MYSQL,
    hostname: "localhost",       // default: "localhost"
    port: 3306,                  // default: 3306
    username: "debezium",
    password: "dbz",
    includedDatabases: ["mydb"],
    includedTables: ["mydb.orders", "mydb.products"],
    excludedTables: (),
    includedColumns: (),
    excludedColumns: ()
}

PostgreSQL — PostgresCdcConfig

import ballerinax/postgresql.cdc.driver as _;

database: {
    dbType: cdcKafka:POSTGRESQL,
    hostname: "localhost",       // default: "localhost"
    port: 5432,                  // default: 5432
    username: "postgres",
    password: "postgres",
    databaseName: "mydb",        // required
    includedSchemas: ["public"],
    includedTables: ["public.orders"],
    pluginName: postgresql:PGOUTPUT,  // default: PGOUTPUT
    slotName: "debezium",             // default: "debezium"
    publicationName: "dbz_publication"
}

SQL Server — MsSqlCdcConfig

database: {
    dbType: cdcKafka:MSSQL,
    hostname: "localhost",       // default: "localhost"
    port: 1433,                  // default: 1433
    username: "sa",
    password: "YourStrong!Passw0rd",
    databaseName: "TestDB",      // required
    includedTables: ["dbo.Orders", "dbo.Customers"]
}

Topic routing

Per-table (TABLE_NAME)

Each event is published to {topicPrefix}.{db}.{table}:

topicRouting: {
    strategy: cdcKafka:TABLE_NAME,
    topicPrefix: "cdc"
}
// inventory.products  →  cdc.inventory.products
// inventory.orders    →  cdc.inventory.orders

Note: TABLE_NAME strategy creates one internal CDC service per table. You must specify includedTables in the database config; otherwise a catch-all service is used and the topic name will contain "unknown".

Single topic (SINGLE_TOPIC)

All events from all tables go to one topic:

topicRouting: {
    strategy: cdcKafka:SINGLE_TOPIC,
    singleTopic: "all-cdc-events"
}

Custom resolver (CUSTOM)

Provide your own routing logic:

topicRouting: {
    strategy: cdcKafka:CUSTOM,
    customResolver: isolated function(cdcKafka:CdcEvent event) returns string {
        return event.operation == cdcKafka:DELETE ? "cdc-deletes" : "cdc-changes";
    }
}

Serialization

JSON (default)

serialization: {format: cdcKafka:JSON}

Produces a UTF-8 encoded JSON object matching the CdcEvent structure:

{
  "operation": "CREATE",
  "tableName": "inventory.products",
  "before": null,
  "after": {"id": 1, "name": "Widget", "price": 9.99},
  "timestamp": [1740394800, 0],
  "metadata": {}
}

Avro (schema registry)

serialization: {
    format: cdcKafka:AVRO,
    schemaRegistryUrl: "http://schema-registry:8081"
}

Configures the Kafka producer with valueSerializerType = SER_AVRO.
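For reference, this is roughly what the bridge applies internally — a sketch only, with field names assumed from ballerinax/kafka; you do not write this yourself:

```ballerina
// Illustrative equivalent producer settings (assumed ballerinax/kafka
// field names; the bridge configures this on your behalf).
kafkaProducerConfig: {
    acks: kafka:ACKS_ALL,
    enableIdempotence: true,
    valueSerializerType: kafka:SER_AVRO,
    schemaRegistryUrl: "http://schema-registry:8081"
}
```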

Custom serializer

import ballerina/lang.value;

serialization: {
    format: cdcKafka:CUSTOM,
    customSerializer: isolated function(cdcKafka:CdcEvent event) returns byte[]|error {
        // Example: MessagePack or Protobuf
        json j = value:toJson(event);   // toJson never returns an error, so no `check`
        return j.toJsonString().toBytes();
    }
}

Optional hooks

service on syntax (pre-publish side-effects)

import ballerina/log;

service on bridge {
    remote function onCreate(cdcKafka:CdcEvent event) returns error? {
        log:printInfo("Row inserted", table = event.tableName, row = event.after);
    }

    remote function onUpdate(cdcKafka:CdcEvent event) returns error? {
        // audit log, metrics, etc.
    }

    remote function onDelete(cdcKafka:CdcEvent event) returns error? { }

    remote function onRead(cdcKafka:CdcEvent event) returns error? { }
}

Hooks are best-effort: a hook error is logged but does not abort the Kafka publish.

Event filter

Use setFilter() to drop events before they reach Kafka:

bridge.setFilter(isolated function(cdcKafka:CdcEvent event) returns boolean {
    // Publish only INSERT and UPDATE; skip DELETE and READ.
    return event.operation == cdcKafka:CREATE || event.operation == cdcKafka:UPDATE;
});
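A filter can also key off the table name — for example, to drop changes from a high-churn table (the table name below is illustrative):

```ballerina
bridge.setFilter(isolated function(cdcKafka:CdcEvent event) returns boolean {
    // Skip everything captured from the audit table; forward the rest.
    return event.tableName != "inventory.audit_log";
});
```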

Event transformer

Use setTransformer() to enrich or modify events in-flight:

bridge.setTransformer(isolated function(cdcKafka:CdcEvent event) returns cdcKafka:CdcEvent|error {
    map<anydata> meta = event.metadata;
    meta["env"] = "production";
    meta["capturedBy"] = "debezium-bridge";
    return {...event, metadata: meta};
});

Both hooks can be set at any time, including after the listener's 'start() has been called.


Retry and dead-letter queue

retryConfig: {
    maxRetries: 5,           // default: 3
    initialBackoff: 0.5,     // default: 1.0 s
    maxBackoff: 60.0,        // default: 30.0 s
    backoffMultiplier: 2.0   // default: 2.0
},
deadLetterTopic: "cdc-dlq"  // optional; omit to surface errors via cdc framework

On a Kafka publish failure the bridge retries with exponential backoff:

delay(n) = min(initialBackoff × backoffMultiplier^n, maxBackoff)

With the defaults (maxRetries: 3, initialBackoff: 1.0, backoffMultiplier: 2.0, maxBackoff: 30.0), the delays before the three retries are 1 s, 2 s, and 4 s.

If all retries are exhausted and deadLetterTopic is set, the original event payload is written to the DLQ with error metadata in Kafka headers:

| Header | Value |
|---|---|
| `x-original-table` | Fully-qualified table name |
| `x-operation` | `CREATE` / `UPDATE` / `DELETE` / `READ` |
| `x-error-message` | Last Kafka error message |

If deadLetterTopic is (), the error is propagated back to the ballerinax/cdc framework, which applies its eventProcessingFailureHandlingMode (FAIL, WARN, or SKIP).


CdcEvent envelope

public type CdcEvent record {|
    cdc:Operation operation;   // CREATE | UPDATE | DELETE | READ
    string        tableName;   // fully qualified: "db.table"
    record {}?    before;      // null for CREATE/READ
    record {}?    after;       // null for DELETE
    time:Utc      timestamp;   // UTC capture time
    map<anydata>  metadata;    // extensible key/value bag
|};
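Because before and after are open records, a hook can map them onto an application type with cloneWithType. A sketch — the Product record here is illustrative, not part of the package:

```ballerina
// Illustrative application type; match it to your table's columns.
type Product record {
    int id;
    string name;
    decimal price;
};

service on bridge {
    remote function onCreate(cdcKafka:CdcEvent event) returns error? {
        record {}? after = event.after;
        if after is record {} {
            // Convert the generic change payload into a typed value.
            Product product = check after.cloneWithType();
            // product.id, product.name, product.price are now typed fields.
        }
    }
}
```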

Running the tests

# Unit tests (no external dependencies):
bal test --groups unit

# Integration tests (requires Docker Compose):
docker compose up -d      # starts MySQL, Kafka, Schema Registry
bal test --groups integration

A minimal docker-compose.yml for the integration test environment:

services:
  mysql:
    image: debezium/example-mysql:2.4
    ports: ["3306:3306"]
    environment:
      MYSQL_ROOT_PASSWORD: password
      MYSQL_USER: debezium
      MYSQL_PASSWORD: dbz

  zookeeper:
    image: confluentinc/cp-zookeeper:7.6
    environment: {ZOOKEEPER_CLIENT_PORT: 2181}

  kafka:
    image: confluentinc/cp-kafka:7.6
    ports: ["9092:9092"]
    depends_on: [zookeeper]
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

  schema-registry:
    image: confluentinc/cp-schema-registry:7.6
    ports: ["8081:8081"]
    depends_on: [kafka]
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092
      SCHEMA_REGISTRY_HOST_NAME: schema-registry

Decisions and limitations

| Decision | Rationale |
|---|---|
| OracleDB excluded | `ballerinax/oracledb` does not yet publish a CdcListener; it will be added when available. |
| Listener-based architecture | Ballerina's `listener`/`service on` syntax keeps the integration idiomatic and familiar. |
| Internal service + optional user service | The package always attaches its own service for Kafka forwarding; the user's service runs as a pre-publish hook. |
| Union type for database config | `MySqlCdcConfig\|PostgresCdcConfig\|MsSqlCdcConfig` keeps the API surface small versus separate listener classes per DB. |
| Retry at publish level | `eventProcessingFailureHandlingMode` is orthogonal; this retry targets transient Kafka failures specifically. |
| `TABLE_NAME` requires `includedTables` | Without an explicit table list, the CDC framework cannot route per-table events to individual service objects. |

License

MIT

Import

import thambaru/debezium_to_kafka;

Metadata

Released date: 2 days ago

Version: 0.2.0


Compatibility

Platform: any

Ballerina version: 2201.13.1

GraalVM compatible: Yes

