This package provides an implementation to interact with Kafka Brokers via Kafka Consumer and Kafka Producer clients.
Apache Kafka is an open-source distributed event streaming platform used for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
This package supports Kafka versions 1.x.x and 2.0.0.
A Kafka producer is a Kafka client that publishes records to the Kafka cluster. The producer is thread-safe and sharing a single producer instance across threads will generally be faster than having multiple instances. When working with a Kafka producer, the first thing to do is to initialize the producer. For the producer to execute successfully, an active Kafka broker should be available.
The code snippet given below initializes a producer with the basic configuration.
```ballerina
import ballerinax/kafka;

kafka:ProducerConfiguration producerConfiguration = {
    clientId: "basic-producer",
    acks: "all",
    retryCount: 3
};

kafka:Producer kafkaProducer = check new (kafka:DEFAULT_URL, producerConfiguration);
```
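Once initialized, the producer can be used to publish records and should be closed when it is no longer needed. The following is a minimal sketch using the producer initialized above; the topic name is illustrative.

```ballerina
// Publishes a record to a topic; `example-topic` is an illustrative name.
check kafkaProducer->send({topic: "example-topic", value: "Hello, Kafka!".toBytes()});
// Closes the producer connection once publishing is complete.
check kafkaProducer->close();
```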
A Kafka consumer is a subscriber responsible for reading records from one or more topics and one or more partitions of a topic. When working with a Kafka consumer, the first thing to do is initialize the consumer. For the consumer to execute successfully, an active Kafka broker should be available.
The code snippet given below initializes a consumer with the basic configuration.
```ballerina
import ballerinax/kafka;

kafka:ConsumerConfiguration consumerConfiguration = {
    groupId: "group-id",        // Unique string that identifies the consumer
    offsetReset: "earliest",    // Offset reset strategy if no initial offset
    topics: ["kafka-topic"]
};

kafka:Consumer kafkaConsumer = check new (kafka:DEFAULT_URL, consumerConfiguration);
```
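If the topics are not known at initialization time, the consumer can also subscribe to them after it is created. A short sketch using the consumer initialized above; the topic names are illustrative.

```ballerina
// Subscribes the consumer to the given topics after initialization.
// A `groupId` must be set in the configuration for the subscription to succeed.
check kafkaConsumer->subscribe(["kafka-topic", "another-topic"]);
```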
The Kafka consumer can be used as a listener to a set of topics without the need to manually poll the messages. You can use the Caller to manually commit the offsets of the messages that are read by the service. The following code snippet shows how to initialize and define the listener and how to commit the offsets manually.
```ballerina
import ballerina/log;
import ballerinax/kafka;

kafka:ConsumerConfiguration consumerConfiguration = {
    groupId: "group-id",
    topics: ["kafka-topic-1"],
    pollingInterval: 1,
    autoCommit: false
};

listener kafka:Listener kafkaListener = new (kafka:DEFAULT_URL, consumerConfiguration);

service on kafkaListener {
    remote function onConsumerRecord(kafka:Caller caller, kafka:ConsumerRecord[] records) {
        // Processes the records
        // ...

        // Commits the offsets manually
        kafka:Error? commitResult = caller->commit();

        if commitResult is kafka:Error {
            log:printError("Error occurred while committing the offsets for the consumer ", 'error = commitResult);
        }
    }
}
```
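Besides committing all the consumed records with `commit()`, the Caller can commit up to a specific offset of a partition. The following is a sketch assuming the `commitOffset` API of the Caller and the `kafka:PartitionOffset` record; it would run inside the `onConsumerRecord` function above.

```ballerina
// Commits only up to offset 10 of partition 0 of `kafka-topic-1`.
kafka:PartitionOffset partitionOffset = {
    partition: {topic: "kafka-topic-1", partition: 0},
    offset: 10
};
kafka:Error? commitOffsetResult = caller->commitOffset([partitionOffset]);
if commitOffsetResult is kafka:Error {
    log:printError("Error occurred while committing the offset", 'error = commitOffsetResult);
}
```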
Serialization is the process of converting data into a stream of bytes for transmission. Kafka stores and transmits these byte arrays in its queue. Deserialization does the opposite, converting byte arrays back into the desired data type. Currently, this package only supports the byte array data type for both the keys and the values. The following code snippets show how to produce a message to Kafka and how to read a message from Kafka.
```ballerina
string message = "Hello World, Ballerina";
string key = "my-key";
// Converts the message and the key to byte arrays and sends the record to the topic
check kafkaProducer->send({topic: "test-kafka-topic", key: key.toBytes(), value: message.toBytes()});
```
```ballerina
kafka:ConsumerRecord[] records = check kafkaConsumer->poll(1);

foreach var kafkaRecord in records {
    byte[] messageContent = kafkaRecord.value;
    // Tries to generate the string value from the byte array
    string result = check string:fromBytes(messageContent);
    io:println("The result is : ", result);
}
```
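Because only byte arrays are supported, structured data has to be serialized before producing and deserialized after polling. Below is a minimal sketch using Ballerina's built-in JSON conversions; the payload shown is illustrative.

```ballerina
json payload = {event: "signup", user: "alice"};
// Serializes the JSON value to a byte array before producing.
byte[] serialized = payload.toJsonString().toBytes();

// Deserializes a polled byte array back into a JSON value.
string raw = check string:fromBytes(serialized);
json deserialized = check raw.fromJsonString();
```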
In Kafka, records are grouped into smaller units called partitions. These can be processed independently without compromising the correctness of the results, and they lay the foundation for parallel processing. This is achieved by using multiple consumers within the same group, each reading and processing data from a subset of the topic partitions and running in a single thread, as sketched below.
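For example, two consumers that share the same `groupId` split the partitions of a topic between them. This is a sketch; the topic and group names are illustrative.

```ballerina
import ballerinax/kafka;

kafka:ConsumerConfiguration groupMemberConfiguration = {
    groupId: "parallel-group",
    topics: ["kafka-topic-1"]
};

// Kafka assigns each of these consumers a disjoint subset of the
// topic's partitions, so the records are processed in parallel.
kafka:Consumer firstConsumer = check new (kafka:DEFAULT_URL, groupMemberConfiguration);
kafka:Consumer secondConsumer = check new (kafka:DEFAULT_URL, groupMemberConfiguration);
```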
Topic partitions are assigned to consumers automatically, or you can assign them manually. The following code snippet joins a consumer to the `consumer-group` consumer group and manually assigns a topic partition to it.
```ballerina
import ballerinax/kafka;

kafka:ConsumerConfiguration consumerConfiguration = {
    // `groupId` determines the consumer group
    groupId: "consumer-group",
    pollingInterval: 1,
    autoCommit: false
};

kafka:Consumer kafkaConsumer = check new (kafka:DEFAULT_URL, consumerConfiguration);

// Creates a topic partition
kafka:TopicPartition topicPartition = {
    topic: "kafka-topic-1",
    partition: 1
};

// Passes the topic partitions to the `assign` function as an array
check kafkaConsumer->assign([topicPartition]);
```
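After a manual assignment, the consumer can also be repositioned within the assigned partition. The snippet below is a sketch assuming the `seek` API of the consumer, which takes a `kafka:PartitionOffset`.

```ballerina
// Moves the consumer position back to the beginning of the assigned partition.
check kafkaConsumer->seek({
    partition: topicPartition,
    offset: 0
});
```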
To report bugs, request new features, start new discussions, view project boards, etc., go to the Ballerina standard library parent repository.
ballerinax/kafka
sha-256: aa9358eb3fbaf002caaceaa2ed171c34991acac4df814220633496ba0d39c2e5
License: Apache-2.0
Created date: 18 September, 2023
Platform: java17
Ballerina version: 2201.8.0
Verified with GraalVM: Yes
Keywords: kafka, event streaming, network, messaging