kafka
Module kafka
Declarations
Definitions
data:image/s3,"s3://crabby-images/3c0ee/3c0ee00cfbd176ad49b299cf953b3e007e10a44e" alt=""
ballerinax/kafka Ballerina library
Overview
This package provides an implementation to interact with Kafka Brokers via Kafka Consumer and Kafka Producer clients.
Apache Kafka is an open-source distributed event streaming platform used for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
This package supports Kafka 1.x.x, 2.x.x and 3.x.x versions.
Consumer and producer
Kafka producer
A Kafka producer is a Kafka client that publishes records to the Kafka cluster. The producer is thread-safe and sharing a single producer instance across threads will generally be faster than having multiple instances. When working with a Kafka producer, the first thing to do is to initialize the producer. For the producer to execute successfully, an active Kafka broker should be available.
The code snippet given below initializes a producer with the basic configuration.
import ballerinax/kafka; kafka:ProducerConfiguration producerConfiguration = { clientId: "basic-producer", acks: "all", retryCount: 3 }; kafka:Producer kafkaProducer = check new (kafka:DEFAULT_URL, producerConfiguration);
Kafka consumer
A Kafka consumer is a subscriber responsible for reading records from one or more topics and one or more partitions of a topic. When working with a Kafka consumer, the first thing to do is initialize the consumer. For the consumer to execute successfully, an active Kafka broker should be available.
The code snippet given below initializes a consumer with the basic configuration.
kafka:ConsumerConfiguration consumerConfiguration = { groupId: "group-id", // Unique string that identifies the consumer offsetReset: "earliest", // Offset reset strategy if no initial offset topics: ["kafka-topic"] }; kafka:Consumer kafkaConsumer = check new (kafka:DEFAULT_URL, consumerConfiguration);
Listener
The Kafka consumer can be used as a listener to a set of topics without the need to manually poll
the messages.
You can use the Caller
to manually commit the offsets of the messages that are read by the service. The following code snippet shows how to initialize and define the listener and how to commit the offsets manually.
kafka:ConsumerConfiguration consumerConfiguration = { groupId: "group-id", topics: ["kafka-topic-1"], pollingInterval: 1, autoCommit: false }; listener kafka:Listener kafkaListener = new (kafka:DEFAULT_URL, consumerConfiguration); service on kafkaListener { remote function onConsumerRecord(kafka:Caller caller, kafka:BytesConsumerRecord[] records) { // processes the records ... // commits the offsets manually kafka:Error? commitResult = caller->commit(); if commitResult is kafka:Error { log:printError("Error occurred while committing the offsets for the consumer ", 'error = commitResult); } } }
Data serialization
Serialization is the process of converting data into a stream of bytes that is used for transmission. Kafka stores and transmits these bytes of arrays in its queue. Deserialization does the opposite of serialization in which bytes of arrays are converted into the desired data type.
Currently, this package only supports the byte array
data type for both the keys and values. The following code snippets
show how to produce and read a message from Kafka.
string message = "Hello World, Ballerina"; string key = "my-key"; // converts the message and key to a byte array check kafkaProducer->send({ topic: "test-kafka-topic", key: key.toBytes(), value: message.toBytes() });
kafka:BytesConsumerRecord[] records = check kafkaConsumer->poll(1); foreach var kafkaRecord in records { byte[] messageContent = kafkaRecord.value; // tries to generate the string value from the byte array string result = check string:fromBytes(messageContent); io:println("The result is : ", result); }
Concurrency
In Kafka, records are grouped into smaller units called partitions. These can be processed independently without compromising the correctness of the results and lays the foundation for parallel processing. This can be achieved by using multiple consumers within the same group each reading and processing data from a subset of topic partitions and running in a single thread.
Topic partitions are assigned to consumers automatically or you can manually assign topic partitions.
The following code snippet joins a consumer to the consumer-group
and assigns it to a topic partition manually.
kafka:ConsumerConfiguration consumerConfiguration = { // `groupId` determines the consumer group groupId: "consumer-group", pollingInterval: 1, autoCommit: false }; kafka:Consumer kafkaConsumer = check new (kafka:DEFAULT_URL, consumerConfiguration); // creates a topic partition kafka:TopicPartition topicPartition = { topic: "kafka-topic-1", partition: 1 }; // passes the topic partitions to the assign function as an array check kafkaConsumer->assign([topicPartition]);
Report issues
To report bugs, request new features, start new discussions, view project boards, etc., go to the Ballerina standard library parent repository.
Useful links
- Chat live with us via our Discord server.
- Post all technical questions on Stack Overflow with the #ballerina tag.
Clientsdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
kafka: Callerdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Represents a Kafka caller, which can be used to commit the offsets consumed by the service.
'commitdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
function 'commit() returns Error?
Commits the currently consumed offsets of the service.
kafka:Error? result = caller->commit();
Return Type
- Error? - A
kafka:Error
if an error is encountered or else '()'
commitOffsetdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
function commitOffset(PartitionOffset[] offsets, decimal duration) returns Error?
Commits the given offsets and partitions for the given topics of the service.
kafka:Error? result = caller->commitOffset([partitionOffset1, partitionOffset2]);
Parameters
- offsets PartitionOffset[] - Offsets to be commited
- duration decimal (default -1) - Timeout duration (in seconds) for the commit operation execution
Return Type
- Error? - A
kafka:Error
if an error is encountered or else()
seekdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
function seek(PartitionOffset offset) returns Error?
Seeks for a given offset in a topic partition.
kafka:Error? result = consumer->seek(partitionOffset);
Parameters
- offset PartitionOffset - The
PartitionOffset
to seek
Return Type
- Error? - A
kafka:Error
if an error is encountered or else()
kafka: Consumerdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Represents a Kafka consumer endpoint.
Constructordata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Creates a new kafka:Consumer
.
init (string|string[] bootstrapServers, *ConsumerConfiguration config)
- config *ConsumerConfiguration - Configurations related to the consumer endpoint
assigndata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
function assign(TopicPartition[] partitions) returns Error?
Assigns consumer to a set of topic partitions.
kafka:Error? result = consumer->assign([topicPartition1, topicPartition2]);
Parameters
- partitions TopicPartition[] - Topic partitions to be assigned
Return Type
- Error? - A
kafka:Error
if an error is encountered or else()
closedata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Closes the consumer connection with the external Kafka broker.
kafka:Error? result = consumer->close();
Parameters
- duration decimal (default -1) - Timeout duration (in seconds) for the close operation execution
Return Type
- Error? - A
kafka:Error
if an error is encountered or else '()'
'commitdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
function 'commit() returns Error?
Commits the currently consumed offsets of the consumer.
kafka:Error? result = consumer->commit();
Return Type
- Error? - A
kafka:Error
if an error is encountered or else '()'
commitOffsetdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
function commitOffset(PartitionOffset[] offsets, decimal duration) returns Error?
Commits the given offsets of the specific topic partitions for the consumer.
kafka:Error? result = consumer->commitOffset([partitionOffset1, partitionOffset2]);
Parameters
- offsets PartitionOffset[] - Offsets to be commited
- duration decimal (default -1) - Timeout duration (in seconds) for the commit operation execution
Return Type
- Error? - A
kafka:Error
if an error is encountered or else()
getAssignmentdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
function getAssignment() returns TopicPartition[]|Error
Retrieves the currently-assigned partitions of the consumer.
kafka:TopicPartition[] result = check consumer->getAssignment();
Return Type
- TopicPartition[]|Error - Array of assigned partitions for the consumer if executes successfully or else a
kafka:Error
getAvailableTopicsdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Retrieves the available list of topics for a particular consumer.
string[] result = check consumer->getAvailableTopics();
Parameters
- duration decimal (default -1) - Timeout duration (in seconds) for the execution of the
getAvailableTopics
operation
Return Type
getBeginningOffsetsdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
function getBeginningOffsets(TopicPartition[] partitions, decimal duration) returns PartitionOffset[]|Error
Retrieves the start offsets for a given set of partitions.
kafka:PartitionOffset[] result = check consumer->getBeginningOffsets([topicPartition1, topicPartition2]);
Parameters
- partitions TopicPartition[] - Array of topic partitions to get the starting offsets
- duration decimal (default -1) - Timeout duration (in seconds) for the
getBeginningOffsets
execution
Return Type
- PartitionOffset[]|Error - Starting offsets for the given partitions if executes successfully or else a
kafka:Error
getCommittedOffsetdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
function getCommittedOffset(TopicPartition partition, decimal duration) returns PartitionOffset|Error?
Retrieves the lastly committed offset for the given topic partition.
kafka:PartitionOffset? result = check consumer->getCommittedOffset(topicPartition);
Parameters
- partition TopicPartition - The
TopicPartition
in which the committed offset is returned to the consumer
- duration decimal (default -1) - Timeout duration (in seconds) for the
getCommittedOffset
operation to execute
Return Type
- PartitionOffset|Error? - The last committed offset for a given partition for the consumer if there is a committed offset
present,
()
if there are no committed offsets, or else akafka:Error
getEndOffsetsdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
function getEndOffsets(TopicPartition[] partitions, decimal duration) returns PartitionOffset[]|Error
Retrieves the last offsets for a given set of partitions.
kafka:PartitionOffset[] result = check consumer->getEndOffsets([topicPartition1, topicPartition2]);
Parameters
- partitions TopicPartition[] - Set of partitions to get the last offsets
- duration decimal (default -1) - Timeout duration (in seconds) for the
getEndOffsets
operation to execute
Return Type
- PartitionOffset[]|Error - End offsets for the given partitions if executes successfully or else a
kafka:Error
getPausedPartitionsdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
function getPausedPartitions() returns TopicPartition[]|Error
Retrieves the partitions, which are currently paused.
kafka:TopicPartition[] result = check consumer->getPausedPartitions();
Return Type
- TopicPartition[]|Error - The set of partitions paused from message retrieval if executes successfully or else a
kafka:Error
getPositionOffsetdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
function getPositionOffset(TopicPartition partition, decimal duration) returns int|Error
Retrieves the offset of the next record that will be fetched if a record exists in that position.
int result = check consumer->getPositionOffset(topicPartition);
Parameters
- partition TopicPartition - The
TopicPartition
in which the position is required
- duration decimal (default -1) - Timeout duration (in seconds) for the get position offset operation to execute
Return Type
getSubscriptiondata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Retrieves the set of topics, which are currently subscribed by the consumer.
string[] result = check consumer->getSubscription();
Return Type
getTopicPartitionsdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
function getTopicPartitions(string topic, decimal duration) returns TopicPartition[]|Error
Retrieves the set of partitions to which the topic belongs.
kafka:TopicPartition[] result = check consumer->getTopicPartitions("kafka-topic");
Parameters
- topic string - The topic for which the partition information is needed
- duration decimal (default -1) - Timeout duration (in seconds) for the
getTopicPartitions
operation to execute
Return Type
- TopicPartition[]|Error - Array of partitions for the given topic if executes successfully or else a
kafka:Error
pausedata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
function pause(TopicPartition[] partitions) returns Error?
Pauses retrieving messages from a set of partitions.
kafka:Error? result = consumer->pause([topicPartition1, topicPartition2]);
Parameters
- partitions TopicPartition[] - Set of topic partitions to pause the retrieval of messages
Return Type
- Error? - A
kafka:Error
if an error is encountered or else()
polldata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
function poll(decimal timeout, typedesc<AnydataConsumerRecord[]> T) returns T|Error
Polls the external broker to retrieve messages.
kafka:AnydataConsumerRecord[] result = check consumer->poll(10);
Parameters
- timeout decimal - Polling time in seconds
- T typedesc<AnydataConsumerRecord[]> (default <>) - Optional type description of the required data type
Return Type
- T|Error - Array of consumer records if executed successfully or else a
kafka:Error
pollPayloaddata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Polls the external broker to retrieve messages in the required data type without the kafka:AnydataConsumerRecord
information.
Person[] persons = check consumer->pollPayload(10);
Parameters
- timeout decimal - Polling time in seconds
- T typedesc<anydata[]> (default <>) - Optional type description of the required data type
Return Type
- T|Error - Array of data in the required format if executed successfully or else a
kafka:Error
resumedata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
function resume(TopicPartition[] partitions) returns Error?
Resumes retrieving messages from a set of partitions, which were paused earlier.
kafka:Error? result = consumer->resume([topicPartition1, topicPartition2]);
Parameters
- partitions TopicPartition[] - Topic partitions to resume the retrieval of messages
Return Type
- Error? - A
kafka:Error
if an error is encountered or else()
seekdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
function seek(PartitionOffset offset) returns Error?
Seeks for a given offset in a topic partition.
kafka:Error? result = consumer->seek(partitionOffset);
Parameters
- offset PartitionOffset - The
PartitionOffset
to seek
Return Type
- Error? - A
kafka:Error
if an error is encountered or else()
seekToBeginningdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
function seekToBeginning(TopicPartition[] partitions) returns Error?
Seeks to the beginning of the offsets for a given set of topic partitions.
kafka:Error? result = consumer->seekToBeginning([topicPartition1, topicPartition2]);
Parameters
- partitions TopicPartition[] - The set of topic partitions to seek
Return Type
- Error? - A
kafka:Error
if an error is encountered or else()
seekToEnddata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
function seekToEnd(TopicPartition[] partitions) returns Error?
Seeks to the end of the offsets for a given set of topic partitions.
kafka:Error? result = consumer->seekToEnd([topicPartition1, topicPartition2]);
Parameters
- partitions TopicPartition[] - The set of topic partitions to seek
Return Type
- Error? - A
kafka:Error
if an error is encountered or else()
subscribedata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Subscribes the consumer to the provided set of topics.
kafka:Error? result = consumer->subscribe(["kafka-topic-1", "kafka-topic-2"]);
Return Type
- Error? - A
kafka:Error
if an error is encountered or else '()'
subscribeWithPatterndata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Subscribes the consumer to the topics, which match the provided pattern.
kafka:Error? result = consumer->subscribeWithPattern("kafka.*");
Parameters
- regex string - The pattern, which should be matched with the topics to be subscribed
Return Type
- Error? - A
kafka:Error
if an error is encountered or else '()'
unsubscribedata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
function unsubscribe() returns Error?
Unsubscribes from all the topics that the consumer is subscribed to.
kafka:Error? result = consumer->unsubscribe();
Return Type
- Error? - A
kafka:Error
if an error is encountered or else '()'
kafka: Producerdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Represents a Kafka producer endpoint.
Constructordata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Creates a new kafka:Producer
.
init (string|string[] bootstrapServers, *ProducerConfiguration config)
- config *ProducerConfiguration - Configurations related to initializing a
kafka:Producer
closedata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
function close() returns Error?
Closes the producer connection to the external Kafka broker.
kafka:Error? result = producer->close();
Return Type
- Error? - A
kafka:Error
if closing the producer failed or else '()'
'flushdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
function 'flush() returns Error?
Flushes the batch of records already sent to the broker by the producer.
kafka:Error? result = producer->'flush();
Return Type
- Error? - A
kafka:Error
if records couldn't be flushed or else '()'
getTopicPartitionsdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
function getTopicPartitions(string topic) returns TopicPartition[]|Error
Retrieves the topic partition information for the provided topic.
kafka:TopicPartition[] result = check producer->getTopicPartitions("kafka-topic");
Parameters
- topic string - The specific topic, of which the topic partition information is required
Return Type
- TopicPartition[]|Error - A
kafka:TopicPartition
array for the given topic or else akafka:Error
if the operation fails
senddata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
function send(AnydataProducerRecord producerRecord) returns Error?
Produces records to the Kafka server.
kafka:Error? result = producer->send({value: "Hello World".toBytes(), topic: "kafka-topic"});
Parameters
- producerRecord AnydataProducerRecord - Record to be produced
Return Type
- Error? - A
kafka:Error
if send action fails to send data or else '()'
Service typesdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
kafka: Servicedata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
The Kafka service type.
Constantsdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
kafka: ACKS_ALLdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Producer acknowledgement type is 'all'. This will guarantee that the record will not be lost as long as at least one in-sync replica is alive.
kafka: ACKS_NONEdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Producer acknowledgement type '0'. If the acknowledgement type set to this, the producer will not wait for any acknowledgement from the server.
kafka: ACKS_SINGLEdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Producer acknowledgement type '1'. If the acknowledgement type set to this, the leader will write the record to its A local log will respond without waiting FOR full acknowledgement from all the followers.
kafka: AUTH_SASL_PLAINdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Kafka SASL_PLAIN authentication mechanism
kafka: AUTH_SASL_SCRAM_SHA_256data:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Kafka SASL_SCRAM authentication mechanism
kafka: AUTH_SASL_SCRAM_SHA_512data:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
kafka: COMPRESSION_GZIPdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Kafka GZIP compression type.
kafka: COMPRESSION_LZ4data:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Kafka LZ4 compression type.
kafka: COMPRESSION_NONEdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
No compression.
kafka: COMPRESSION_SNAPPYdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Kafka Snappy compression type.
kafka: COMPRESSION_ZSTDdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Kafka ZSTD compression type.
kafka: DEFAULT_URLdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
The default server URL.
kafka: DES_AVROdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Apache Avro deserializer.
kafka: DES_BYTE_ARRAYdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
In-built Kafka byte array deserializer.
kafka: DES_CUSTOMdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
User-defined deserializer.
kafka: DES_FLOATdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
In-built Kafka float deserializer.
kafka: DES_INTdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
In-built Kafka int deserializer.
kafka: DES_STRINGdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
In-built Kafka string deserializer.
kafka: ISOLATION_COMMITTEDdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Configures the consumer to read the committed messages only in the transactional mode when poll() is called.
kafka: ISOLATION_UNCOMMITTEDdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Configures the consumer to read all the messages including the aborted ones.
kafka: OFFSET_RESET_EARLIESTdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Automatically reset the consumer offset to the earliest offset
kafka: OFFSET_RESET_LATESTdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Automatically reset the consumer offset to the latest offset
kafka: OFFSET_RESET_NONEdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
If the offsetReset
is set to OFFSET_RESET_NONE
, the consumer will give an error if no previous offset is found
for the consumer group
kafka: PROTOCOL_PLAINTEXTdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Represents Kafka un-authenticated, non-encrypted channel
kafka: PROTOCOL_SASL_PLAINTEXTdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Represents Kafka authenticated, non-encrypted channel
kafka: PROTOCOL_SASL_SSLdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Represents Kafka SASL authenticated, SSL channel
kafka: PROTOCOL_SSLdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Represents Kafka SSL channel
kafka: SER_AVROdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Apache Avro serializer.
kafka: SER_BYTE_ARRAYdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
In-built Kafka Byte Array serializer.
kafka: SER_CUSTOMdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
User-defined serializer.
kafka: SER_FLOATdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
In-built Kafka float serializer.
kafka: SER_INTdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
In-built Kafka int serializer.
kafka: SER_STRINGdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
In-built Kafka string serializer.
Enumsdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
kafka: Protocoldata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Represents protocol options.
Members
Listenersdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
kafka: Listenerdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Represents a Kafka consumer endpoint.
Constructordata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Creates a new kafka:Listener
.
init (string|string[] bootstrapServers, *ConsumerConfiguration config)
- config *ConsumerConfiguration - Configurations related to the consumer endpoint
attachdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Attaches a service to the listener.
error? result = listener.attach(kafkaService);
Parameters
- 'service Service - The service to be attached
Return Type
- error? - A
kafka:Error
if an error is encountered while attaching the service or else()
'startdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
function 'start() returns error?
Starts the registered services.
error? result = listener.'start();
Return Type
- error? - A
kafka:Error
if an error is encountered while starting the server or else()
gracefulStopdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
function gracefulStop() returns error?
Stops the Kafka listener gracefully.
error? result = listener.gracefulStop();
Return Type
- error? - A
kafka:Error
if an error is encountered during the listener-stopping process or else()
immediateStopdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
function immediateStop() returns error?
Stops the kafka listener immediately.
error? result = listener.immediateStop();
Return Type
- error? - A
kafka:Error
if an error is encountered during the listener-stopping process or else()
detachdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Detaches a consumer service from the listener.
error? result = listener.detach(kafkaService);
Parameters
- 'service Service - The service to be detached
Return Type
- error? - A
kafka:Error
if an error is encountered while detaching a service or else()
Annotationsdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
kafka: Payloaddata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
The annotation which is used to define the payload parameter in the onConsumerRecord
service method.
Recordsdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
kafka: AnydataConsumerRecorddata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Type related to anydata consumer record.
Fields
- key? anydata - Key that is included in the record
- value anydata - Anydata record content
- timestamp int - Timestamp of the record, in milliseconds since epoch
- offset PartitionOffset - Topic partition position in which the consumed record is stored
kafka: AnydataProducerRecorddata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Details related to the anydata producer record.
Fields
- topic string - Topic to which the record will be appended
- key? anydata - Key that is included in the record
- value anydata - Anydata record content
- timestamp? int - Timestamp of the record, in milliseconds since epoch
- partition? int - Partition to which the record should be sent
kafka: AuthenticationConfigurationdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Configurations related to Kafka authentication mechanisms.
Fields
- mechanism AuthenticationMechanism(default AUTH_SASL_PLAIN) - Type of the authentication mechanism. Currently
SASL_PLAIN
,SASL_SCRAM_256
&SASL_SCRAM_512
is supported
- username string - The username to authenticate the Kafka producer/consumer
- password string - The password to authenticate the Kafka producer/consumer
kafka: BytesConsumerRecorddata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Subtype related to kafka:AnydataConsumerRecord
record.
Fields
- Fields Included from *AnydataConsumerRecord
- key anydata
- value anydata
- timestamp int
- offset PartitionOffset
- headers map<byte[]|byte[][]|string|string[]>
- value byte[] - Record content in bytes
- headers map<byte[]|byte[][]> - Headers as a byte[] or byte[][]
kafka: BytesProducerRecorddata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Subtype related to kafka:AnydataProducerRecord
record.
Fields
- Fields Included from *AnydataProducerRecord
- value byte[] - Record content in bytes
- headers? map<byte[]|byte[][]> - Headers as a byte[] or byte[][]
kafka: CertKeydata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Represents a combination of certificate, private key, and private key password if encrypted.
Fields
- certFile string - A file containing the certificate
- keyFile string - A file containing the private key in PKCS8 format
- keyPassword? string - Password of the private key if it is encrypted
kafka: ConsumerConfigurationdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Configurations related to consumer endpoint.
Fields
- groupId? string - Unique string that identifies the consumer
- offsetReset? OffsetResetMethod - Offset reset strategy if no initial offset
- partitionAssignmentStrategy? string - Strategy class for handling the partition assignment among consumers
- metricsRecordingLevel? string - Metrics recording level
- metricsReporterClasses? string - Metrics reporter classes
- clientId? string - Identifier to be used for server side logging
- interceptorClasses? string - Interceptor classes to be used before sending the records
- isolationLevel? IsolationLevel - Transactional message reading method
- schemaRegistryUrl? string - Avro schema registry URL. Use this field to specify the schema registry URL, if the Avro serializer is used
- sessionTimeout? decimal - Timeout (in seconds) used to detect consumer failures when the heartbeat threshold is reached
- heartBeatInterval? decimal - Expected time (in seconds) between the heartbeats
- metadataMaxAge? decimal - Maximum time (in seconds) to force a refresh of metadata
- autoCommitInterval? decimal - Auto committing interval (in seconds) for commit offset when auto-committing is enabled
- maxPartitionFetchBytes? int - The maximum amount of data the server returns per partition
- sendBuffer? int - Size of the TCP send buffer (SO_SNDBUF)
- receiveBuffer? int - Size of the TCP receive buffer (SO_RCVBUF)
- fetchMinBytes? int - Minimum amount of data the server should return for a fetch request
- fetchMaxBytes? int - Maximum amount of data the server should return for a fetch request
- fetchMaxWaitTime? decimal - Maximum amount of time (in seconds) the server will block before answering the fetch request
- reconnectBackoffTimeMax? decimal - Maximum amount of time in seconds to wait when reconnecting
- retryBackoff? decimal - Time (in seconds) to wait before attempting to retry a failed request
- metricsSampleWindow? decimal - Window of time (in seconds) a metrics sample is computed over
- metricsNumSamples? int - Number of samples maintained to compute metrics
- requestTimeout? decimal - Wait time (in seconds) for response of a request
- connectionMaxIdleTime? decimal - Close idle connections after the number of seconds
- maxPollRecords? int - Maximum number of records returned in a single call to poll
- maxPollInterval? int - Maximum delay between invocations of poll
- reconnectBackoffTime? decimal - Time (in seconds) to wait before attempting to reconnect
- pollingTimeout? decimal - Timeout interval for polling in seconds
- pollingInterval? decimal - Polling interval for the consumer in seconds
- concurrentConsumers? int - Number of concurrent consumers
- defaultApiTimeout? decimal - Default API timeout value (in seconds) for APIs with duration
- autoCommit boolean(default true) - Enables auto committing offsets
- checkCRCS boolean(default true) - Checks the CRC32 of the records consumed. This ensures that no on-the-wire or on-disk corruption occurred
to the messages. This may add some overhead and might need to be set to
false
if extreme performance is required
- excludeInternalTopics boolean(default true) - Whether records from internal topics should be exposed to the consumer
- decoupleProcessing boolean(default false) - Decouples processing
- validation boolean(default true) - Configuration related to constraint validation check
- autoSeekOnValidationFailure boolean(default true) - Automatically seeks past the errornous records in the event of an data-binding or validating constraints failure
- secureSocket? SecureSocket - Configurations related to SSL/TLS encryption
- auth? AuthenticationConfiguration - Authentication-related configurations for the
kafka:Consumer
- securityProtocol SecurityProtocol(default PROTOCOL_PLAINTEXT) - Type of the security protocol to use in the broker connection
kafka: KafkaPayloaddata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Defines the Payload remote function parameter.
kafka: PartitionOffsetdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Represents the topic partition position in which the consumed record is stored.
Fields
- partition TopicPartition - The
kafka:TopicPartition
to which the record is related
- offset int - Offset in which the record is stored in the partition
kafka: ProducerConfigurationdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Represents the kafka:Producer
configuration.
Fields
- acks ProducerAcks(default ACKS_SINGLE) - Number of acknowledgments
- compressionType CompressionType(default COMPRESSION_NONE) - Compression type to be used for messages
- clientId? string - Identifier to be used for server side logging
- metricsRecordingLevel? string - Metrics recording level
- metricReporterClasses? string - Metrics reporter classes
- partitionerClass? string - Partitioner class to be used to select the partition to which the message is sent
- interceptorClasses? string - Interceptor classes to be used before sending the records
- transactionalId? string - Transactional ID to be used in transactional delivery
- schemaRegistryUrl? string - Avro schema registry URL. Use this field to specify the schema registry URL if the Avro serializer is used
- bufferMemory? int - Total bytes of memory the producer can use to buffer records
- retryCount? int - Number of retries to resend a record
- batchSize? int - Maximum number of bytes to be batched together when sending the records. Records exceeding this limit will not be batched. Setting this to 0 will disable batching
- linger? decimal - Delay (in seconds) to allow other records to be batched before sending them to the Kafka server
- sendBuffer? int - Size of the TCP send buffer (SO_SNDBUF)
- receiveBuffer? int - Size of the TCP receive buffer (SO_RCVBUF)
- maxRequestSize? int - The maximum size of a request in bytes
- reconnectBackoffTime? decimal - Time (in seconds) to wait before attempting to reconnect
- reconnectBackoffMaxTime? decimal - Maximum amount of time in seconds to wait when reconnecting
- retryBackoffTime? decimal - Time (in seconds) to wait before attempting to retry a failed request
- maxBlock? decimal - Maximum block time (in seconds) during which the sending is blocked when the buffer is full
- requestTimeout? decimal - Wait time (in seconds) for the response of a request
- metadataMaxAge? decimal - Maximum time (in seconds) to force a refresh of metadata
- metricsSampleWindow? decimal - Time (in seconds) window for a metrics sample to compute over
- metricsNumSamples? int - Number of samples maintained to compute the metrics
- maxInFlightRequestsPerConnection? int - Maximum number of unacknowledged requests on a single connection
- connectionsMaxIdleTime? decimal - Close the idle connections after this number of seconds
- transactionTimeout? decimal - Timeout (in seconds) for transaction status update from the producer
- enableIdempotence boolean(default false) - Exactly one copy of each message is written to the stream when enabled
- secureSocket? SecureSocket - Configurations related to SSL/TLS encryption
- auth? AuthenticationConfiguration - Authentication-related configurations for the
kafka:Producer
- securityProtocol SecurityProtocol(default PROTOCOL_PLAINTEXT) - Type of the security protocol to use in the broker connection
kafka: SecureSocketdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Configurations for secure communication with the Kafka server.
Fields
- cert TrustStore|string - Configurations associated with crypto:TrustStore or single certificate file that the client trusts
- ciphers? string[] - List of ciphers to be used. By default, all the available cipher suites are supported
- provider? string - Name of the security provider used for SSL connections. The default value is the default security provider of the JVM
kafka: TopicPartitiondata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Represents a topic partition.
Fields
- topic string - Topic to which the partition is related
- partition int - Index of the specific partition
Errorsdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
kafka: Errordata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Defines the common error type for the module.
kafka: PayloadBindingErrordata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Represents an error, which occurred due to payload binding.
kafka: PayloadValidationErrordata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
Represents an error, which occurred due to payload constraint validation.
Union typesdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
kafka: OffsetResetMethoddata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
OffsetResetMethod
Represents the different types of offset-reset methods of the Kafka consumer.
kafka: IsolationLeveldata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
IsolationLevel
kafka:Consumer
isolation level type.
kafka: ProducerAcksdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
ProducerAcks
kafka:Producer
acknowledgement types.
kafka: CompressionTypedata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
CompressionType
Kafka compression types to compress the messages.
kafka: AuthenticationMechanismdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
AuthenticationMechanism
Represents the supported Kafka SASL authentication mechanisms.
kafka: SecurityProtocoldata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
SecurityProtocol
Represents the supported security protocols for Kafka clients.
Simple name reference typesdata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
kafka: DeserializerTypedata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
DeserializerType
Kafka in-built deserializer type.
kafka: SerializerTypedata:image/s3,"s3://crabby-images/64a4a/64a4a52b2f4bf67d72ad3ea7eaa96c517a9bf56c" alt=""
SerializerType
Kafka in-built serializer types.
Import
import ballerinax/kafka;
Metadata
Released date: 12 days ago
Version: 4.3.0
License: Apache-2.0
Compatibility
Platform: java21
Ballerina version: 2201.11.0
GraalVM compatible: Yes
Pull count
Total: 22971
Current verison: 223
Weekly downloads
Keywords
kafka
event streaming
network
messaging
Contributors
Dependencies