Module debezium_to_kafka
thambaru/debezium_to_kafka — Ballerina library, v0.2.0
A Ballerina library package that bridges Debezium CDC events from MySQL, PostgreSQL, and SQL Server to Apache Kafka in a single, easy-to-configure listener. All change events are normalized into a typed CdcEvent envelope, serialized (JSON, Avro, or custom), routed to the right Kafka topic, and published — with optional filter/transform hooks, exponential-backoff retry, and dead-letter queue support.
Architecture
```
┌──────────────────────┐    CDC events     ┌─────────────────────────────────────────┐
│  MySQL / PostgreSQL  │ ────────────────► │            CdcKafkaListener             │
│    / SQL Server      │                   │                                         │
└──────────────────────┘                   │  ┌─────────────────────────────────┐    │
                                           │  │ Internal cdc:Service (per table)│    │
                                           │  │ 1. EventFilter (optional)       │    │
                                           │  │ 2. EventTransformer (optional)  │    │
                                           │  │ 3. User hook service (optional) │    │
                                           │  │ 4. Serializer (JSON/Avro/CUST)  │    │
                                           │  │ 5. TopicResolver                │    │
                                           │  │ 6. kafka:Producer + retry       │    │
                                           │  └──────────────┬──────────────────┘    │
                                           └─────────────────┼───────────────────────┘
                                                             │
                                                             ▼
                                                  ┌────────────────────┐
                                                  │    Apache Kafka    │
                                                  │  cdc.db.table_a    │
                                                  │  cdc.db.table_b    │
                                                  │  …                 │
                                                  │  [DLQ topic]       │
                                                  └────────────────────┘
```
Quick start — MySQL CDC → Kafka in ~15 lines
```ballerina
import thambaru/debezium_to_kafka as cdcKafka;
import ballerinax/mysql.cdc.driver as _; // pull in the Debezium MySQL connector

listener cdcKafka:CdcKafkaListener bridge = new ({
    database: {
        dbType: cdcKafka:MYSQL,
        hostname: "localhost",
        username: "root",
        password: "password",
        includedDatabases: ["inventory"],
        includedTables: ["inventory.products", "inventory.orders"]
    },
    kafkaBootstrapServers: "localhost:9092",
    topicRouting: {strategy: cdcKafka:TABLE_NAME, topicPrefix: "cdc"},
    serialization: {format: cdcKafka:JSON}
});

// Zero-code forwarding — all events auto-publish to:
//   cdc.inventory.products
//   cdc.inventory.orders
```
Configuration reference
CdcKafkaConfig
| Field | Type | Default | Description |
|---|---|---|---|
| `database` | `MySqlCdcConfig` \| `PostgresCdcConfig` \| `MsSqlCdcConfig` | — | Database CDC source config |
| `kafkaBootstrapServers` | `string` \| `string[]` | — | Kafka broker(s) |
| `kafkaProducerConfig` | `kafka:ProducerConfiguration` | `{acks: ACKS_ALL, enableIdempotence: true}` | Kafka producer settings |
| `topicRouting` | `TopicRoutingConfig` | `{strategy: TABLE_NAME, topicPrefix: "cdc"}` | Topic routing strategy |
| `serialization` | `SerializationConfig` | `{format: JSON}` | Serialization format |
| `retryConfig` | `RetryConfig` | see below | Retry/backoff policy |
| `deadLetterTopic` | `string?` | `()` | DLQ topic for failed events |
MySQL — MySqlCdcConfig
```ballerina
database: {
    dbType: cdcKafka:MYSQL,
    hostname: "localhost",       // default: "localhost"
    port: 3306,                  // default: 3306
    username: "debezium",
    password: "dbz",
    includedDatabases: ["mydb"],
    includedTables: ["mydb.orders", "mydb.products"],
    excludedTables: (),
    includedColumns: (),
    excludedColumns: ()
}
```
PostgreSQL — PostgresCdcConfig
```ballerina
import ballerinax/postgresql.cdc.driver as _;

database: {
    dbType: cdcKafka:POSTGRESQL,
    hostname: "localhost",               // default: "localhost"
    port: 5432,                          // default: 5432
    username: "postgres",
    password: "postgres",
    databaseName: "mydb",                // required
    includedSchemas: ["public"],
    includedTables: ["public.orders"],
    pluginName: postgresql:PGOUTPUT,     // default: PGOUTPUT
    slotName: "debezium",                // default: "debezium"
    publicationName: "dbz_publication"
}
```
SQL Server — MsSqlCdcConfig
```ballerina
database: {
    dbType: cdcKafka:MSSQL,
    hostname: "localhost",           // default: "localhost"
    port: 1433,                      // default: 1433
    username: "sa",
    password: "YourStrong!Passw0rd",
    databaseName: "TestDB",          // required
    includedTables: ["dbo.Orders", "dbo.Customers"]
}
```
Topic routing
Per-table (TABLE_NAME)
Each event is published to {topicPrefix}.{db}.{table}:
```ballerina
topicRouting: {
    strategy: cdcKafka:TABLE_NAME,
    topicPrefix: "cdc"
}
// inventory.products → cdc.inventory.products
// inventory.orders   → cdc.inventory.orders
```
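The mapping is purely mechanical: the fully qualified table name is appended to the prefix. A tiny helper (illustrative Python, not part of the library — the bridge resolves topics internally) makes the rule explicit:

```python
def resolve_topic(topic_prefix: str, table_name: str) -> str:
    """TABLE_NAME strategy: {topicPrefix}.{db}.{table}."""
    # table_name is already fully qualified, e.g. "inventory.products"
    return f"{topic_prefix}.{table_name}"

print(resolve_topic("cdc", "inventory.products"))  # cdc.inventory.products
print(resolve_topic("cdc", "inventory.orders"))    # cdc.inventory.orders
```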
Note:

The `TABLE_NAME` strategy creates one internal CDC service per table. You must specify `includedTables` in the database config; otherwise a catch-all service is used and the topic name will contain `"unknown"`.
Single topic (SINGLE_TOPIC)
All events from all tables go to one topic:
```ballerina
topicRouting: {
    strategy: cdcKafka:SINGLE_TOPIC,
    singleTopic: "all-cdc-events"
}
```
Custom resolver (CUSTOM)
Provide your own routing logic:
```ballerina
topicRouting: {
    strategy: cdcKafka:CUSTOM,
    customResolver: isolated function(cdcKafka:CdcEvent event) returns string {
        return event.operation == cdcKafka:DELETE ? "cdc-deletes" : "cdc-changes";
    }
}
```
Serialization
JSON (default)
```ballerina
serialization: {format: cdcKafka:JSON}
```
Produces a UTF-8 encoded JSON object matching the CdcEvent structure:
```json
{
  "operation": "CREATE",
  "tableName": "inventory.products",
  "before": null,
  "after": {"id": 1, "name": "Widget", "price": 9.99},
  "timestamp": [1740394800, 0],
  "metadata": {}
}
```
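On the consumer side, the envelope is plain JSON and can be decoded with any JSON library. A minimal Python sketch (the payload bytes mirror the example above; the Kafka consumer wiring itself is omitted and left to whatever client you use):

```python
import json

# Raw bytes as they would arrive in a Kafka record value
# (same shape as the example envelope above).
payload = (
    b'{"operation": "CREATE", "tableName": "inventory.products",'
    b' "before": null, "after": {"id": 1, "name": "Widget", "price": 9.99},'
    b' "timestamp": [1740394800, 0], "metadata": {}}'
)

event = json.loads(payload)
if event["operation"] == "CREATE":
    row = event["after"]                 # the newly inserted row
    seconds, frac = event["timestamp"]   # [epoch seconds, fractional seconds]
    print(row["name"], seconds)          # Widget 1740394800
```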
Avro (schema registry)
```ballerina
serialization: {
    format: cdcKafka:AVRO,
    schemaRegistryUrl: "http://schema-registry:8081"
}
```
Configures the Kafka producer with valueSerializerType = SER_AVRO.
Custom serializer
```ballerina
import ballerina/lang.value;

serialization: {
    format: cdcKafka:CUSTOM,
    customSerializer: isolated function(cdcKafka:CdcEvent event) returns byte[]|error {
        // Example: MessagePack or Protobuf
        json j = check value:toJson(event);
        return j.toJsonString().toBytes();
    }
}
```
Optional hooks
service on syntax (pre-publish side-effects)
```ballerina
import ballerina/log;

service on bridge {
    remote function onCreate(cdcKafka:CdcEvent event) returns error? {
        log:printInfo("Row inserted", tableName = event.tableName, row = event.after);
    }

    remote function onUpdate(cdcKafka:CdcEvent event) returns error? {
        // audit log, metrics, etc.
    }

    remote function onDelete(cdcKafka:CdcEvent event) returns error? {
    }

    remote function onRead(cdcKafka:CdcEvent event) returns error? {
    }
}
```
Hooks are best-effort: a hook error is logged but does not abort the Kafka publish.
Event filter
Use setFilter() to drop events before they reach Kafka:
```ballerina
bridge.setFilter(isolated function(cdcKafka:CdcEvent event) returns boolean {
    // Publish only INSERT and UPDATE; skip DELETE and READ.
    return event.operation == cdcKafka:CREATE || event.operation == cdcKafka:UPDATE;
});
```
Event transformer
Use setTransformer() to enrich or modify events in-flight:
```ballerina
bridge.setTransformer(isolated function(cdcKafka:CdcEvent event) returns cdcKafka:CdcEvent|error {
    map<anydata> meta = event.metadata;
    meta["env"] = "production";
    meta["capturedBy"] = "debezium-bridge";
    return {...event, metadata: meta};
});
```
Both hooks can be set at any time, including after `'start()`.
Retry and dead-letter queue
```ballerina
retryConfig: {
    maxRetries: 5,           // default: 3
    initialBackoff: 0.5,     // default: 1.0 s
    maxBackoff: 60.0,        // default: 30.0 s
    backoffMultiplier: 2.0   // default: 2.0
},
deadLetterTopic: "cdc-dlq"   // optional; omit to surface errors via the cdc framework
```
On a Kafka publish failure the bridge retries with exponential backoff:
delay(n) = min(initialBackoff × backoffMultiplier^n, maxBackoff)
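The resulting delay sequence is easy to check by hand; a few lines of Python (illustrative only, not part of the library) evaluate the formula for both the example config above and the documented defaults:

```python
def backoff_delays(max_retries, initial_backoff, backoff_multiplier, max_backoff):
    """Delay (seconds) before retry attempt n, for n = 0 .. max_retries - 1."""
    return [min(initial_backoff * backoff_multiplier ** n, max_backoff)
            for n in range(max_retries)]

# Values from the retryConfig example above:
print(backoff_delays(5, 0.5, 2.0, 60.0))   # [0.5, 1.0, 2.0, 4.0, 8.0]

# Library defaults (maxRetries 3, initialBackoff 1.0, maxBackoff 30.0):
print(backoff_delays(3, 1.0, 2.0, 30.0))   # [1.0, 2.0, 4.0]
```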
If all retries are exhausted and deadLetterTopic is set, the original event
payload is written to the DLQ with error metadata in Kafka headers:
| Header | Value |
|---|---|
| `x-original-table` | Fully qualified table name |
| `x-operation` | `CREATE` / `UPDATE` / `DELETE` / `READ` |
| `x-error-message` | Last Kafka error message |
If `deadLetterTopic` is `()`, the error is propagated back to the `ballerinax/cdc` framework, which applies its `eventProcessingFailureHandlingMode` (`FAIL`, `WARN`, or `SKIP`).
CdcEvent envelope
```ballerina
public type CdcEvent record {|
    cdc:Operation operation;  // CREATE | UPDATE | DELETE | READ
    string tableName;         // fully qualified: "db.table"
    record {}? before;        // nil for CREATE/READ
    record {}? after;         // nil for DELETE
    time:Utc timestamp;       // UTC capture time
    map<anydata> metadata;    // extensible key/value bag
|};
```
Running the tests
```shell
# Unit tests (no external dependencies):
bal test --groups unit

# Integration tests (requires Docker Compose):
docker compose up -d        # starts MySQL, Kafka, Schema Registry
bal test --groups integration
```
A minimal docker-compose.yml for the integration test environment:
```yaml
services:
  mysql:
    image: debezium/example-mysql:2.4
    ports: ["3306:3306"]
    environment:
      MYSQL_ROOT_PASSWORD: password
      MYSQL_USER: debezium
      MYSQL_PASSWORD: dbz
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6
    environment: {ZOOKEEPER_CLIENT_PORT: 2181}
  kafka:
    image: confluentinc/cp-kafka:7.6
    ports: ["9092:9092"]
    depends_on: [zookeeper]
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
  schema-registry:
    image: confluentinc/cp-schema-registry:7.6
    ports: ["8081:8081"]
    depends_on: [kafka]
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
```
Decisions and limitations
| Decision | Rationale |
|---|---|
| OracleDB excluded | ballerinax/oracledb does not yet publish a CdcListener. Will add when available. |
| Listener-based architecture | Using Ballerina's listener/service on syntax keeps the integration idiomatic and familiar. |
| Internal service + optional user service | The package always attaches its own service for Kafka forwarding; the user's service runs as a pre-publish hook. |
| Union type for database config | MySqlCdcConfig|PostgresCdcConfig|MsSqlCdcConfig keeps the API surface small vs. separate listener classes per DB. |
| Retry at publish level | eventProcessingFailureHandlingMode is orthogonal; our retry targets Kafka transient failures specifically. |
| TABLE_NAME requires includedTables | Without an explicit table list, the CDC framework cannot route per-table events to individual service objects. |
License
MIT