ballerinax/kafka Ballerina library
Overview
This package provides an implementation to interact with Kafka Brokers via Kafka Consumer and Kafka Producer clients.
Apache Kafka is an open-source distributed event streaming platform used for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
This package supports Kafka 1.x.x, 2.x.x and 3.x.x versions.
Consumer and producer
Kafka producer
A Kafka producer is a Kafka client that publishes records to the Kafka cluster. The producer is thread-safe and sharing a single producer instance across threads will generally be faster than having multiple instances. When working with a Kafka producer, the first thing to do is to initialize the producer. For the producer to execute successfully, an active Kafka broker should be available.
The code snippet given below initializes a producer with the basic configuration.
import ballerinax/kafka;

kafka:ProducerConfiguration producerConfiguration = {
    clientId: "basic-producer",
    acks: "all",
    retryCount: 3
};

kafka:Producer kafkaProducer = check new (kafka:DEFAULT_URL, producerConfiguration);
Kafka consumer
A Kafka consumer is a subscriber responsible for reading records from one or more topics and one or more partitions of a topic. When working with a Kafka consumer, the first thing to do is initialize the consumer. For the consumer to execute successfully, an active Kafka broker should be available.
The code snippet given below initializes a consumer with the basic configuration.
kafka:ConsumerConfiguration consumerConfiguration = {
    groupId: "group-id", // Unique string that identifies the consumer
    offsetReset: "earliest", // Offset reset strategy if no initial offset
    topics: ["kafka-topic"]
};

kafka:Consumer kafkaConsumer = check new (kafka:DEFAULT_URL, consumerConfiguration);
Listener
The Kafka consumer can be used as a listener to a set of topics without the need to manually poll the messages.
You can use the Caller to manually commit the offsets of the messages that are read by the service. The following code snippet shows how to initialize and define the listener and how to commit the offsets manually.
import ballerina/log;

kafka:ConsumerConfiguration consumerConfiguration = {
    groupId: "group-id",
    topics: ["kafka-topic-1"],
    pollingInterval: 1,
    autoCommit: false
};

listener kafka:Listener kafkaListener = new (kafka:DEFAULT_URL, consumerConfiguration);

service on kafkaListener {
    remote function onConsumerRecord(kafka:Caller caller, kafka:BytesConsumerRecord[] records) {
        // processes the records
        ...
        // commits the offsets manually
        kafka:Error? commitResult = caller->'commit();
        if commitResult is kafka:Error {
            log:printError("Error occurred while committing the offsets for the consumer ", 'error = commitResult);
        }
    }
}
Data serialization
Serialization is the process of converting data into a stream of bytes that is used for transmission. Kafka stores and transmits these arrays of bytes in its queue. Deserialization is the reverse process, in which arrays of bytes are converted back into the desired data type.
Currently, this package only supports the byte array data type for both the keys and values. The following code snippets show how to produce and read a message from Kafka.
string message = "Hello World, Ballerina";
string key = "my-key";
// converts the message and key to a byte array
check kafkaProducer->send({ topic: "test-kafka-topic", key: key.toBytes(), value: message.toBytes() });
// requires `import ballerina/io;` at the top of the file
kafka:BytesConsumerRecord[] records = check kafkaConsumer->poll(1);

foreach var kafkaRecord in records {
    byte[] messageContent = kafkaRecord.value;
    // tries to generate the string value from the byte array
    string result = check string:fromBytes(messageContent);
    io:println("The result is : ", result);
}
Concurrency
In Kafka, records are grouped into smaller units called partitions. Partitions can be processed independently without compromising the correctness of the results, which lays the foundation for parallel processing. This can be achieved by using multiple consumers within the same group, each running in a single thread and reading and processing data from a subset of the topic partitions.
Topic partitions are assigned to consumers automatically or you can manually assign topic partitions.
The following code snippet joins a consumer to the consumer-group and assigns it to a topic partition manually.
kafka:ConsumerConfiguration consumerConfiguration = {
    // `groupId` determines the consumer group
    groupId: "consumer-group",
    pollingInterval: 1,
    autoCommit: false
};

kafka:Consumer kafkaConsumer = check new (kafka:DEFAULT_URL, consumerConfiguration);
// creates a topic partition
kafka:TopicPartition topicPartition = {
    topic: "kafka-topic-1",
    partition: 1
};
// passes the topic partitions to the assign function as an array
check kafkaConsumer->assign([topicPartition]);
Report issues
To report bugs, request new features, start new discussions, view project boards, etc., go to the Ballerina standard library parent repository.
Useful links
- Chat live with us via our Discord server.
- Post all technical questions on Stack Overflow with the #ballerina tag.
Clients
kafka: Caller
Represents a Kafka caller, which can be used to commit the offsets consumed by the service.
'commit
function 'commit() returns Error?
Commits the currently consumed offsets of the service.
kafka:Error? result = caller->'commit();
Return Type
- Error? - A kafka:Error if an error is encountered or else ()
commitOffset
function commitOffset(PartitionOffset[] offsets, decimal duration) returns Error?
Commits the given offsets and partitions for the given topics of the service.
kafka:Error? result = caller->commitOffset([partitionOffset1, partitionOffset2]);
Parameters
- offsets PartitionOffset[] - Offsets to be committed
- duration decimal (default -1) - Timeout duration (in seconds) for the commit operation execution
Return Type
- Error? - A kafka:Error if an error is encountered or else ()
seek
function seek(PartitionOffset offset) returns Error?
Seeks for a given offset in a topic partition.
kafka:Error? result = caller->seek(partitionOffset);
Parameters
- offset PartitionOffset - The PartitionOffset to seek
Return Type
- Error? - A kafka:Error if an error is encountered or else ()
kafka: Consumer
Represents a Kafka consumer endpoint.
Constructor
Creates a new kafka:Consumer.
init (string|string[] bootstrapServers, *ConsumerConfiguration config)
- config *ConsumerConfiguration - Configurations related to the consumer endpoint
assign
function assign(TopicPartition[] partitions) returns Error?
Assigns consumer to a set of topic partitions.
kafka:Error? result = consumer->assign([topicPartition1, topicPartition2]);
Parameters
- partitions TopicPartition[] - Topic partitions to be assigned
Return Type
- Error? - A kafka:Error if an error is encountered or else ()
close
function close(decimal duration) returns Error?
Closes the consumer connection with the external Kafka broker.
kafka:Error? result = consumer->close();
Parameters
- duration decimal (default -1) - Timeout duration (in seconds) for the close operation execution
Return Type
- Error? - A kafka:Error if an error is encountered or else ()
'commit
function 'commit() returns Error?
Commits the currently consumed offsets of the consumer.
kafka:Error? result = consumer->'commit();
Return Type
- Error? - A kafka:Error if an error is encountered or else ()
commitOffset
function commitOffset(PartitionOffset[] offsets, decimal duration) returns Error?
Commits the given offsets of the specific topic partitions for the consumer.
kafka:Error? result = consumer->commitOffset([partitionOffset1, partitionOffset2]);
Parameters
- offsets PartitionOffset[] - Offsets to be committed
- duration decimal (default -1) - Timeout duration (in seconds) for the commit operation execution
Return Type
- Error? - A kafka:Error if an error is encountered or else ()
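The following is a minimal sketch of committing an explicit offset and reading it back, assuming a broker is reachable at kafka:DEFAULT_URL. The group ID, topic name, partition index, and the offset value 10 are hypothetical values chosen for illustration only.

```ballerina
import ballerina/io;
import ballerinax/kafka;

public function main() returns error? {
    // Hypothetical group; auto-commit is disabled so that offsets are committed explicitly.
    kafka:ConsumerConfiguration consumerConfiguration = {
        groupId: "group-id",
        autoCommit: false
    };
    kafka:Consumer consumer = check new (kafka:DEFAULT_URL, consumerConfiguration);

    // Hypothetical topic partition, assigned manually.
    kafka:TopicPartition topicPartition = {topic: "kafka-topic", partition: 0};
    check consumer->assign([topicPartition]);

    // A `kafka:PartitionOffset` pairs a topic partition with an offset (arbitrary value here).
    kafka:PartitionOffset partitionOffset = {partition: topicPartition, offset: 10};

    // Commits the offset, allowing up to 5 seconds for the operation.
    check consumer->commitOffset([partitionOffset], 5);

    // Reads the committed offset back; `()` means that nothing has been committed yet.
    kafka:PartitionOffset? committed = check consumer->getCommittedOffset(topicPartition);
    io:println("Committed offset: ", committed);

    check consumer->close();
}
```

The duration argument is optional. When it is omitted, the default value of -1 listed in the parameters above applies.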
getAssignment
function getAssignment() returns TopicPartition[]|Error
Retrieves the currently-assigned partitions of the consumer.
kafka:TopicPartition[] result = check consumer->getAssignment();
Return Type
- TopicPartition[]|Error - Array of assigned partitions for the consumer if executed successfully or else a kafka:Error
getAvailableTopics
function getAvailableTopics(decimal duration) returns string[]|Error
Retrieves the available list of topics for a particular consumer.
string[] result = check consumer->getAvailableTopics();
Parameters
- duration decimal (default -1) - Timeout duration (in seconds) for the execution of the getAvailableTopics operation
Return Type
- string[]|Error - Array of available topics if executed successfully or else a kafka:Error
getBeginningOffsets
function getBeginningOffsets(TopicPartition[] partitions, decimal duration) returns PartitionOffset[]|Error
Retrieves the start offsets for a given set of partitions.
kafka:PartitionOffset[] result = check consumer->getBeginningOffsets([topicPartition1, topicPartition2]);
Parameters
- partitions TopicPartition[] - Array of topic partitions to get the starting offsets
- duration decimal (default -1) - Timeout duration (in seconds) for the getBeginningOffsets execution
Return Type
- PartitionOffset[]|Error - Starting offsets for the given partitions if executed successfully or else a kafka:Error
getCommittedOffset
function getCommittedOffset(TopicPartition partition, decimal duration) returns PartitionOffset|Error?
Retrieves the last committed offset for the given topic partition.
kafka:PartitionOffset? result = check consumer->getCommittedOffset(topicPartition);
Parameters
- partition TopicPartition - The TopicPartition in which the committed offset is returned to the consumer
- duration decimal (default -1) - Timeout duration (in seconds) for the getCommittedOffset operation to execute
Return Type
- PartitionOffset|Error? - The last committed offset for a given partition for the consumer if there is a committed offset present, () if there are no committed offsets, or else a kafka:Error
getEndOffsets
function getEndOffsets(TopicPartition[] partitions, decimal duration) returns PartitionOffset[]|Error
Retrieves the last offsets for a given set of partitions.
kafka:PartitionOffset[] result = check consumer->getEndOffsets([topicPartition1, topicPartition2]);
Parameters
- partitions TopicPartition[] - Set of partitions to get the last offsets
- duration decimal (default -1) - Timeout duration (in seconds) for the getEndOffsets operation to execute
Return Type
- PartitionOffset[]|Error - End offsets for the given partitions if executed successfully or else a kafka:Error
getPausedPartitions
function getPausedPartitions() returns TopicPartition[]|Error
Retrieves the partitions that are currently paused.
kafka:TopicPartition[] result = check consumer->getPausedPartitions();
Return Type
- TopicPartition[]|Error - The set of partitions paused from message retrieval if executed successfully or else a kafka:Error
getPositionOffset
function getPositionOffset(TopicPartition partition, decimal duration) returns int|Error
Retrieves the offset of the next record that will be fetched if a record exists in that position.
int result = check consumer->getPositionOffset(topicPartition);
Parameters
- partition TopicPartition - The TopicPartition in which the position is required
- duration decimal (default -1) - Timeout duration (in seconds) for the get position offset operation to execute
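Return Type
- int|Error - Offset of the next record to be fetched if executed successfully or else a kafka:Error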
getSubscription
function getSubscription() returns string[]|Error
Retrieves the set of topics to which the consumer is currently subscribed.
string[] result = check consumer->getSubscription();
Return Type
- string[]|Error - Array of subscribed topics if executed successfully or else a kafka:Error
getTopicPartitions
function getTopicPartitions(string topic, decimal duration) returns TopicPartition[]|Error
Retrieves the set of partitions that belong to the given topic.
kafka:TopicPartition[] result = check consumer->getTopicPartitions("kafka-topic");
Parameters
- topic string - The topic for which the partition information is needed
- duration decimal (default -1) - Timeout duration (in seconds) for the getTopicPartitions operation to execute
Return Type
- TopicPartition[]|Error - Array of partitions for the given topic if executed successfully or else a kafka:Error
pause
function pause(TopicPartition[] partitions) returns Error?
Pauses retrieving messages from a set of partitions.
kafka:Error? result = consumer->pause([topicPartition1, topicPartition2]);
Parameters
- partitions TopicPartition[] - Set of topic partitions to pause the retrieval of messages
Return Type
- Error? - A kafka:Error if an error is encountered or else ()
poll
function poll(decimal timeout, typedesc<AnydataConsumerRecord[]> T) returns T|Error
Polls the external broker to retrieve messages.
kafka:AnydataConsumerRecord[] result = check consumer->poll(10);
Parameters
- timeout decimal - Polling time in seconds
- T typedesc<AnydataConsumerRecord[]> (default <>) - Optional type description of the required data type
Return Type
- T|Error - Array of consumer records if executed successfully or else a kafka:Error
pollPayload
function pollPayload(decimal timeout, typedesc<anydata[]> T) returns T|Error
Polls the external broker to retrieve messages in the required data type without the kafka:AnydataConsumerRecord information.
Person[] persons = check consumer->pollPayload(10);
Parameters
- timeout decimal - Polling time in seconds
- T typedesc<anydata[]> (default <>) - Optional type description of the required data type
Return Type
- T|Error - Array of data in the required format if executed successfully or else a kafka:Error
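As an illustration, the sketch below polls payloads directly into a user-defined record type and distinguishes a payload binding failure from other errors. The Person type, group ID, and topic name are hypothetical, and the sketch assumes that the values on the topic were produced in a form that can be bound to Person (for example, JSON-encoded records).

```ballerina
import ballerina/io;
import ballerinax/kafka;

// Hypothetical payload type used only for this sketch.
type Person record {|
    string name;
    int age;
|};

public function main() returns error? {
    kafka:ConsumerConfiguration consumerConfiguration = {
        groupId: "person-group",
        topics: ["person-topic"]
    };
    kafka:Consumer consumer = check new (kafka:DEFAULT_URL, consumerConfiguration);

    // The expected type (`Person[]`) tells `pollPayload` what to bind the values to.
    Person[]|kafka:Error result = consumer->pollPayload(10);

    if result is kafka:PayloadBindingError {
        // The values could not be converted to `Person`.
        io:println("Payload binding failed: ", result.message());
    } else if result is kafka:Error {
        return result;
    } else {
        foreach Person person in result {
            io:println(person.name, " is ", person.age, " years old");
        }
    }

    check consumer->close();
}
```

Use poll instead when the key, timestamp, or offset of each record is also needed.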
resume
function resume(TopicPartition[] partitions) returns Error?
Resumes retrieving messages from a set of partitions, which were paused earlier.
kafka:Error? result = consumer->resume([topicPartition1, topicPartition2]);
Parameters
- partitions TopicPartition[] - Topic partitions to resume the retrieval of messages
Return Type
- Error? - A kafka:Error if an error is encountered or else ()
seek
function seek(PartitionOffset offset) returns Error?
Seeks for a given offset in a topic partition.
kafka:Error? result = consumer->seek(partitionOffset);
Parameters
- offset PartitionOffset - The PartitionOffset to seek
Return Type
- Error? - A kafka:Error if an error is encountered or else ()
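A minimal sketch of repositioning a consumer with seek before polling is shown below. It assumes a broker at kafka:DEFAULT_URL; the group ID, topic name, partition index, and the offset 5 are hypothetical.

```ballerina
import ballerina/io;
import ballerinax/kafka;

public function main() returns error? {
    kafka:ConsumerConfiguration consumerConfiguration = {
        groupId: "replay-group",
        autoCommit: false
    };
    kafka:Consumer consumer = check new (kafka:DEFAULT_URL, consumerConfiguration);

    // The partition is assigned manually before seeking within it.
    kafka:TopicPartition topicPartition = {topic: "kafka-topic", partition: 0};
    check consumer->assign([topicPartition]);

    // Moves the read position of the partition to offset 5 (arbitrary value).
    check consumer->seek({partition: topicPartition, offset: 5});

    // Reports the offset of the next record that will be fetched.
    int position = check consumer->getPositionOffset(topicPartition);
    io:println("Next offset to fetch: ", position);

    // Subsequent polls start reading from the new position.
    kafka:BytesConsumerRecord[] records = check consumer->poll(1);
    io:println("Records read: ", records.length());

    check consumer->close();
}
```

seekToBeginning and seekToEnd take an array of topic partitions instead of a single PartitionOffset and can be substituted in the same place.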
seekToBeginning
function seekToBeginning(TopicPartition[] partitions) returns Error?
Seeks to the beginning of the offsets for a given set of topic partitions.
kafka:Error? result = consumer->seekToBeginning([topicPartition1, topicPartition2]);
Parameters
- partitions TopicPartition[] - The set of topic partitions to seek
Return Type
- Error? - A kafka:Error if an error is encountered or else ()
seekToEnd
function seekToEnd(TopicPartition[] partitions) returns Error?
Seeks to the end of the offsets for a given set of topic partitions.
kafka:Error? result = consumer->seekToEnd([topicPartition1, topicPartition2]);
Parameters
- partitions TopicPartition[] - The set of topic partitions to seek
Return Type
- Error? - A kafka:Error if an error is encountered or else ()
subscribe
function subscribe(string[] topics) returns Error?
Subscribes the consumer to the provided set of topics.
kafka:Error? result = consumer->subscribe(["kafka-topic-1", "kafka-topic-2"]);
Return Type
- Error? - A kafka:Error if an error is encountered or else ()
subscribeWithPattern
function subscribeWithPattern(string regex) returns Error?
Subscribes the consumer to the topics that match the provided pattern.
kafka:Error? result = consumer->subscribeWithPattern("kafka.*");
Parameters
- regex string - The pattern that topic names must match to be subscribed
Return Type
- Error? - A kafka:Error if an error is encountered or else ()
unsubscribe
function unsubscribe() returns Error?
Unsubscribes from all the topics that the consumer is subscribed to.
kafka:Error? result = consumer->unsubscribe();
Return Type
- Error? - A kafka:Error if an error is encountered or else ()
kafka: Producer
Represents a Kafka producer endpoint.
Constructor
Creates a new kafka:Producer.
init (string|string[] bootstrapServers, *ProducerConfiguration config)
- config *ProducerConfiguration - Configurations related to initializing a kafka:Producer
close
function close() returns Error?
Closes the producer connection to the external Kafka broker.
kafka:Error? result = producer->close();
Return Type
- Error? - A kafka:Error if closing the producer failed or else ()
'flush
function 'flush() returns Error?
Flushes the batch of records already sent to the broker by the producer.
kafka:Error? result = producer->'flush();
Return Type
- Error? - A kafka:Error if records couldn't be flushed or else ()
getTopicPartitions
function getTopicPartitions(string topic) returns TopicPartition[]|Error
Retrieves the topic partition information for the provided topic.
kafka:TopicPartition[] result = check producer->getTopicPartitions("kafka-topic");
Parameters
- topic string - The specific topic, of which the topic partition information is required
Return Type
- TopicPartition[]|Error - A kafka:TopicPartition array for the given topic or else a kafka:Error if the operation fails
send
function send(AnydataProducerRecord producerRecord) returns Error?
Produces records to the Kafka server.
kafka:Error? result = producer->send({value: "Hello World".toBytes(), topic: "kafka-topic"});
Parameters
- producerRecord AnydataProducerRecord - Record to be produced
Return Type
- Error? - A kafka:Error if send action fails to send data or else ()
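The sketch below puts the producer functions together: it sends one keyed record to a specific partition, flushes, and closes the producer. It assumes a broker at kafka:DEFAULT_URL; the client ID, topic name, key, and partition index are hypothetical.

```ballerina
import ballerinax/kafka;

public function main() returns error? {
    kafka:ProducerConfiguration producerConfiguration = {
        clientId: "keyed-producer",
        acks: kafka:ACKS_ALL,
        retryCount: 3
    };
    kafka:Producer producer = check new (kafka:DEFAULT_URL, producerConfiguration);

    // A byte-array record with an explicit key and target partition.
    kafka:BytesProducerRecord producerRecord = {
        topic: "kafka-topic",
        key: "my-key".toBytes(),
        value: "Hello World".toBytes(),
        partition: 0
    };
    check producer->send(producerRecord);

    // Flushes the records sent so far before closing the connection.
    check producer->'flush();
    check producer->close();
}
```

The key and partition fields are optional and can be left out of the record.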
Service types
kafka: Service
The Kafka service type.
Constants
kafka: ACKS_ALL
Producer acknowledgement type is 'all'. This will guarantee that the record will not be lost as long as at least one in-sync replica is alive.
kafka: ACKS_NONE
Producer acknowledgement type '0'. If the acknowledgement type is set to this, the producer will not wait for any acknowledgement from the server.
kafka: ACKS_SINGLE
Producer acknowledgement type '1'. If the acknowledgement type is set to this, the leader will write the record to its local log and will respond without waiting for full acknowledgement from all the followers.
kafka: AUTH_SASL_PLAIN
Kafka SASL_PLAIN authentication mechanism
kafka: AUTH_SASL_SCRAM_SHA_256
Kafka SASL_SCRAM authentication mechanism (SHA-256)
kafka: AUTH_SASL_SCRAM_SHA_512
Kafka SASL_SCRAM authentication mechanism (SHA-512)
kafka: COMPRESSION_GZIP
Kafka GZIP compression type.
kafka: COMPRESSION_LZ4
Kafka LZ4 compression type.
kafka: COMPRESSION_NONE
No compression.
kafka: COMPRESSION_SNAPPY
Kafka Snappy compression type.
kafka: COMPRESSION_ZSTD
Kafka ZSTD compression type.
kafka: DEFAULT_URL
The default server URL.
kafka: DES_AVRO
Apache Avro deserializer.
kafka: DES_BYTE_ARRAY
In-built Kafka byte array deserializer.
kafka: DES_CUSTOM
User-defined deserializer.
kafka: DES_FLOAT
In-built Kafka float deserializer.
kafka: DES_INT
In-built Kafka int deserializer.
kafka: DES_STRING
In-built Kafka string deserializer.
kafka: ISOLATION_COMMITTED
Configures the consumer to read the committed messages only in the transactional mode when poll() is called.
kafka: ISOLATION_UNCOMMITTED
Configures the consumer to read all the messages including the aborted ones.
kafka: OFFSET_RESET_EARLIEST
Automatically reset the consumer offset to the earliest offset
kafka: OFFSET_RESET_LATEST
Automatically reset the consumer offset to the latest offset
kafka: OFFSET_RESET_NONE
If the offsetReset is set to OFFSET_RESET_NONE, the consumer will give an error if no previous offset is found for the consumer group
kafka: PROTOCOL_PLAINTEXT
Represents Kafka un-authenticated, non-encrypted channel
kafka: PROTOCOL_SASL_PLAINTEXT
Represents Kafka authenticated, non-encrypted channel
kafka: PROTOCOL_SASL_SSL
Represents Kafka SASL authenticated, SSL channel
kafka: PROTOCOL_SSL
Represents Kafka SSL channel
kafka: SER_AVRO
Apache Avro serializer.
kafka: SER_BYTE_ARRAY
In-built Kafka Byte Array serializer.
kafka: SER_CUSTOM
User-defined serializer.
kafka: SER_FLOAT
In-built Kafka float serializer.
kafka: SER_INT
In-built Kafka int serializer.
kafka: SER_STRING
In-built Kafka string serializer.
Enums
kafka: Protocol
Represents protocol options.
Members
Listeners
kafka: Listener
Represents a Kafka consumer endpoint.
Constructor
Creates a new kafka:Listener.
init (string|string[] bootstrapServers, *ConsumerConfiguration config)
- config *ConsumerConfiguration - Configurations related to the consumer endpoint
attach
Attaches a service to the listener.
error? result = kafkaListener.attach(kafkaService);
Parameters
- 'service Service - The service to be attached
Return Type
- error? - A kafka:Error if an error is encountered while attaching the service or else ()
'start
function 'start() returns error?
Starts the registered services.
error? result = kafkaListener.'start();
Return Type
- error? - A kafka:Error if an error is encountered while starting the server or else ()
gracefulStop
function gracefulStop() returns error?
Stops the Kafka listener gracefully.
error? result = kafkaListener.gracefulStop();
Return Type
- error? - A kafka:Error if an error is encountered during the listener-stopping process or else ()
immediateStop
function immediateStop() returns error?
Stops the Kafka listener immediately.
error? result = kafkaListener.immediateStop();
Return Type
- error? - A kafka:Error if an error is encountered during the listener-stopping process or else ()
detach
Detaches a consumer service from the listener.
error? result = kafkaListener.detach(kafkaService);
Parameters
- 'service Service - The service to be detached
Return Type
- error? - A kafka:Error if an error is encountered while detaching a service or else ()
Annotations
kafka: Payload
The annotation which is used to define the payload parameter in the onConsumerRecord service method.
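As a sketch of how the annotation might be applied, the service below marks its onConsumerRecord parameter as the payload so that values are bound to a user-defined type. The Reading type, group ID, and topic name are hypothetical, and the sketch assumes that the values on the topic can be bound to Reading.

```ballerina
import ballerina/log;
import ballerinax/kafka;

// Hypothetical payload type used only for this sketch.
type Reading record {|
    string sensor;
    float value;
|};

kafka:ConsumerConfiguration consumerConfiguration = {
    groupId: "reading-group",
    topics: ["reading-topic"],
    pollingInterval: 1
};

listener kafka:Listener kafkaListener = new (kafka:DEFAULT_URL, consumerConfiguration);

service on kafkaListener {
    // `@kafka:Payload` marks `readings` as the payload parameter of the method.
    remote function onConsumerRecord(@kafka:Payload Reading[] readings) {
        foreach Reading reading in readings {
            log:printInfo("Received a reading from " + reading.sensor);
        }
    }
}
```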
Records
kafka: AnydataConsumerRecord
Type related to anydata consumer record.
Fields
- key? anydata - Key that is included in the record
- value anydata - Anydata record content
- timestamp int - Timestamp of the record, in milliseconds since epoch
- offset PartitionOffset - Topic partition position in which the consumed record is stored
kafka: AnydataProducerRecord
Details related to the anydata producer record.
Fields
- topic string - Topic to which the record will be appended
- key? anydata - Key that is included in the record
- value anydata - Anydata record content
- timestamp? int - Timestamp of the record, in milliseconds since epoch
- partition? int - Partition to which the record should be sent
kafka: AuthenticationConfiguration
Configurations related to Kafka authentication mechanisms.
Fields
- mechanism AuthenticationMechanism(default AUTH_SASL_PLAIN) - Type of the authentication mechanism. Currently, SASL_PLAIN, SASL_SCRAM_256, and SASL_SCRAM_512 are supported
- username string - The username to authenticate the Kafka producer/consumer
- password string - The password to authenticate the Kafka producer/consumer
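A configuration sketch for SASL/PLAIN authentication follows. The broker address, client ID, and credentials are hypothetical placeholders; only the field and constant names come from this module.

```ballerina
import ballerinax/kafka;

// Hypothetical credentials for illustration only.
kafka:AuthenticationConfiguration authConfig = {
    mechanism: kafka:AUTH_SASL_PLAIN,
    username: "kafka-user",
    password: "kafka-password"
};

kafka:ProducerConfiguration producerConfiguration = {
    clientId: "sasl-producer",
    auth: authConfig,
    // Authenticated, non-encrypted channel.
    securityProtocol: kafka:PROTOCOL_SASL_PLAINTEXT
};

public function main() returns error? {
    // "localhost:9093" is an assumed address of a SASL-enabled broker.
    kafka:Producer producer = check new ("localhost:9093", producerConfiguration);
    check producer->close();
}
```

The same auth and securityProtocol fields are available in kafka:ConsumerConfiguration.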
kafka: BytesConsumerRecord
Subtype related to kafka:AnydataConsumerRecord record.
Fields
- Fields Included from *AnydataConsumerRecord
- key anydata
- value anydata
- timestamp int
- offset PartitionOffset
- headers map<byte[]|byte[][]|string|string[]>
- value byte[] - Record content in bytes
- headers map<byte[]|byte[][]> - Headers as a byte[] or byte[][]
kafka: BytesProducerRecord
Subtype related to kafka:AnydataProducerRecord record.
Fields
- Fields Included from *AnydataProducerRecord
- value byte[] - Record content in bytes
- headers? map<byte[]|byte[][]> - Headers as a byte[] or byte[][]
kafka: CertKey
Represents a combination of certificate, private key, and private key password if encrypted.
Fields
- certFile string - A file containing the certificate
- keyFile string - A file containing the private key in PKCS8 format
- keyPassword? string - Password of the private key if it is encrypted
kafka: ConsumerConfiguration
Configurations related to consumer endpoint.
Fields
- groupId? string - Unique string that identifies the consumer
- offsetReset? OffsetResetMethod - Offset reset strategy if no initial offset
- partitionAssignmentStrategy? string - Strategy class for handling the partition assignment among consumers
- metricsRecordingLevel? string - Metrics recording level
- metricsReporterClasses? string - Metrics reporter classes
- clientId? string - Identifier to be used for server side logging
- interceptorClasses? string - Interceptor classes to be used before sending the records
- isolationLevel? IsolationLevel - Transactional message reading method
- schemaRegistryUrl? string - Avro schema registry URL. Use this field to specify the schema registry URL, if the Avro serializer is used
- sessionTimeout? decimal - Timeout (in seconds) used to detect consumer failures when the heartbeat threshold is reached
- heartBeatInterval? decimal - Expected time (in seconds) between the heartbeats
- metadataMaxAge? decimal - Maximum time (in seconds) to force a refresh of metadata
- autoCommitInterval? decimal - Auto committing interval (in seconds) for commit offset when auto-committing is enabled
- maxPartitionFetchBytes? int - The maximum amount of data the server returns per partition
- sendBuffer? int - Size of the TCP send buffer (SO_SNDBUF)
- receiveBuffer? int - Size of the TCP receive buffer (SO_RCVBUF)
- fetchMinBytes? int - Minimum amount of data the server should return for a fetch request
- fetchMaxBytes? int - Maximum amount of data the server should return for a fetch request
- fetchMaxWaitTime? decimal - Maximum amount of time (in seconds) the server will block before answering the fetch request
- reconnectBackoffTimeMax? decimal - Maximum amount of time in seconds to wait when reconnecting
- retryBackoff? decimal - Time (in seconds) to wait before attempting to retry a failed request
- metricsSampleWindow? decimal - Window of time (in seconds) a metrics sample is computed over
- metricsNumSamples? int - Number of samples maintained to compute metrics
- requestTimeout? decimal - Wait time (in seconds) for response of a request
- connectionMaxIdleTime? decimal - Close idle connections after the number of seconds
- maxPollRecords? int - Maximum number of records returned in a single call to poll
- maxPollInterval? int - Maximum delay between invocations of poll
- reconnectBackoffTime? decimal - Time (in seconds) to wait before attempting to reconnect
- pollingTimeout? decimal - Timeout interval for polling in seconds
- pollingInterval? decimal - Polling interval for the consumer in seconds
- concurrentConsumers? int - Number of concurrent consumers
- defaultApiTimeout? decimal - Default API timeout value (in seconds) for APIs with duration
- autoCommit boolean(default true) - Enables auto committing offsets
- checkCRCS boolean(default true) - Checks the CRC32 of the records consumed. This ensures that no on-the-wire or on-disk corruption occurred to the messages. This may add some overhead and might need to be set to false if extreme performance is required
- excludeInternalTopics boolean(default true) - Whether records from internal topics should be exposed to the consumer
- decoupleProcessing boolean(default false) - Decouples processing
- validation boolean(default true) - Configuration related to constraint validation check
- autoSeekOnValidationFailure boolean(default true) - Automatically seeks past the erroneous records in the event of a data-binding or constraint validation failure
- secureSocket? SecureSocket - Configurations related to SSL/TLS encryption
- auth? AuthenticationConfiguration - Authentication-related configurations for the kafka:Consumer
- securityProtocol SecurityProtocol(default PROTOCOL_PLAINTEXT) - Type of the security protocol to use in the broker connection
kafka: KafkaPayload
Defines the Payload remote function parameter.
kafka: PartitionOffset
Represents the topic partition position in which the consumed record is stored.
Fields
- partition TopicPartition - The kafka:TopicPartition to which the record is related
- offset int - Offset in which the record is stored in the partition
kafka: ProducerConfiguration
Represents the kafka:Producer configuration.
Fields
- acks ProducerAcks(default ACKS_SINGLE) - Number of acknowledgments
- compressionType CompressionType(default COMPRESSION_NONE) - Compression type to be used for messages
- clientId? string - Identifier to be used for server side logging
- metricsRecordingLevel? string - Metrics recording level
- metricReporterClasses? string - Metrics reporter classes
- partitionerClass? string - Partitioner class to be used to select the partition to which the message is sent
- interceptorClasses? string - Interceptor classes to be used before sending the records
- transactionalId? string - Transactional ID to be used in transactional delivery
- schemaRegistryUrl? string - Avro schema registry URL. Use this field to specify the schema registry URL if the Avro serializer is used
- bufferMemory? int - Total bytes of memory the producer can use to buffer records
- retryCount? int - Number of retries to resend a record
- batchSize? int - Maximum number of bytes to be batched together when sending the records. Records exceeding this limit will not be batched. Setting this to 0 will disable batching
- linger? decimal - Delay (in seconds) to allow other records to be batched before sending them to the Kafka server
- sendBuffer? int - Size of the TCP send buffer (SO_SNDBUF)
- receiveBuffer? int - Size of the TCP receive buffer (SO_RCVBUF)
- maxRequestSize? int - The maximum size of a request in bytes
- reconnectBackoffTime? decimal - Time (in seconds) to wait before attempting to reconnect
- reconnectBackoffMaxTime? decimal - Maximum amount of time in seconds to wait when reconnecting
- retryBackoffTime? decimal - Time (in seconds) to wait before attempting to retry a failed request
- maxBlock? decimal - Maximum block time (in seconds) during which the sending is blocked when the buffer is full
- requestTimeout? decimal - Wait time (in seconds) for the response of a request
- metadataMaxAge? decimal - Maximum time (in seconds) to force a refresh of metadata
- metricsSampleWindow? decimal - Window of time (in seconds) a metrics sample is computed over
- metricsNumSamples? int - Number of samples maintained to compute the metrics
- maxInFlightRequestsPerConnection? int - Maximum number of unacknowledged requests on a single connection
- connectionsMaxIdleTime? decimal - Close the idle connections after this number of seconds
- transactionTimeout? decimal - Timeout (in seconds) for transaction status update from the producer
- enableIdempotence boolean(default false) - Exactly one copy of each message is written to the stream when enabled
- secureSocket? SecureSocket - Configurations related to SSL/TLS encryption
- auth? AuthenticationConfiguration - Authentication-related configurations for the kafka:Producer
- securityProtocol SecurityProtocol(default PROTOCOL_PLAINTEXT) - Type of the security protocol to use in the broker connection
kafka: SecureSocket
Configurations for secure communication with the Kafka server.
Fields
- cert TrustStore|string - Configurations associated with crypto:TrustStore or single certificate file that the client trusts
- ciphers? string[] - List of ciphers to be used. By default, all the available cipher suites are supported
- provider? string - Name of the security provider used for SSL connections. The default value is the default security provider of the JVM
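A configuration sketch for an SSL/TLS connection follows, using a single certificate file as the trusted certificate. The certificate path, group ID, and topic name are hypothetical placeholders.

```ballerina
import ballerinax/kafka;

// Hypothetical path to the certificate that the client trusts.
kafka:SecureSocket secureSocket = {
    cert: "./resources/certs/server.crt"
};

kafka:ConsumerConfiguration consumerConfiguration = {
    groupId: "tls-group",
    topics: ["kafka-topic"],
    secureSocket: secureSocket,
    // SSL channel.
    securityProtocol: kafka:PROTOCOL_SSL
};
```

The configuration is then passed to the kafka:Consumer or kafka:Listener constructor together with the address of an SSL-enabled broker.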
kafka: TopicPartition
Represents a topic partition.
Fields
- topic string - Topic to which the partition is related
- partition int - Index of the specific partition
Errors
kafka: Error
Defines the common error type for the module.
kafka: PayloadBindingError
Represents an error, which occurred due to payload binding.
kafka: PayloadValidationError
Represents an error, which occurred due to payload constraint validation.
Union types
kafka: OffsetResetMethod
Represents the different types of offset-reset methods of the Kafka consumer.
kafka: IsolationLevel
kafka:Consumer isolation level type.
kafka: ProducerAcks
kafka:Producer acknowledgement types.
kafka: CompressionType
Kafka compression types to compress the messages.
kafka: AuthenticationMechanism
Represents the supported Kafka SASL authentication mechanisms.
kafka: SecurityProtocol
Represents the supported security protocols for Kafka clients.
Simple name reference types
kafka: DeserializerType
Kafka in-built deserializer type.
kafka: SerializerType
Kafka in-built serializer types.
Import
import ballerinax/kafka;
Metadata
Released date: 12 days ago
Version: 4.3.0
License: Apache-2.0
Compatibility
Platform: java21
Ballerina version: 2201.11.0
GraalVM compatible: Yes
Pull count
Total: 22971
Current version: 223
Keywords
kafka
event streaming
network
messaging