ballerinax/cdc Ballerina library
Overview
The Change Data Capture (CDC) connector provides APIs to capture and process database change events in real time. It enables developers to define services that handle change capture events such as inserts, updates, and deletes. Built on top of the Debezium framework, it supports popular databases such as MySQL, Microsoft SQL Server, PostgreSQL, and Oracle.
Key Features
- Capture real-time change events (create, read, update, delete) from databases
- Support for multiple database systems including MySQL, MSSQL, PostgreSQL, and Oracle
- Event-driven service model with dedicated handlers for each operation type
- Built on the Debezium framework for reliable change data capture
Quickstart
Step 1: Import Required Modules
Add the following imports to your Ballerina program:
- `ballerinax/cdc`: Core module that provides APIs to capture and process database change events.
- `ballerinax/mysql`: Provides the MySQL-specific listener and types for CDC. Replace with the corresponding module for your database if needed.
- `ballerinax/mysql.cdc.driver as _`: Debezium-based driver for MySQL CDC. Use the appropriate driver for your database (e.g., `mssql.cdc.driver`, `postgresql.cdc.driver`, or `oracledb.cdc.driver`).
- `ballerina/log`: Used by the sample service in Step 3 to log change events.

```ballerina
import ballerina/log;
import ballerinax/cdc;
import ballerinax/mysql;
import ballerinax/mysql.cdc.driver as _;
```
Step 2: Configure the CDC Listener
Create a CDC listener for your MySQL database by specifying the connection details:
```ballerina
listener mysql:CdcListener mysqlListener = new ({
    database: {
        hostname: "localhost",
        port: 3306,
        username: "username",
        password: "password",
        includedDatabases: ["inventory"]
    }
});
```
Step 3: Define the CDC Service
Implement a cdc:Service to handle database change events:
```ballerina
service on mysqlListener {

    remote function onRead(record {} after) returns cdc:Error? {
        // Handle the read event
        log:printInfo(string `Record read: ${after.toString()}`);
    }

    remote function onCreate(record {} after) returns cdc:Error? {
        // Handle the create event
        log:printInfo(string `Record created: ${after.toString()}`);
    }

    remote function onUpdate(record {} before, record {} after) returns cdc:Error? {
        // Handle the update event
        log:printInfo(string `Record updated from: ${before.toString()}, to: ${after.toString()}`);
    }

    remote function onDelete(record {} before) returns cdc:Error? {
        // Handle the delete event
        log:printInfo(string `Record deleted: ${before.toString()}`);
    }
}
```
Step 4: Run the Application
Run your Ballerina application:
```
bal run
```
Examples
The cdc module provides practical examples illustrating its usage in various real-world scenarios. Explore these examples to understand how to capture and process database change events effectively.
- Fraud Detection - Detect suspicious transactions in a financial database and send fraud alerts via email. This example showcases how to integrate the CDC module with the Gmail connector to notify stakeholders of potential fraud.
- Cache Management - Synchronize a Redis cache with changes in a MySQL database. It listens to changes in the `products`, `vendors`, and `product_reviews` tables and updates the Redis cache accordingly.
- Connection Retries - Monitor an orders database with automatic reconnection on failures. This example configures exponential backoff retry logic so the service recovers gracefully from temporary database downtime.
- Heartbeat with Liveness - Monitor a financial transactions table with heartbeat and liveness checking enabled. The heartbeat keeps the database connection alive during idle periods, and the liveness check ensures the CDC listener remains active.
- Kafka Offset and Schema Storage - Synchronize inventory changes using Apache Kafka for both offset storage and schema history. This enables reliable distributed state management where the service can resume processing after a restart without missing or duplicating events.
- S3 Schema History - Capture audit log changes with Amazon S3-backed schema history storage. This cloud-native approach stores database schema evolution in S3, removing the need for self-hosted schema history infrastructure.
Functions
externAttach
Attach point to call the native CDC listener attach method.
Return Type
- Error? - an error if the service cannot be attached, or () if successful
externDetach
Attach point to call the native CDC listener detach method.
Return Type
- Error? - an error if the service cannot be detached, or () if successful
externGetAdditionalConfigKeys
function externGetAdditionalConfigKeys(Options options, typedesc<Options> optionsSubType) returns string[]
Retrieves additional configuration keys for the given options.
Parameters
- options Options - The options instance to retrieve additional configuration keys from
- optionsSubType typedesc<Options> - The type descriptor of the options
Return Type
- string[] - An array of additional configuration keys
externGracefulStop
Attach point to call the native CDC listener gracefulStop method.
Parameters
- cdcListener Listener - the cdc listener object
Return Type
- Error? - an error if the listener cannot be stopped, or () if successful
externImmediateStop
Attach point to call the native CDC listener immediateStop method.
Parameters
- cdcListener Listener - the cdc listener object
Return Type
- Error? - an error if the listener cannot be stopped, or () if successful
externStart
Attach point to call the native CDC listener start method.
Parameters
- cdcListener Listener - the cdc listener object
- config map<anydata> - the configuration map containing debezium properties
Return Type
- Error? - an error if the listener cannot be started, or () if successful
Deprecated
Use externStartWithExtendedConfigs instead, which separates debezium properties from listener-specific properties into distinct maps.
externStartWithExtendedConfigs
function externStartWithExtendedConfigs(Listener cdcListener, map<anydata> debeziumConfigs, map<anydata> listenerConfigs) returns Error?
Attach point to call the native CDC listener start method with separate config maps.
Parameters
- cdcListener Listener - the cdc listener object
- debeziumConfigs map<anydata> - the configuration map containing debezium properties
- listenerConfigs map<anydata> - the configuration map containing listener-specific properties
Return Type
- Error? - an error if the listener cannot be started, or () if successful
isLive
Checks whether the given CDC listener is live.
Parameters
- cdc Listener - The CDC listener instance to be checked
populateAdditionalConfigurations
function populateAdditionalConfigurations(Options options, map<string> configMap, typedesc<Options> optionsSubType)
Populates additional configuration properties from the options record. Supports primitive types (string, int, boolean, decimal, float), which are converted to strings. Complex types (arrays, records, maps) are rejected and logged as errors.
populateAzureBlobSchemaHistoryConfiguration
function populateAzureBlobSchemaHistoryConfiguration(AzureBlobInternalSchemaStorage storage, map<string> configMap)
Populates Azure Blob schema history configuration properties.
Parameters
- storage AzureBlobInternalSchemaStorage - Azure Blob schema history configuration
populateColumnTransformConfiguration
function populateColumnTransformConfiguration(ColumnTransformConfiguration config, map<string> configMap)
Populates column transformation configuration properties.
Parameters
- config ColumnTransformConfiguration - column transformation configuration
populateDatabaseConfigurations
function populateDatabaseConfigurations(DatabaseConnection connection, map<string> configMap)
Populates the database configurations in the given map.
Parameters
- connection DatabaseConnection - database connection configuration
populateDataTypeConfiguration
function populateDataTypeConfiguration(DataTypeConfiguration config, map<string> configMap)
Populates data type configuration properties.
Parameters
- config DataTypeConfiguration - data type configuration
populateDebeziumProperties
function populateDebeziumProperties(ListenerConfiguration config, map<string> configMap)
Processes the given configuration and populates the map with the necessary debezium properties.
Parameters
- config ListenerConfiguration - listener configuration
populateErrorHandlingConfiguration
function populateErrorHandlingConfiguration(ConnectionRetryConfiguration config, map<string> configMap)
Populates error handling configuration properties.
Parameters
- config ConnectionRetryConfiguration - error handling configuration
populateExtendedSnapshotConfiguration
function populateExtendedSnapshotConfiguration(ExtendedSnapshotConfiguration config, map<string> configMap)
Populates extended snapshot configuration properties.
Parameters
- config ExtendedSnapshotConfiguration - extended snapshot configuration
populateHeartbeatConfiguration
function populateHeartbeatConfiguration(HeartbeatConfiguration config, map<string> configMap)
Populates heartbeat configuration properties.
Parameters
- config HeartbeatConfiguration - heartbeat configuration
populateIncrementalSnapshotConfiguration
function populateIncrementalSnapshotConfiguration(IncrementalSnapshotConfiguration config, map<string> configMap)
Populates incremental snapshot configuration properties.
Parameters
- config IncrementalSnapshotConfiguration - incremental snapshot configuration
populateJdbcOffsetStorageConfiguration
function populateJdbcOffsetStorageConfiguration(JdbcOffsetStorage storage, map<string> configMap)
Populates JDBC offset storage configuration properties.
Parameters
- storage JdbcOffsetStorage - JDBC offset storage configuration
populateJdbcSchemaHistoryConfiguration
function populateJdbcSchemaHistoryConfiguration(JdbcInternalSchemaStorage storage, map<string> configMap)
Populates JDBC schema history configuration properties.
Parameters
- storage JdbcInternalSchemaStorage - JDBC schema history configuration
populateListenerProperties
function populateListenerProperties(ListenerConfiguration config, map<anydata> listenerConfigMap)
Processes the given configuration and populates the map with the necessary listener-specific properties. These properties are not passed to the Debezium engine, but are used by the listener implementation for its internal functioning.
Parameters
- config ListenerConfiguration - listener configuration
- listenerConfigMap map<anydata> - map to populate with listener-specific properties
populateMessageKeyColumnsConfiguration
function populateMessageKeyColumnsConfiguration(MessageKeyColumns[]? messageKeyColumns, map<string> configMap)
Populates the message.key.columns configuration (relational databases only). This property specifies the columns to use for the message key in change events.
Parameters
- messageKeyColumns MessageKeyColumns[]? - Composite message key columns specification
populateOptions
populatePerformanceConfiguration
function populatePerformanceConfiguration(PerformanceConfiguration config, map<string> configMap)
Populates performance configuration properties.
Parameters
- config PerformanceConfiguration - performance configuration
populateRedisOffsetStorageConfiguration
function populateRedisOffsetStorageConfiguration(RedisOffsetStorage storage, map<string> configMap)
Populates Redis offset storage configuration properties.
Parameters
- storage RedisOffsetStorage - Redis offset storage configuration
populateRedisSchemaHistoryConfiguration
function populateRedisSchemaHistoryConfiguration(RedisInternalSchemaStorage storage, map<string> configMap)
Populates Redis schema history configuration properties.
Parameters
- storage RedisInternalSchemaStorage - Redis schema history configuration
populateRelationalExtendedSnapshotConfiguration
function populateRelationalExtendedSnapshotConfiguration(RelationalExtendedSnapshotConfiguration config, map<string> configMap)
Populates relational database extended snapshot configuration properties.
Parameters
- config RelationalExtendedSnapshotConfiguration - relational extended snapshot configuration
populateRelationalHeartbeatConfiguration
function populateRelationalHeartbeatConfiguration(RelationalHeartbeatConfiguration config, map<string> configMap)
Populates relational heartbeat configuration properties (includes the SQL action query).
Parameters
- config RelationalHeartbeatConfiguration - relational heartbeat configuration
populateRocketMQSchemaHistoryConfiguration
function populateRocketMQSchemaHistoryConfiguration(RocketMQInternalSchemaStorage storage, map<string> configMap)
Populates RocketMQ schema history configuration properties.
Parameters
- storage RocketMQInternalSchemaStorage - RocketMQ schema history configuration
populateS3SchemaHistoryConfiguration
function populateS3SchemaHistoryConfiguration(AmazonS3InternalSchemaStorage storage, map<string> configMap)
Populates Amazon S3 schema history configuration properties.
Parameters
- storage AmazonS3InternalSchemaStorage - S3 schema history configuration
populateSignalConfiguration
function populateSignalConfiguration(SignalConfiguration config, map<string> configMap)
Populates signal configuration properties.
Parameters
- config SignalConfiguration - signal configuration
populateTableAndColumnConfigurations
function populateTableAndColumnConfigurations(string|string[]? includedTables, string|string[]? excludedTables, string|string[]? includedColumns, string|string[]? excludedColumns, map<string> configMap)
Populates table and column filtering configurations (relational databases only).
Parameters
populateTopicConfiguration
function populateTopicConfiguration(TopicConfiguration config, map<string> configMap)
Populates topic configuration properties.
Parameters
- config TopicConfiguration - topic configuration
populateTransactionMetadataConfiguration
function populateTransactionMetadataConfiguration(TransactionMetadataConfiguration config, map<string> configMap)
Populates transaction metadata configuration properties.
Parameters
- config TransactionMetadataConfiguration - transaction metadata configuration
Service types
cdc: Service
Represents a CDC service in Ballerina.
A CDC service is a distinct service object that defines remote methods to handle
database change events such as onRead, onCreate, onUpdate, onDelete, and onError.
These methods are invoked by the CDC listener when corresponding events occur in the database.
Example:
```ballerina
service cdc:Service on new mysql:CdcListener() {

    remote function onRead(record {} after) returns error? {
        // Handle the read event
    }

    remote function onCreate(record {} after) returns error? {
        // Handle the create event
    }

    remote function onUpdate(record {} before, record {} after) returns error? {
        // Handle the update event
    }

    remote function onDelete(record {} before) returns error? {
        // Handle the delete event
    }
}
```
Enums
cdc: BinaryHandlingMode
Represents binary data handling modes.
Members
cdc: DecimalHandlingMode
Represents the modes for handling decimal values from the database.
Members
cdc: EventProcessingFailureHandlingMode
Defines the modes for handling event processing failures.
Members
cdc: HashMaskVersion
Hash masking version
Members
cdc: IncrementalSnapshotWatermarkingStrategy
Represents incremental snapshot watermarking strategies.
Members
cdc: KafkaAuthenticationMechanism
Represents the authentication mechanisms for Kafka connections.
Members
cdc: KafkaSecureSocketProtocol
Protocol options for secure Kafka connections, allowing specification of SSL/TLS protocol versions.
Members
cdc: KafkaSecurityProtocol
The security protocols for Kafka connections.
Members
cdc: Operation
Represents the types of database operations.
Members
cdc: SnapshotIsolationMode
Represents snapshot isolation modes for transactions.
Members
cdc: SnapshotLockingMode
Represents snapshot locking modes (used by MySQL, PostgreSQL, SQL Server).
Members
cdc: SnapshotMode
Represents the snapshot modes for capturing database states.
Members
cdc: SnapshotQueryMode
Represents snapshot query modes.
Members
cdc: SslMode
Represents the SSL modes for secure database connections.
Members
cdc: TimePrecisionMode
Represents time precision modes.
Members
Annotations
cdc: ServiceConfig
The annotation to configure a CDC service.
Records
cdc: AmazonS3InternalSchemaStorage
Amazon S3-based schema history storage configuration.
Using this storage backend requires importing the ballerinax/cdc.storage.aws.s3.driver module.
Fields
- topicPrefix string - Prefix for topic names used in Kafka-based schema history
- className string(default "io.debezium.storage.s3.history.S3SchemaHistory") - Fully-qualified class name of the S3 schema history implementation
- accessKeyId? string - AWS access key ID for authentication
- secretAccessKey? string - AWS secret access key for authentication
- region? string - AWS region of the S3 bucket
- bucketName string - S3 bucket name for schema history
- objectName string - S3 object (file) name within the bucket
- endpoint? string - Custom S3-compatible endpoint URL; uses the AWS endpoint if not set
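As a sketch, the record above can be assigned to the listener's internalSchemaStorage field (per ListenerConfiguration); the bucket, object, and region values here are illustrative assumptions:

```ballerina
// Requires: import ballerinax/cdc.storage.aws.s3.driver as _;
cdc:AmazonS3InternalSchemaStorage schemaStorage = {
    bucketName: "cdc-schema-history",   // assumption: an existing S3 bucket
    objectName: "inventory-schema.dat", // assumption: object to hold the history
    region: "us-east-1"                 // assumption; credentials may also come from the environment
};
```

When accessKeyId and secretAccessKey are omitted, authentication presumably falls back to the AWS default credential chain.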
cdc: AzureBlobInternalSchemaStorage
Azure Blob Storage-based schema history storage configuration.
Using this storage backend requires importing the ballerinax/cdc.storage.azure.blob.driver module.
Fields
- topicPrefix string - Prefix for topic names used in Kafka-based schema history
- className string(default "io.debezium.storage.azure.blob.history.AzureBlobSchemaHistory") - Fully-qualified class name of the Azure Blob schema history implementation
- connectionString string - Azure Storage connection string
- accountName? string - Azure Storage account name
- containerName string - Azure Blob container name for schema history
- blobName? string - Blob (file) name within the container
cdc: CdcServiceConfig
Provides a set of configurations for the CDC service.
Fields
cdc: ColumnCharMask
Character-based column masking configuration.
Fields
- length int - Number of mask characters replacing the original value. Use 0 to replace with an empty string.
cdc: ColumnHashMask
Hash-based column masking configuration for irreversibly hashing sensitive column values.
Fields
- algorithm string - Hash algorithm (e.g., SHA-256, MD5)
- salt string - Salt added to the hash for extra security
- version HashMaskVersion(default HASH_V2) - Version of the hashing algorithm to use (affects how the hash is computed and formatted)
cdc: ColumnTransformConfiguration
Column masking and transformation configuration.
Fields
- maskWithHash? ColumnHashMask[] - Hash-based masking for irreversible column value hashing
- maskWithChars? ColumnCharMask[] - Character-based masking with a fixed-length replacement string
- truncateToChars? ColumnTruncate[] - Truncation to a fixed string length
cdc: ColumnTruncate
Column truncation configuration.
Fields
- length int - Maximum character length after truncation
cdc: ConnectionRetryConfiguration
Error handling configuration for connector failure and recovery behavior.
Retry delays increase exponentially (×2 per attempt) from retryInitialDelay up to retryMaxDelay.
Fields
- maxAttempts int(default -1) - Maximum retry attempts for retriable errors (-1 = unlimited, 0 = disabled). When the limit is reached, the connector stops with a fatal error.
- retryInitialDelay decimal(default 0.3) - Wait time in seconds before restarting after a retriable error
- retryMaxDelay decimal(default 10.0) - Maximum wait time in seconds before restarting after a retriable error
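A minimal sketch of a retry configuration using the fields above; the values are illustrative assumptions, and the record would be passed via the connectionRetryConfig field of the Options record:

```ballerina
cdc:ConnectionRetryConfiguration retryConfig = {
    maxAttempts: 5,         // stop with a fatal error after 5 retriable failures
    retryInitialDelay: 1.0, // first restart after 1 second
    retryMaxDelay: 30.0     // delays double per attempt, capped at 30 seconds
};
```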
cdc: DatabaseConnection
Base database connection configuration for all CDC connectors.
Fields
- connectorClass string - Fully-qualified class name of the Debezium connector
- hostname string - Database server hostname or IP address
- port int - Database server port number
- username string - Database username for authentication
- password string - Database password for authentication
- connectTimeout? decimal - Connection timeout in seconds
- tasksMax int(default 1) - Maximum number of connector tasks
- secure? SecureDatabaseConnection - SSL/TLS connection configuration
cdc: DataTypeConfiguration
Data type handling configuration.
Fields
- binaryHandlingMode BinaryHandlingMode(default BYTES) - Encoding mode for binary column data (bytes, base64, hex)
- timePrecisionMode TimePrecisionMode(default ADAPTIVE) - Representation mode for temporal values
cdc: EventProcessingErrorDetail
Represents the details of an error that occurred during event processing.
Fields
- payload json - The JSON payload associated with the error
cdc: ExtendedSnapshotConfiguration
Extended snapshot configuration for fine-tuning snapshot behavior.
Fields
- delay? decimal - Delay in seconds before starting the snapshot
- fetchSize? int - Number of rows fetched per database round trip during snapshot
- maxThreads int(default 1) - Maximum threads for parallel snapshot operations
- incrementalConfig? IncrementalSnapshotConfiguration - Incremental snapshot configuration
cdc: FileInternalSchemaStorage
File-based schema history storage configuration.
Fields
- topicPrefix string - Prefix for topic names used in Kafka-based schema history
- className string(default "io.debezium.storage.file.history.FileSchemaHistory") - Fully-qualified class name of the file schema history implementation
- fileName string(default "tmp/dbhistory.dat") - Path to the schema history file
cdc: FileOffsetStorage
File-based offset storage configuration.
Fields
- flushInterval decimal - Interval in seconds between offset flushes
- flushTimeout decimal - Timeout in seconds for an offset flush operation
- className string(default "org.apache.kafka.connect.storage.FileOffsetBackingStore") - Fully-qualified class name of the file offset storage implementation
- fileName string(default "tmp/debezium-offsets.dat") - Path to the offset storage file
cdc: FileSignalConfiguration
File-based signal channel configuration.
Fields
- fileName string(default "file-signals.txt") - Path to the signal file monitored for changes
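A minimal sketch of a file-based signal channel, assuming this record can be supplied as the signalConfig field of the Options record; the file path is an illustrative assumption:

```ballerina
cdc:FileSignalConfiguration signalConfig = {
    fileName: "signals/cdc-signals.txt" // assumption; the default is "file-signals.txt"
};
```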
cdc: HeartbeatConfiguration
Heartbeat configuration for detecting idle or stale connections.
Fields
- interval decimal(default 10.0) - Interval in seconds between heartbeats
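For example, to send heartbeats more frequently than the 10-second default (set via the heartbeatConfig field of the Options record):

```ballerina
cdc:HeartbeatConfiguration heartbeatConfig = {
    interval: 5.0 // heartbeat every 5 seconds; value chosen for illustration
};
```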
cdc: IncrementalSnapshotConfiguration
Incremental (non-blocking) snapshot configuration.
Fields
- chunkSize int(default 1024) - Number of rows per snapshot chunk
- watermarkingStrategy IncrementalSnapshotWatermarkingStrategy(default INSERT_INSERT) - Strategy for marking chunk boundaries
- allowSchemaChanges boolean(default false) - Whether DDL changes are allowed during incremental snapshot
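Combining this record with ExtendedSnapshotConfiguration, a hedged sketch of tuning snapshot behavior (values are illustrative; passed via the extendedSnapshot field of the Options record):

```ballerina
cdc:ExtendedSnapshotConfiguration snapshotConfig = {
    maxThreads: 4, // parallelize the initial snapshot; assumption for illustration
    incrementalConfig: {
        chunkSize: 4096,          // larger chunks than the 1024 default
        allowSchemaChanges: false // reject DDL during the incremental snapshot
    }
};
```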
cdc: JdbcInternalSchemaStorage
JDBC-based schema history storage configuration.
Fields
- topicPrefix string - Prefix for topic names used in Kafka-based schema history
- className string(default "io.debezium.storage.jdbc.history.JdbcSchemaHistory") - Fully-qualified class name of the JDBC schema history implementation
- url string - Full JDBC connection URL (e.g., jdbc:mysql://localhost:3306/dbname)
- username? string - Database username
- password? string - Database password
- retryConfig JdbcRetryConfiguration(default {}) - Retry configuration for JDBC connection attempts
- tableName string(default "debezium_database_history") - Schema history table name
- tableDdl? string - DDL for creating the schema history table
- tableSelect? string - SELECT query for reading schema history
- tableInsert? string - INSERT query for writing schema history entries
- tableDelete? string - DELETE query for removing schema history entries
cdc: JdbcOffsetStorage
JDBC-based offset storage configuration.
Fields
- flushInterval decimal - Interval in seconds between offset flushes
- flushTimeout decimal - Timeout in seconds for an offset flush operation
- className string(default "io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore") - Fully-qualified class name of the JDBC offset storage implementation
- url string - Full JDBC connection URL (e.g., jdbc:mysql://localhost:3306/dbname)
- username? string - Database username
- password? string - Database password
- retryConfig JdbcRetryConfiguration(default {}) - Retry configuration for JDBC connection attempts
- tableName string(default "debezium_offset_storage") - Offset storage table name
- tableDdl? string - DDL for creating the offset storage table
- tableSelect? string - SELECT query for reading offsets
- tableInsert? string - INSERT query for creating new offsets
- tableDelete? string - DELETE query for removing offsets
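A sketch of durable offset storage backed by a relational database, using only fields listed above; the JDBC URL and credentials are illustrative assumptions, and the record would be assigned to the listener's offsetStorage field:

```ballerina
cdc:JdbcOffsetStorage offsetStorage = {
    flushInterval: 5.0,                          // flush offsets every 5 seconds
    flushTimeout: 10.0,                          // fail a flush after 10 seconds
    url: "jdbc:mysql://localhost:3306/cdc_state", // assumption: database holding CDC state
    username: "username",
    password: "password"
};
```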
cdc: JdbcRetryConfiguration
Retry configuration for JDBC connections.
Fields
- retryDelay decimal(default 3.0) - Delay in seconds between connection retry attempts
- maxAttempts int(default 5) - Maximum number of retry attempts
cdc: JmxSignalConfiguration
JMX-based signal channel configuration.
cdc: KafkaAuthenticationConfiguration
Configurations related to Kafka authentication mechanisms.
Fields
- mechanism KafkaAuthenticationMechanism(default AUTH_SASL_PLAIN) - Type of the authentication mechanism. Currently SASL_PLAIN, SASL_SCRAM_256, and SASL_SCRAM_512 are supported
- username string - The username to authenticate the Kafka producer/consumer
- password string - The password to authenticate the Kafka producer/consumer
cdc: KafkaInternalSchemaStorage
Kafka-based schema history storage configuration.
Fields
- topicPrefix string - Prefix for topic names used in Kafka-based schema history
- className string(default "io.debezium.storage.kafka.history.KafkaSchemaHistory") - Fully-qualified class name of the Kafka schema history implementation
- topicName string(default "bal_cdc_internal_schema_history") - Kafka topic for storing schema history
- recoveryPollInterval decimal(default 0.1) - Interval in seconds between recovery polls
- recoveryAttempts int(default 100) - Maximum poll attempts during schema history recovery
- queryTimeout decimal(default 0.003) - Timeout in seconds for Kafka queries
- createTimeout decimal(default 0.03) - Timeout in seconds for topic creation
- securityProtocol KafkaSecurityProtocol(default PROTOCOL_PLAINTEXT) - Kafka security protocol (PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL)
- auth? KafkaAuthenticationConfiguration - SASL authentication credentials; required for SASL_PLAINTEXT and SASL_SSL
- secureSocket? KafkaSecureSocket - SSL/TLS configuration with truststore and optional keystore for mTLS
cdc: KafkaOffsetStorage
Kafka-based offset storage configuration.
Fields
- flushInterval decimal - Interval in seconds between offset flushes
- flushTimeout decimal - Timeout in seconds for an offset flush operation
- className string(default "org.apache.kafka.connect.storage.KafkaOffsetBackingStore") - Fully-qualified class name of the Kafka offset storage implementation
- topicName string(default "bal_cdc_offsets") - Kafka topic for storing offsets
- partitions int(default 1) - Number of partitions for the offset topic
- replicationFactor int(default 2) - Replication factor for the offset topic
- securityProtocol KafkaSecurityProtocol(default PROTOCOL_PLAINTEXT) - Kafka security protocol (PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL)
- auth? KafkaAuthenticationConfiguration - SASL authentication credentials; required for SASL_PLAINTEXT and SASL_SSL
- secureSocket? KafkaSecureSocket - SSL/TLS configuration with truststore and optional keystore for mTLS
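A hedged sketch of Kafka-backed offset storage using the fields above (values are illustrative assumptions; the record would be assigned to the listener's offsetStorage field):

```ballerina
cdc:KafkaOffsetStorage offsetStorage = {
    flushInterval: 5.0,         // flush offsets every 5 seconds
    flushTimeout: 10.0,         // fail a flush after 10 seconds
    topicName: "bal_cdc_offsets",
    partitions: 1,
    replicationFactor: 3        // assumption: a 3-broker cluster
};
```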
cdc: KafkaSecureSocket
Configurations for secure communication with the Kafka server.
Fields
- cert TrustStore|string - Configurations associated with crypto:TrustStore or single certificate file that the client trusts
- key? record {| keyStore KeyStore, keyPassword string |}|KafkaSecureSocketCertKey - Configurations associated with crypto:KeyStore or combination of certificate and private key of the client
- protocol? record {| name KafkaSecureSocketProtocol, versions string[] |} - SSL/TLS protocol related options
- ciphers? string[] - List of ciphers to be used. By default, all the available cipher suites are supported
- provider? string - Name of the security provider used for SSL connections. The default value is the default security provider of the JVM
cdc: KafkaSecureSocketCertKey
A combination of certificate, private key, and private key password if encrypted.
Fields
- certFile string - A file containing the certificate
- keyFile string - A file containing the private key in PKCS8 format
- keyPassword? string - Password of the private key if it is encrypted
cdc: KafkaSignalConfiguration
Kafka-based signal configuration.
Fields
- topicName string(default "bal_cdc_signals") - Kafka topic for signal messages
- groupId string(default "kafka-signal") - Consumer group ID for reading signal messages
- securityProtocol KafkaSecurityProtocol(default PROTOCOL_PLAINTEXT) - Kafka security protocol (PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL)
- auth? KafkaAuthenticationConfiguration - SASL authentication configuration for the signal consumer
- secureSocket? KafkaSecureSocket - SSL/TLS configuration for the signal consumer
- pollTimeout decimal(default 0.1) - Timeout in seconds for polling signal messages from Kafka
cdc: ListenerConfiguration
Base CDC listener configuration.
Fields
- engineName string(default "ballerina-cdc-connector") - Debezium engine instance name
- internalSchemaStorage InternalSchemaStorage(default {fileName: "tmp/dbhistory.dat"}) - Schema history storage configuration
- offsetStorage OffsetStorage(default {fileName: "tmp/debezium-offsets.dat"}) - Offset storage configuration
- livenessInterval decimal(default 60.0) - Interval in seconds for checking CDC listener liveness
- database DatabaseConnection - Database connection configuration (provided by DB-specific listener configs)
- options Options(default {}) - Connector options
cdc: MemoryInternalSchemaStorage
In-memory schema history storage configuration (data is lost on restart).
Fields
- topicPrefix string - Prefix for topic names used in Kafka-based schema history
- className string(default "io.debezium.relational.history.MemorySchemaHistory") - Fully-qualified class name of the memory schema history implementation
cdc: MemoryOffsetStorage
In-memory offset storage configuration (data is lost on restart).
Fields
- flushInterval decimal - Interval in seconds between offset flushes
- flushTimeout decimal - Timeout in seconds for an offset flush operation
- className string(default "org.apache.kafka.connect.storage.MemoryOffsetBackingStore") - Fully-qualified class name of the memory offset storage implementation
cdc: MessageKeyColumns
A list of expressions that specify the columns to be included in the message key for change events. Each entry specifies a table and the columns from that table to include in the message key.
Fields
- tableName string - The fully qualified name of the table (including database name if necessary) for which to specify message key columns
- columns string[] - A list of column names from the specified table to include in the message key
cdc: Options
Common CDC options applicable to all database connectors.
Fields
- snapshotMode SnapshotMode(default INITIAL) - Initial snapshot behavior (initial, always, no_data, etc.)
- eventProcessingFailureHandlingMode EventProcessingFailureHandlingMode(default WARN) - How to handle event processing failures
- skippedOperations Operation[](default [TRUNCATE]) - Database operations to skip publishing
- skipMessagesWithoutChange boolean(default false) - Whether to discard events with no data changes
- decimalHandlingMode DecimalHandlingMode(default DOUBLE) - Representation mode for decimal values
- maxQueueSize int(default 8192) - Maximum number of events in the internal queue
- maxBatchSize int(default 2048) - Maximum number of events per processing batch
- queryTimeout decimal(default 60) - Database query timeout in seconds
- heartbeatConfig? HeartbeatConfiguration - Heartbeat configuration for connection liveness
- signalConfig? SignalConfiguration - Signal channel configuration for ad-hoc control
- transactionMetadataConfig? TransactionMetadataConfiguration - Transaction boundary event configuration
- columnTransformConfig? ColumnTransformConfiguration - Column masking and transformation configuration
- topicConfig? TopicConfiguration - Topic naming and routing configuration
- connectionRetryConfig? ConnectionRetryConfiguration - Error handling and retry configuration
- performanceConfig? PerformanceConfiguration - Performance tuning configuration
- extendedSnapshot? ExtendedSnapshotConfiguration - Extended snapshot configuration for fine-tuning snapshot behavior
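The fields above can be combined into a single `cdc:Options` value. A minimal sketch follows; the field names come from the record definition above, while the enum member references (`cdc:INITIAL`, `cdc:WARN`, `cdc:TRUNCATE`) assume the defaults listed above are exported module constants, and the values themselves are illustrative:

```ballerina
import ballerinax/cdc;

// Tune snapshot, failure-handling, and queue behavior for a connector.
cdc:Options options = {
    snapshotMode: cdc:INITIAL,
    eventProcessingFailureHandlingMode: cdc:WARN,
    skippedOperations: [cdc:TRUNCATE],
    skipMessagesWithoutChange: true,
    maxQueueSize: 16384,
    maxBatchSize: 4096,
    queryTimeout: 30
};
```

How this record is passed to a database-specific listener (for example under an `options` field of the listener configuration) depends on that listener's API.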
cdc: PerformanceConfiguration
Performance tuning configuration.
Fields
- maxQueueSizeInBytes? int - Maximum queue size in bytes for memory-based backpressure. If not set, remains unlimited. When both maxQueueSizeInBytes and maxQueueSize (from Options) are set, the lower of the two limits will apply.
- pollInterval decimal(default 0.5) - Interval in seconds between database polls for new events
cdc: RedisInternalSchemaStorage
Redis-based schema history storage configuration.
Fields
- topicPrefix string - Prefix for topic names used in Kafka-based schema history
- className string(default "io.debezium.storage.redis.history.RedisSchemaHistory") - Fully-qualified class name of the Redis schema history implementation
- key string(default "metadata:debezium:schema_history") - Redis key for storing the schema history
- address string - Redis server address (host:port)
- username? string - Redis username for authentication
- password? string - Redis password for authentication
- dbIndex int(default 0) - Redis database index
- secureSocket? RedisSecureSocket - SSL/TLS configuration; if present, SSL is enabled for the connection
- connectTimeout decimal(default 2.0) - Connection timeout in seconds
- socketTimeout decimal(default 2.0) - Socket read/write timeout in seconds
- retryConfig RedisRetryConfiguration(default {}) - Retry configuration for Redis connection attempts
- waitConfig? RedisWaitConfiguration - Wait configuration for Redis replication acknowledgement; if present, replication wait is enabled
- clusterEnabled boolean(default false) - Whether Redis cluster mode is enabled
cdc: RedisOffsetStorage
Redis-based offset storage configuration.
Fields
- flushInterval decimal - Interval in seconds between offset flushes
- flushTimeout decimal - Timeout in seconds for an offset flush operation
- className string(default "io.debezium.storage.redis.offset.RedisOffsetBackingStore") - Fully-qualified class name of the Redis offset storage implementation
- key string(default "metadata:debezium:offsets") - Redis key for storing offsets
- address string - Redis server address (host:port)
- username? string - Redis username for authentication
- password? string - Redis password for authentication
- dbIndex int(default 0) - Redis database index
- secureSocket? RedisSecureSocket - SSL/TLS configuration; if present, SSL is enabled for the connection
- connectTimeout decimal(default 2.0) - Connection timeout in seconds
- socketTimeout decimal(default 2.0) - Socket read/write timeout in seconds
- retryConfig RedisRetryConfiguration(default {}) - Retry configuration for Redis connection attempts
- waitConfig? RedisWaitConfiguration - Wait configuration for Redis replication acknowledgement; if present, replication wait is enabled
- clusterEnabled boolean(default false) - Whether Redis cluster mode is enabled
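Since the in-memory defaults lose state on restart, a durable deployment would typically point both offsets and schema history at Redis. A sketch with illustrative addresses and credentials:

```ballerina
import ballerinax/cdc;

// Redis-backed offset storage so the connector resumes from its last
// recorded position after a restart.
cdc:RedisOffsetStorage offsetStorage = {
    address: "localhost:6379",
    password: "redis-password",
    retryConfig: {initialDelay: 0.5, maxDelay: 15.0, maxAttempts: 5}
};

// Redis-backed schema history for the same Redis instance.
cdc:RedisInternalSchemaStorage schemaStorage = {
    address: "localhost:6379",
    password: "redis-password"
};
```

The `key` fields keep their defaults (`metadata:debezium:offsets` and `metadata:debezium:schema_history`); how these records are wired into a listener's configuration is specific to the database listener in use.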
cdc: RedisRetryConfiguration
Retry configuration for Redis connections.
Fields
- initialDelay decimal(default 0.3) - Initial delay in seconds before the first retry
- maxDelay decimal(default 10.0) - Maximum delay in seconds between retries
- maxAttempts int(default 10) - Maximum number of retry attempts
cdc: RedisSecureSocket
SSL/TLS configuration for secure Redis connections. The presence of this configuration indicates that SSL is enabled for the Redis connection.
Fields
- cert? TrustStore|string - Truststore configuration or certificate file path for server verification
- key? KeyStore - Keystore configuration for client certificate authentication (mTLS)
- verifyHostName boolean(default false) - Whether to verify the Redis server's hostname against the certificate
cdc: RedisWaitConfiguration
Wait configuration for Redis replication acknowledgement. The presence of this configuration enables the wait; its absence disables it.
Fields
- timeout decimal(default 1.0) - Timeout in seconds for replication wait
- retryDelay? decimal - Delay in seconds between replication wait retries. If not set, the wait will not be retried and will fail immediately on timeout.
cdc: RelationalExtendedSnapshotConfiguration
Extended snapshot configuration for relational databases.
Fields
- Fields Included from *ExtendedSnapshotConfiguration
- lockingMode? SnapshotLockingMode - Table locking strategy during snapshot
- queryMode? SnapshotQueryMode - Query strategy for snapshot execution
cdc: RelationalHeartbeatConfiguration
Heartbeat configuration for relational database connectors. Extends the base heartbeat configuration with an optional SQL action query.
Fields
- Fields Included from *HeartbeatConfiguration
- interval decimal
- anydata...
- actionQuery? string - SQL query executed by Debezium with each heartbeat to keep the replication slot or binlog position active. Applicable only to relational databases (MySQL, PostgreSQL, MSSQL).
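A heartbeat with an action query keeps the source's replication position advancing even when the monitored tables are quiet. A minimal sketch, where the heartbeat table and its schema are illustrative:

```ballerina
import ballerinax/cdc;

// Emit a heartbeat every 10 seconds and run a small UPDATE so the
// binlog position or replication slot keeps moving on idle databases.
cdc:RelationalHeartbeatConfiguration heartbeat = {
    interval: 10,
    actionQuery: "UPDATE debezium_heartbeat SET ts = NOW()"
};
```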
cdc: RocketMQInternalSchemaStorage
RocketMQ-based schema history storage configuration.
Using this storage backend requires importing the ballerinax/cdc.storage.rocketmq.driver module.
Fields
- topicPrefix string - Prefix for topic names used in Kafka-based schema history
- className string(default "io.debezium.storage.rocketmq.history.RocketMqSchemaHistory") - Fully-qualified class name of the RocketMQ schema history implementation
- topicName string - RocketMQ topic for schema history
- nameServerAddress string - RocketMQ name server address
- aclEnabled boolean(default false) - Whether ACL authentication is enabled
- accessKey? string - Access key for ACL authentication
- secretKey? string - Secret key for ACL authentication
- recoveryAttempts? int - Maximum recovery attempts when reading schema history
- recoveryPollInterval decimal(default 0.1) - Interval in seconds between recovery poll attempts
- storeRecordTimeout? decimal - Timeout in seconds for storing schema history records
cdc: SecureDatabaseConnection
SSL/TLS configuration for database connections.
Fields
- sslMode SslMode(default PREFERRED) - SSL mode controlling the connection security level
- keyStore? KeyStore - Client keystore for mutual TLS authentication
- trustStore? TrustStore - Truststore for verifying the server certificate
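A sketch of a secured database connection follows. The `KeyStore` and `TrustStore` shapes (`path`/`password`) are assumed here to match the usual Ballerina crypto records, and `cdc:REQUIRED` assumes the `SslMode` enum follows Debezium's mode names; both are assumptions, as those definitions are not shown above:

```ballerina
import ballerinax/cdc;

// Require TLS for the database connection and verify the server
// certificate against a truststore; the keystore enables mutual TLS.
cdc:SecureDatabaseConnection secureConnection = {
    sslMode: cdc:REQUIRED,
    trustStore: {path: "/path/to/truststore.p12", password: "changeit"},
    keyStore: {path: "/path/to/keystore.p12", password: "changeit"}
};
```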
cdc: SignalConfiguration
All supported signal channel configurations for ad-hoc control of the CDC connector.
Fields
- 'source? SourceSignalConfiguration - Configuration for a signal channel implemented using a database signal table
- kafka? KafkaSignalConfiguration - Configuration for a signal channel implemented using Kafka topics
- file? FileSignalConfiguration - Configuration for a signal channel implemented using file monitoring
- jmx? JmxSignalConfiguration - Configuration for a signal channel implemented using JMX
cdc: SourceSignalConfiguration
Source (database table) signal channel configuration.
Fields
- dataCollectionTable string - Fully-qualified table name (e.g., "mydb.debezium_signal")
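For example, ad-hoc signalling through a database table (the channel Debezium watches for signal rows, e.g. to trigger incremental snapshots) can be enabled like this; the table name is illustrative:

```ballerina
import ballerinax/cdc;

// Watch the inventory.debezium_signal table for ad-hoc connector signals.
cdc:SignalConfiguration signalConfig = {
    'source: {
        dataCollectionTable: "inventory.debezium_signal"
    }
};
```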
cdc: TopicConfiguration
Topic naming configuration for change event Kafka topics.
Fields
- delimiter string(default ".") - Delimiter between topic name components
cdc: TransactionMetadataConfiguration
Transaction boundary event configuration. The presence of this configuration enables the emission of transaction metadata events at the start and end of transactions.
Fields
- topicName string(default "transaction") - Topic name suffix for transaction metadata events (full topic: <prefix>.<topicName>). In embedded mode, this is essentially a logical identifier.
Errors
cdc: Error
Defines the common error type for the CDC module.
cdc: EventProcessingError
Represents an error that occurred during event processing.
cdc: OperationNotPermittedError
Represents an error that occurred due to an operation not being permitted.
cdc: PayloadBindingError
Represents an error that occurred due to payload binding issues.
Object types
cdc: Listener
Represents a Ballerina CDC listener.
attach
Attaches a CDC service to the listener.
Parameters
- s Service - The CDC service to attach
Return Type
- Error? - An error if the service cannot be attached, or () if successful
'start
function 'start() returns Error?
Starts the CDC listener.
Return Type
- Error? - An error if the listener cannot be started, or () if successful
detach
Detaches a CDC service from the listener.
Parameters
- s Service - The CDC service to detach
Return Type
- Error? - An error if the service cannot be detached, or () if successful
gracefulStop
function gracefulStop() returns Error?
Stops the listener gracefully.
Return Type
- Error? - An error if the listener cannot be stopped, or () if successful
immediateStop
function immediateStop() returns Error?
Stops the listener immediately.
Return Type
- Error? - An error if the listener cannot be stopped, or () if successful
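The lifecycle functions above can also be driven programmatically instead of through a `listener` declaration. A sketch, assuming the `mysql:CdcListener` from the Quickstart implements this `cdc:Listener` contract and using the same illustrative connection details:

```ballerina
import ballerina/log;
import ballerinax/cdc;
import ballerinax/mysql;
import ballerinax/mysql.cdc.driver as _;

public function main() returns error? {
    mysql:CdcListener cdcListener = new ({
        database: {
            hostname: "localhost",
            port: 3306,
            username: "username",
            password: "password",
            includedDatabases: ["inventory"]
        }
    });

    // A minimal service handling only create events.
    cdc:Service changeService = service object {
        remote function onCreate(record {} after) returns cdc:Error? {
            log:printInfo("Record created: " + after.toString());
        }
    };

    // attach -> 'start -> gracefulStop mirrors the object functions above.
    check cdcListener.attach(changeService);
    check cdcListener.'start();
    // ... run until a shutdown condition is met ...
    check cdcListener.gracefulStop();
}
```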
Union types
cdc: InternalSchemaStorage
InternalSchemaStorage
Union type representing all supported internal schema history storage configurations.
cdc: OffsetStorage
OffsetStorage
Union type representing all supported offset storage configurations.
Import
import ballerinax/cdc;
Metadata
Version: 1.3.0
License: Apache-2.0
Compatibility
Platform: java21
Ballerina version: 2201.12.0
GraalVM compatible: Yes
Keywords
Type/Connector
Type/Library
Area/Database
Vendor/Red Hat
Debezium