ballerinax/kafka Ballerina library
Overview
This module provides an implementation to interact with Kafka Brokers via Kafka Consumer and Kafka Producer clients.
Apache Kafka is an open-source distributed event streaming platform used for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
This module supports Kafka 1.x.x and 2.0.0 versions.
Consumer and producer
Kafka producer
A Kafka producer is a Kafka client that publishes records to the Kafka cluster. The producer is thread-safe and sharing a single producer instance across threads will generally be faster than having multiple instances. When working with a Kafka producer, the first thing to do is to initialize the producer. For the producer to execute successfully, an active Kafka broker should be available.
The code snippet given below initializes a producer with the basic configuration.
import ballerinax/kafka;

kafka:ProducerConfiguration producerConfiguration = {
    clientId: "basic-producer",
    acks: "all",
    retryCount: 3
};

kafka:Producer kafkaProducer = check new (kafka:DEFAULT_URL, producerConfiguration);
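A minimal end-to-end sketch of using this producer is given below. The topic name and message content are illustrative assumptions, not part of the module API.

import ballerinax/kafka;

kafka:ProducerConfiguration producerConfiguration = {
    clientId: "basic-producer",
    acks: "all",
    retryCount: 3
};

public function main() returns error? {
    kafka:Producer kafkaProducer = check new (kafka:DEFAULT_URL, producerConfiguration);
    // Publishes a single record to the assumed `test-topic` topic.
    check kafkaProducer->send({
        topic: "test-topic",
        value: "Hello from the basic producer".toBytes()
    });
    // Closes the producer connection once the record is sent.
    check kafkaProducer->close();
}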
Kafka consumer
A Kafka consumer is a subscriber responsible for reading records from one or more topics and one or more partitions of a topic. When working with a Kafka consumer, the first thing to do is initialize the consumer. For the consumer to execute successfully, an active Kafka broker should be available.
The code snippet given below initializes a consumer with the basic configuration.
kafka:ConsumerConfiguration consumerConfiguration = {
    groupId: "group-id",          // Unique string that identifies the consumer
    offsetReset: "earliest",      // Offset reset strategy if no initial offset
    topics: ["kafka-topic"]
};

kafka:Consumer kafkaConsumer = check new (kafka:DEFAULT_URL, consumerConfiguration);
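A minimal sketch of polling with this consumer is shown below. The topics field in the configuration already subscribes the consumer, so it can poll directly; the topic name and polling timeout are assumptions.

import ballerina/io;
import ballerinax/kafka;

kafka:ConsumerConfiguration consumerConfiguration = {
    groupId: "group-id",
    offsetReset: "earliest",
    topics: ["kafka-topic"]
};

public function main() returns error? {
    kafka:Consumer kafkaConsumer = check new (kafka:DEFAULT_URL, consumerConfiguration);
    // Polls the broker for up to 1 second and returns the records received.
    kafka:AnydataConsumerRecord[] records = check kafkaConsumer->poll(1);
    io:println("Number of records received: ", records.length());
    check kafkaConsumer->close();
}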
Listener
The Kafka consumer can be used as a listener to a set of topics without the need to manually poll the messages. You can use the Caller to manually commit the offsets of the messages that are read by the service. The following code snippet shows how to initialize and define the listener and how to commit the offsets manually.
import ballerina/log;
import ballerinax/kafka;

kafka:ConsumerConfiguration consumerConfigs = {
    groupId: "group-id",
    topics: ["kafka-topic-1"],
    pollingInterval: 1,
    autoCommit: false
};

listener kafka:Listener kafkaListener = new (kafka:DEFAULT_URL, consumerConfigs);

service kafka:Service on kafkaListener {
    remote function onConsumerRecord(kafka:Caller caller, kafka:ConsumerRecord[] records) {
        // processes the records ...
        // commits the offsets manually
        kafka:Error? commitResult = caller->commit();
        if commitResult is kafka:Error {
            log:printError("Error occurred while committing the offsets for the consumer ", 'error = commitResult);
        }
    }
}
Data serialization
Serialization is the process of converting data into a stream of bytes that is used for transmission. Kafka stores and transmits these bytes of arrays in its queue. Deserialization does the opposite of serialization in which bytes of arrays are converted into the desired data type.
Currently, this module only supports the byte array data type for both the keys and values. The following code snippets show how to produce and read a message from Kafka.
string message = "Hello World, Ballerina";
string key = "my-key";
// converts the message and key to a byte array
check kafkaProducer->send({
    topic: "test-kafka-topic",
    key: key.toBytes(),
    value: message.toBytes()
});
kafka:ConsumerRecord[] records = check kafkaConsumer->poll(1);

foreach var kafkaRecord in records {
    byte[] messageContent = kafkaRecord.value;
    // tries to generate the string value from the byte array
    string result = check string:fromBytes(messageContent);
    io:println("The result is : ", result);
}
Concurrency
In Kafka, records are grouped into smaller units called partitions. These can be processed independently without compromising the correctness of the results and lay the foundation for parallel processing. This can be achieved by using multiple consumers within the same group, each reading and processing data from a subset of topic partitions and running in a single thread.
Topic partitions are assigned to consumers automatically, or you can assign them manually. The following code snippet joins a consumer to the consumer-group and assigns it to a topic partition manually.
kafka:ConsumerConfiguration consumerConfigs = {
    // `groupId` determines the consumer group
    groupId: "consumer-group",
    pollingInterval: 1,
    autoCommit: false
};

kafka:Consumer kafkaConsumer = check new (kafka:DEFAULT_URL, consumerConfigs);

// creates a topic partition
kafka:TopicPartition topicPartition = {
    topic: "kafka-topic-1",
    partition: 1
};

// passes the topic partitions to the assign function as an array
check kafkaConsumer->assign([topicPartition]);
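For comparison, the following is a rough sketch of automatic assignment: two consumers created with the same groupId and subscribed to the same topic are each assigned a subset of its partitions by the broker. The topic and group names are assumptions.

import ballerina/io;
import ballerinax/kafka;

public function main() returns error? {
    kafka:ConsumerConfiguration sharedConfigs = {
        groupId: "consumer-group",
        topics: ["kafka-topic-1"]
    };
    // Both consumers join the same consumer group.
    kafka:Consumer consumerOne = check new (kafka:DEFAULT_URL, sharedConfigs);
    kafka:Consumer consumerTwo = check new (kafka:DEFAULT_URL, sharedConfigs);
    // Each poll returns records only from the partitions assigned to that consumer.
    kafka:AnydataConsumerRecord[] recordsOne = check consumerOne->poll(1);
    kafka:AnydataConsumerRecord[] recordsTwo = check consumerTwo->poll(1);
    io:println("Consumer one received: ", recordsOne.length());
    io:println("Consumer two received: ", recordsTwo.length());
    check consumerOne->close();
    check consumerTwo->close();
}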
Clients
kafka: Caller
Represents a Kafka caller, which can be used to commit the offsets consumed by the service.
'commit
function 'commit() returns Error?
Commits the currently consumed offsets of the service.
kafka:Error? result = caller->commit();
Return Type
- Error? - A kafka:Error if an error is encountered or else '()'
commitOffset
function commitOffset(PartitionOffset[] offsets, decimal duration) returns Error?
Commits the given offsets and partitions for the given topics of the service.
kafka:Error? result = caller->commitOffset([partitionOffset1, partitionOffset2]);
Parameters
- offsets PartitionOffset[] - Offsets to be committed
- duration decimal (default -1) - Timeout duration (in seconds) for the commit operation execution
Return Type
- Error? - A kafka:Error if an error is encountered or else '()'
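A rough sketch of constructing the offsets to commit inside a service is given below; the topic, partition, and offset values are illustrative assumptions.

import ballerinax/kafka;

listener kafka:Listener kafkaListener = new (kafka:DEFAULT_URL, {
    groupId: "group-id",
    topics: ["kafka-topic-1"],
    autoCommit: false
});

service kafka:Service on kafkaListener {
    remote function onConsumerRecord(kafka:Caller caller, kafka:AnydataConsumerRecord[] records) returns error? {
        // Commits the assumed partition 0 of kafka-topic-1 so that consumption resumes from offset 10.
        kafka:PartitionOffset partitionOffset = {
            partition: {topic: "kafka-topic-1", partition: 0},
            offset: 10
        };
        check caller->commitOffset([partitionOffset]);
    }
}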
seek
function seek(PartitionOffset offset) returns Error?
Seeks for a given offset in a topic partition.
kafka:Error? result = consumer->seek(partitionOffset);
Parameters
- offset PartitionOffset - The PartitionOffset to seek
Return Type
- Error? - A kafka:Error if an error is encountered or else '()'
kafka: Consumer
Represents a Kafka consumer endpoint.
Constructor
Creates a new kafka:Consumer.
init (string|string[] bootstrapServers, *ConsumerConfiguration config)
- bootstrapServers string|string[] - List of remote server endpoints of the Kafka brokers
- config *ConsumerConfiguration - Configurations related to the consumer endpoint
assign
function assign(TopicPartition[] partitions) returns Error?
Assigns consumer to a set of topic partitions.
kafka:Error? result = consumer->assign([topicPartition1, topicPartition2]);
Parameters
- partitions TopicPartition[] - Topic partitions to be assigned
Return Type
- Error? - A kafka:Error if an error is encountered or else '()'
close
function close(decimal duration) returns Error?
Closes the consumer connection with the external Kafka broker.
kafka:Error? result = consumer->close();
Parameters
- duration decimal (default -1) - Timeout duration (in seconds) for the close operation execution
Return Type
- Error? - A kafka:Error if an error is encountered or else '()'
'commit
function 'commit() returns Error?
Commits the currently consumed offsets of the consumer.
kafka:Error? result = consumer->commit();
Return Type
- Error? - A kafka:Error if an error is encountered or else '()'
commitOffset
function commitOffset(PartitionOffset[] offsets, decimal duration) returns Error?
Commits the given offsets of the specific topic partitions for the consumer.
kafka:Error? result = consumer->commitOffset([partitionOffset1, partitionOffset2]);
Parameters
- offsets PartitionOffset[] - Offsets to be committed
- duration decimal (default -1) - Timeout duration (in seconds) for the commit operation execution
Return Type
- Error? - A kafka:Error if an error is encountered or else '()'
getAssignment
function getAssignment() returns TopicPartition[]|Error
Retrieves the currently-assigned partitions of the consumer.
kafka:TopicPartition[] result = check consumer->getAssignment();
Return Type
- TopicPartition[]|Error - Array of assigned partitions for the consumer if executes successfully or else a kafka:Error
getAvailableTopics
function getAvailableTopics(decimal duration) returns string[]|Error
Retrieves the available list of topics for a particular consumer.
string[] result = check consumer->getAvailableTopics();
Parameters
- duration decimal (default -1) - Timeout duration (in seconds) for the execution of the getAvailableTopics operation
Return Type
- string[]|Error - Array of available topics if the operation executes successfully or else a kafka:Error
getBeginningOffsets
function getBeginningOffsets(TopicPartition[] partitions, decimal duration) returns PartitionOffset[]|Error
Retrieves the start offsets for a given set of partitions.
kafka:PartitionOffset[] result = check consumer->getBeginningOffsets([topicPartition1, topicPartition2]);
Parameters
- partitions TopicPartition[] - Array of topic partitions to get the starting offsets
- duration decimal (default -1) - Timeout duration (in seconds) for the getBeginningOffsets execution
Return Type
- PartitionOffset[]|Error - Starting offsets for the given partitions if executes successfully or else a kafka:Error
getCommittedOffset
function getCommittedOffset(TopicPartition partition, decimal duration) returns PartitionOffset|Error?
Retrieves the last committed offset for the given topic partition.
kafka:PartitionOffset? result = check consumer->getCommittedOffset(topicPartition);
Parameters
- partition TopicPartition - The TopicPartition in which the committed offset is returned to the consumer
- duration decimal (default -1) - Timeout duration (in seconds) for the getCommittedOffset operation to execute
Return Type
- PartitionOffset|Error? - The last committed offset for a given partition for the consumer if there is a committed offset present, () if there are no committed offsets, or else a kafka:Error
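A rough sketch of handling the optional result is given below; the consumer setup, topic, and partition are assumptions.

import ballerina/io;
import ballerinax/kafka;

public function main() returns error? {
    kafka:Consumer consumer = check new (kafka:DEFAULT_URL, {groupId: "group-id"});
    kafka:TopicPartition topicPartition = {topic: "kafka-topic-1", partition: 0};
    // Returns () if this consumer group has never committed an offset for the partition.
    kafka:PartitionOffset? committedOffset = check consumer->getCommittedOffset(topicPartition);
    if committedOffset is kafka:PartitionOffset {
        io:println("Next offset to be consumed: ", committedOffset.offset);
    } else {
        io:println("No committed offset found for the partition");
    }
    check consumer->close();
}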
getEndOffsets
function getEndOffsets(TopicPartition[] partitions, decimal duration) returns PartitionOffset[]|Error
Retrieves the last offsets for a given set of partitions.
kafka:PartitionOffset[] result = check consumer->getEndOffsets([topicPartition1, topicPartition2]);
Parameters
- partitions TopicPartition[] - Set of partitions to get the last offsets
- duration decimal (default -1) - Timeout duration (in seconds) for the getEndOffsets operation to execute
Return Type
- PartitionOffset[]|Error - End offsets for the given partitions if executes successfully or else a kafka:Error
getPausedPartitions
function getPausedPartitions() returns TopicPartition[]|Error
Retrieves the partitions, which are currently paused.
kafka:TopicPartition[] result = check consumer->getPausedPartitions();
Return Type
- TopicPartition[]|Error - The set of partitions paused from message retrieval if executes successfully or else a kafka:Error
getPositionOffset
function getPositionOffset(TopicPartition partition, decimal duration) returns int|Error
Retrieves the offset of the next record that will be fetched if a record exists in that position.
int result = check consumer->getPositionOffset(topicPartition);
Parameters
- partition TopicPartition - The TopicPartition in which the position is required
- duration decimal (default -1) - Timeout duration (in seconds) for the get position offset operation to execute
Return Type
- int|Error - Offset of the next record to be fetched if the operation executes successfully or else a kafka:Error
getSubscription
function getSubscription() returns string[]|Error
Retrieves the set of topics, which are currently subscribed by the consumer.
string[] result = check consumer->getSubscription();
Return Type
- string[]|Error - Array of subscribed topics for the consumer if the operation executes successfully or else a kafka:Error
getTopicPartitions
function getTopicPartitions(string topic, decimal duration) returns TopicPartition[]|Error
Retrieves the set of partitions to which the topic belongs.
kafka:TopicPartition[] result = check consumer->getTopicPartitions("kafka-topic");
Parameters
- topic string - The topic for which the partition information is needed
- duration decimal (default -1) - Timeout duration (in seconds) for the getTopicPartitions operation to execute
Return Type
- TopicPartition[]|Error - Array of partitions for the given topic if executes successfully or else a kafka:Error
pause
function pause(TopicPartition[] partitions) returns Error?
Pauses retrieving messages from a set of partitions.
kafka:Error? result = consumer->pause([topicPartition1, topicPartition2]);
Parameters
- partitions TopicPartition[] - Set of topic partitions to pause the retrieval of messages
Return Type
- Error? - A kafka:Error if an error is encountered or else '()'
poll
function poll(decimal timeout, typedesc<AnydataConsumerRecord[]> T) returns T|Error
Polls the external broker to retrieve messages.
kafka:AnydataConsumerRecord[] result = check consumer->poll(10);
Parameters
- timeout decimal - Polling time in seconds
- T typedesc<AnydataConsumerRecord[]> (default <>) - Optional type description of the required data type
Return Type
- T|Error - Array of consumer records if executed successfully or else a kafka:Error
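A rough sketch of binding the polled records to a user-defined type is given below; the Order record, topic, and group names are assumptions.

import ballerina/io;
import ballerinax/kafka;

// Hypothetical payload type used for data binding.
type Order record {|
    int id;
    string description;
|};

// Consumer record whose value field is bound to the Order type.
type OrderConsumerRecord record {|
    *kafka:AnydataConsumerRecord;
    Order value;
|};

public function main() returns error? {
    kafka:Consumer consumer = check new (kafka:DEFAULT_URL, {
        groupId: "order-group",
        topics: ["order-topic"]
    });
    // The expected return type drives the data binding of the polled records.
    OrderConsumerRecord[] records = check consumer->poll(10);
    foreach OrderConsumerRecord orderRecord in records {
        io:println("Received order: ", orderRecord.value.id);
    }
    check consumer->close();
}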
pollPayload
function pollPayload(decimal timeout, typedesc<anydata[]> T) returns T|Error
Polls the external broker to retrieve messages in the required data type without the kafka:ConsumerRecord information.
Person[] persons = check consumer->pollPayload(10);
Parameters
- timeout decimal - Polling time in seconds
- T typedesc<anydata[]> (default <>) - Optional type description of the required data type
Return Type
- T|Error - Array of data in the required format if executed successfully or else a kafka:Error
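A rough sketch of the Person[] example above, made self-contained; the Person record, topic, and group names are assumptions.

import ballerina/io;
import ballerinax/kafka;

// Hypothetical payload type used for data binding.
type Person record {|
    string name;
    int age;
|};

public function main() returns error? {
    kafka:Consumer consumer = check new (kafka:DEFAULT_URL, {
        groupId: "person-group",
        topics: ["person-topic"]
    });
    // Returns only the payloads, without the offset/partition metadata.
    Person[] persons = check consumer->pollPayload(10);
    foreach Person person in persons {
        io:println("Received person: ", person.name);
    }
    check consumer->close();
}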
resume
function resume(TopicPartition[] partitions) returns Error?
Resumes retrieving messages from a set of partitions, which were paused earlier.
kafka:Error? result = consumer->resume([topicPartition1, topicPartition2]);
Parameters
- partitions TopicPartition[] - Topic partitions to resume the retrieval of messages
Return Type
- Error? - A kafka:Error if an error is encountered or else '()'
seek
function seek(PartitionOffset offset) returns Error?
Seeks for a given offset in a topic partition.
kafka:Error? result = consumer->seek(partitionOffset);
Parameters
- offset PartitionOffset - The PartitionOffset to seek
Return Type
- Error? - A kafka:Error if an error is encountered or else '()'
seekToBeginning
function seekToBeginning(TopicPartition[] partitions) returns Error?
Seeks to the beginning of the offsets for a given set of topic partitions.
kafka:Error? result = consumer->seekToBeginning([topicPartition1, topicPartition2]);
Parameters
- partitions TopicPartition[] - The set of topic partitions to seek
Return Type
- Error? - A kafka:Error if an error is encountered or else '()'
seekToEnd
function seekToEnd(TopicPartition[] partitions) returns Error?
Seeks to the end of the offsets for a given set of topic partitions.
kafka:Error? result = consumer->seekToEnd([topicPartition1, topicPartition2]);
Parameters
- partitions TopicPartition[] - The set of topic partitions to seek
Return Type
- Error? - A kafka:Error if an error is encountered or else '()'
subscribe
function subscribe(string[] topics) returns Error?
Subscribes the consumer to the provided set of topics.
kafka:Error? result = consumer->subscribe(["kafka-topic-1", "kafka-topic-2"]);
Parameters
- topics string[] - The array of topics to subscribe to
Return Type
- Error? - A kafka:Error if an error is encountered or else '()'
subscribeWithPattern
function subscribeWithPattern(string regex) returns Error?
Subscribes the consumer to the topics, which match the provided pattern.
kafka:Error? result = consumer->subscribeWithPattern("kafka.*");
Parameters
- regex string - The pattern, which should be matched with the topics to be subscribed
Return Type
- Error? - A kafka:Error if an error is encountered or else '()'
unsubscribe
function unsubscribe() returns Error?
Unsubscribes from all the topics that the consumer is subscribed to.
kafka:Error? result = consumer->unsubscribe();
Return Type
- Error? - A kafka:Error if an error is encountered or else '()'
kafka: Listener
Represents a Kafka consumer endpoint.
Constructor
Creates a new kafka:Listener.
init (string|string[] bootstrapServers, *ConsumerConfiguration config)
- bootstrapServers string|string[] - List of remote server endpoints of the Kafka brokers
- config *ConsumerConfiguration - Configurations related to the consumer endpoint
'start
function 'start() returns error?
Starts the registered services.
error? result = listener.'start();
Return Type
- error? - A kafka:Error if an error is encountered while starting the server or else '()'
gracefulStop
function gracefulStop() returns error?
Stops the Kafka listener gracefully.
error? result = listener.gracefulStop();
Return Type
- error? - A kafka:Error if an error is encountered during the listener-stopping process or else '()'
immediateStop
function immediateStop() returns error?
Stops the Kafka listener immediately.
error? result = listener.immediateStop();
Return Type
- error? - A kafka:Error if an error is encountered during the listener-stopping process or else '()'
attach
Attaches a service to the listener.
error? result = listener.attach(kafkaService);
Parameters
- 'service Service - The service to be attached
Return Type
- error? - A kafka:Error if an error is encountered while attaching the service or else '()'
detach
Detaches a consumer service from the listener.
error? result = listener.detach(kafkaService);
Parameters
- 'service Service - The service to be detached
Return Type
- error? - A kafka:Error if an error is encountered while detaching a service or else '()'
kafka: Producer
Represents a Kafka producer endpoint.
Constructor
Creates a new kafka:Producer.
init (string|string[] bootstrapServers, *ProducerConfiguration config)
- bootstrapServers string|string[] - List of remote server endpoints of the Kafka brokers
- config *ProducerConfiguration - Configurations related to initializing a kafka:Producer
close
function close() returns Error?
Closes the producer connection to the external Kafka broker.
kafka:Error? result = producer->close();
Return Type
- Error? - A kafka:Error if closing the producer failed or else '()'
'flush
function 'flush() returns Error?
Flushes the batch of records already sent to the broker by the producer.
kafka:Error? result = producer->'flush();
Return Type
- Error? - A kafka:Error if records couldn't be flushed or else '()'
getTopicPartitions
function getTopicPartitions(string topic) returns TopicPartition[]|Error
Retrieves the topic partition information for the provided topic.
kafka:TopicPartition[] result = check producer->getTopicPartitions("kafka-topic");
Parameters
- topic string - The specific topic, of which the topic partition information is required
Return Type
- TopicPartition[]|Error - A kafka:TopicPartition array for the given topic or else a kafka:Error if the operation fails
send
function send(AnydataProducerRecord producerRecord) returns Error?
Produces records to the Kafka server.
kafka:Error? result = producer->send({value: "Hello World".toBytes(), topic: "kafka-topic"});
Parameters
- producerRecord AnydataProducerRecord - Record to be produced
Return Type
- Error? - A kafka:Error if the send action fails to send data or else '()'
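Since the producer record accepts anydata values, a non-binary payload can also be produced. The following is a rough sketch; the Order type and the topic name are assumptions.

import ballerinax/kafka;

type Order record {|
    int id;
    string description;
|};

public function main() returns error? {
    kafka:Producer producer = check new (kafka:DEFAULT_URL);
    Order sampleOrder = {id: 1, description: "Sample order"};
    // The record value is accepted as anydata by kafka:AnydataProducerRecord.
    check producer->send({topic: "order-topic", value: sampleOrder});
    check producer->close();
}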
Constants
kafka: ACKS_ALL
Producer acknowledgement type is 'all'. This will guarantee that the record will not be lost as long as at least one in-sync replica is alive.
kafka: ACKS_NONE
Producer acknowledgement type '0'. If the acknowledgement type is set to this, the producer will not wait for any acknowledgement from the server.
kafka: ACKS_SINGLE
Producer acknowledgement type '1'. If the acknowledgement type is set to this, the leader will write the record to its local log and respond without waiting for full acknowledgement from all the followers.
kafka: AUTH_SASL_PLAIN
Kafka SASL_PLAIN authentication mechanism
kafka: COMPRESSION_GZIP
Kafka GZIP compression type.
kafka: COMPRESSION_LZ4
Kafka LZ4 compression type.
kafka: COMPRESSION_NONE
No compression.
kafka: COMPRESSION_SNAPPY
Kafka Snappy compression type.
kafka: COMPRESSION_ZSTD
Kafka ZSTD compression type.
kafka: DEFAULT_URL
The default server URL.
kafka: DES_AVRO
Apache Avro deserializer.
kafka: DES_BYTE_ARRAY
In-built Kafka byte array deserializer.
kafka: DES_CUSTOM
User-defined deserializer.
kafka: DES_FLOAT
In-built Kafka float deserializer.
kafka: DES_INT
In-built Kafka int deserializer.
kafka: DES_STRING
In-built Kafka string deserializer.
kafka: ISOLATION_COMMITTED
Configures the consumer to read the committed messages only in the transactional mode when poll() is called.
kafka: ISOLATION_UNCOMMITTED
Configures the consumer to read all the messages including the aborted ones.
kafka: OFFSET_RESET_EARLIEST
Automatically reset the consumer offset to the earliest offset
kafka: OFFSET_RESET_LATEST
Automatically reset the consumer offset to the latest offset
kafka: OFFSET_RESET_NONE
If the offsetReset is set to OFFSET_RESET_NONE, the consumer will give an error if no previous offset is found for the consumer group
kafka: PROTOCOL_PLAINTEXT
Represents Kafka un-authenticated, non-encrypted channel
kafka: PROTOCOL_SASL_PLAINTEXT
Represents Kafka authenticated, non-encrypted channel
kafka: PROTOCOL_SASL_SSL
Represents Kafka SASL authenticated, SSL channel
kafka: PROTOCOL_SSL
Represents Kafka SSL channel
kafka: SER_AVRO
Apache Avro serializer.
kafka: SER_BYTE_ARRAY
In-built Kafka Byte Array serializer.
kafka: SER_CUSTOM
User-defined serializer.
kafka: SER_FLOAT
In-built Kafka float serializer.
kafka: SER_INT
In-built Kafka int serializer.
kafka: SER_STRING
In-built Kafka string serializer.
Enums
kafka: Protocol
Represents protocol options.
Members
Annotations
kafka: Payload
The annotation which is used to define the payload parameter in the onConsumerRecord service method.
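A rough sketch of marking a data-bound payload parameter with this annotation is shown below; the Order type, topic, and group names are assumptions.

import ballerina/io;
import ballerinax/kafka;

type Order record {|
    int id;
    string description;
|};

listener kafka:Listener orderListener = new (kafka:DEFAULT_URL, {
    groupId: "order-group",
    topics: ["order-topic"]
});

service kafka:Service on orderListener {
    // The annotation marks `orders` as the payload parameter to be data-bound from the record values.
    remote function onConsumerRecord(@kafka:Payload Order[] orders) returns error? {
        foreach Order receivedOrder in orders {
            io:println("Received order: ", receivedOrder.id);
        }
    }
}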
Records
kafka: AnydataConsumerRecord
Type related to anydata consumer record.
Fields
- key anydata? - Key that is included in the record
- value anydata - Anydata record content
- timestamp int - Timestamp of the record, in milliseconds since epoch
- offset PartitionOffset - Topic partition position in which the consumed record is stored
kafka: AnydataProducerRecord
Details related to the anydata producer record.
Fields
- topic string - Topic to which the record will be appended
- key anydata? - Key that is included in the record
- value anydata - Anydata record content
- timestamp int? - Timestamp of the record, in milliseconds since epoch
- partition int? - Partition to which the record should be sent
kafka: AuthenticationConfiguration
Configurations related to Kafka authentication mechanisms.
Fields
- mechanism AuthenticationMechanism(default AUTH_SASL_PLAIN) - Type of the authentication mechanism. Currently only SASL_PLAIN is supported
- username string - The username to authenticate the Kafka producer/consumer
- password string - The password to authenticate the Kafka producer/consumer
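A rough sketch of a consumer secured with SASL_PLAIN is given below; the credentials, broker URL, topic, and group names are assumptions.

import ballerinax/kafka;

kafka:AuthenticationConfiguration authConfig = {
    username: "ballerina",
    password: "ballerina-secret"
};

kafka:ConsumerConfiguration consumerConfiguration = {
    groupId: "secured-group",
    topics: ["kafka-topic"],
    auth: authConfig,
    // SASL authentication over a non-encrypted channel.
    securityProtocol: kafka:PROTOCOL_SASL_PLAINTEXT
};

public function main() returns error? {
    kafka:Consumer consumer = check new ("localhost:9093", consumerConfiguration);
    check consumer->close();
}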
kafka: BytesConsumerRecord
Subtype related to the kafka:AnydataConsumerRecord record.
Fields
- Fields Included from *AnydataConsumerRecord
- key anydata
- value anydata
- timestamp int
- offset PartitionOffset
- value byte[] - Record content in bytes
kafka: BytesProducerRecord
Subtype related to the kafka:AnydataProducerRecord record.
Fields
- Fields Included from *AnydataProducerRecord
- value byte[] - Record content in bytes
kafka: CertKey
Represents a combination of certificate, private key, and private key password if encrypted.
Fields
- certFile string - A file containing the certificate
- keyFile string - A file containing the private key in PKCS8 format
- keyPassword string? - Password of the private key if it is encrypted
kafka: ConsumerConfiguration
Configurations related to consumer endpoint.
Fields
- groupId string? - Unique string that identifies the consumer
- offsetReset OffsetResetMethod? - Offset reset strategy if no initial offset
- partitionAssignmentStrategy string? - Strategy class for handling the partition assignment among consumers
- metricsRecordingLevel string? - Metrics recording level
- metricsReporterClasses string? - Metrics reporter classes
- clientId string? - Identifier to be used for server side logging
- interceptorClasses string? - Interceptor classes to be used before sending the records
- isolationLevel IsolationLevel? - Transactional message reading method
- schemaRegistryUrl string? - Avro schema registry URL. Use this field to specify the schema registry URL, if the Avro serializer is used
- sessionTimeout decimal? - Timeout (in seconds) used to detect consumer failures when the heartbeat threshold is reached
- heartBeatInterval decimal? - Expected time (in seconds) between the heartbeats
- metadataMaxAge decimal? - Maximum time (in seconds) to force a refresh of metadata
- autoCommitInterval decimal? - Auto committing interval (in seconds) for commit offset when auto-committing is enabled
- maxPartitionFetchBytes int? - The maximum amount of data the server returns per partition
- sendBuffer int? - Size of the TCP send buffer (SO_SNDBUF)
- receiveBuffer int? - Size of the TCP receive buffer (SO_RCVBUF)
- fetchMinBytes int? - Minimum amount of data the server should return for a fetch request
- fetchMaxBytes int? - Maximum amount of data the server should return for a fetch request
- fetchMaxWaitTime decimal? - Maximum amount of time (in seconds) the server will block before answering the fetch request
- reconnectBackoffTimeMax decimal? - Maximum amount of time in seconds to wait when reconnecting
- retryBackoff decimal? - Time (in seconds) to wait before attempting to retry a failed request
- metricsSampleWindow decimal? - Window of time (in seconds) a metrics sample is computed over
- metricsNumSamples int? - Number of samples maintained to compute metrics
- requestTimeout decimal? - Wait time (in seconds) for response of a request
- connectionMaxIdleTime decimal? - Close idle connections after the number of seconds
- maxPollRecords int? - Maximum number of records returned in a single call to poll
- maxPollInterval int? - Maximum delay between invocations of poll
- reconnectBackoffTime decimal? - Time (in seconds) to wait before attempting to reconnect
- pollingTimeout decimal? - Timeout interval for polling in seconds
- pollingInterval decimal? - Polling interval for the consumer in seconds
- concurrentConsumers int? - Number of concurrent consumers
- defaultApiTimeout decimal? - Default API timeout value (in seconds) for APIs with duration
- autoCommit boolean(default true) - Enables auto committing offsets
- checkCRCS boolean(default true) - Checks the CRC32 of the records consumed. This ensures that no on-the-wire or on-disk corruption occurred to the messages. This may add some overhead and might need to be set to false if extreme performance is required
- excludeInternalTopics boolean(default true) - Whether records from internal topics should be exposed to the consumer
- decoupleProcessing boolean(default false) - Decouples processing
- validation boolean(default true) - Configuration related to constraint validation check
- autoSeekOnValidationFailure boolean(default true) - Automatically seeks past the erroneous records in the event of a data-binding or constraint validation failure
- secureSocket SecureSocket? - Configurations related to SSL/TLS encryption
- auth AuthenticationConfiguration? - Authentication-related configurations for the kafka:Consumer
- securityProtocol SecurityProtocol(default PROTOCOL_PLAINTEXT) - Type of the security protocol to use in the broker connection
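A rough sketch combining several of these fields is given below; all the values are illustrative assumptions.

import ballerinax/kafka;

kafka:ConsumerConfiguration consumerConfiguration = {
    groupId: "group-id",
    topics: ["kafka-topic"],
    offsetReset: kafka:OFFSET_RESET_EARLIEST,
    // Timeouts and intervals are given in seconds.
    sessionTimeout: 45,
    heartBeatInterval: 3,
    maxPollRecords: 100,
    pollingInterval: 1,
    autoCommit: false
};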
kafka: ConsumerRecord
Type related to consumer record.
Fields
- key byte[]? - Key that is included in the record
- value byte[] - Record content
- timestamp int - Timestamp of the record, in milliseconds since epoch
- offset PartitionOffset - Topic partition position in which the consumed record is stored
Deprecated
Usage of this record is deprecated. Use subtypes of AnydataConsumerRecord instead to support data-binding
kafka: KafkaPayload
Defines the Payload remote function parameter.
kafka: PartitionOffset
Represents the topic partition position in which the consumed record is stored.
Fields
- partition TopicPartition - The kafka:TopicPartition to which the record is related
- offset int - Offset in which the record is stored in the partition
kafka: ProducerConfiguration
Represents the kafka:Producer configuration.
Fields
- acks ProducerAcks(default ACKS_SINGLE) - Number of acknowledgments
- compressionType CompressionType(default COMPRESSION_NONE) - Compression type to be used for messages
- clientId string? - Identifier to be used for server side logging
- metricsRecordingLevel string? - Metrics recording level
- metricReporterClasses string? - Metrics reporter classes
- partitionerClass string? - Partitioner class to be used to select the partition to which the message is sent
- interceptorClasses string? - Interceptor classes to be used before sending the records
- transactionalId string? - Transactional ID to be used in transactional delivery
- schemaRegistryUrl string? - Avro schema registry URL. Use this field to specify the schema registry URL if the Avro serializer is used
- bufferMemory int? - Total bytes of memory the producer can use to buffer records
- retryCount int? - Number of retries to resend a record
- batchSize int? - Maximum number of bytes to be batched together when sending the records. Records exceeding this limit will not be batched. Setting this to 0 will disable batching
- linger decimal? - Delay (in seconds) to allow other records to be batched before sending them to the Kafka server
- sendBuffer int? - Size of the TCP send buffer (SO_SNDBUF)
- receiveBuffer int? - Size of the TCP receive buffer (SO_RCVBUF)
- maxRequestSize int? - The maximum size of a request in bytes
- reconnectBackoffTime decimal? - Time (in seconds) to wait before attempting to reconnect
- reconnectBackoffMaxTime decimal? - Maximum amount of time in seconds to wait when reconnecting
- retryBackoffTime decimal? - Time (in seconds) to wait before attempting to retry a failed request
- maxBlock decimal? - Maximum block time (in seconds) during which the sending is blocked when the buffer is full
- requestTimeout decimal? - Wait time (in seconds) for the response of a request
- metadataMaxAge decimal? - Maximum time (in seconds) to force a refresh of metadata
- metricsSampleWindow decimal? - Time (in seconds) window for a metrics sample to compute over
- metricsNumSamples int? - Number of samples maintained to compute the metrics
- maxInFlightRequestsPerConnection int? - Maximum number of unacknowledged requests on a single connection
- connectionsMaxIdleTime decimal? - Close the idle connections after this number of seconds
- transactionTimeout decimal? - Timeout (in seconds) for transaction status update from the producer
- enableIdempotence boolean(default false) - Exactly one copy of each message is written to the stream when enabled
- secureSocket SecureSocket? - Configurations related to SSL/TLS encryption
- auth AuthenticationConfiguration? - Authentication-related configurations for the kafka:Producer
- securityProtocol SecurityProtocol(default PROTOCOL_PLAINTEXT) - Type of the security protocol to use in the broker connection
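A rough sketch combining several of these fields is given below; all the values are illustrative assumptions.

import ballerinax/kafka;

kafka:ProducerConfiguration producerConfiguration = {
    clientId: "orders-producer",
    // Idempotent delivery typically requires acknowledgements from all in-sync replicas.
    acks: kafka:ACKS_ALL,
    enableIdempotence: true,
    compressionType: kafka:COMPRESSION_SNAPPY,
    retryCount: 3,
    maxInFlightRequestsPerConnection: 1
};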
kafka: ProducerRecord
Details related to the producer record.
Fields
- topic string - Topic to which the record will be appended
- key byte[]? - Key that is included in the record
- value byte[] - Record content
- timestamp int? - Timestamp of the record, in milliseconds since epoch
- partition int? - Partition to which the record should be sent
Deprecated
Usage of this record is deprecated. Use subtypes of AnydataProducerRecord instead to support data-binding
kafka: SecureSocket
Configurations for secure communication with the Kafka server.
Fields
- cert TrustStore|string - Configurations associated with crypto:TrustStore or single certificate file that the client trusts
- ciphers string[]? - List of ciphers to be used. By default, all the available cipher suites are supported
- provider string? - Name of the security provider used for SSL connections. The default value is the default security provider of the JVM
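A rough sketch of enabling TLS with a trusted certificate file is given below; the certificate path is an assumption.

import ballerinax/kafka;

kafka:ProducerConfiguration producerConfiguration = {
    secureSocket: {
        // Path to the certificate file that the client trusts (assumed location).
        cert: "./resources/brokercert.crt"
    },
    securityProtocol: kafka:PROTOCOL_SSL
};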
kafka: TopicPartition
Represents a topic partition.
Fields
- topic string - Topic to which the partition is related
- partition int - Index of the specific partition
Errors
kafka: Error
Defines the common error type for the module.
kafka: PayloadBindingError
Represents an error, which occurred due to payload binding.
kafka: PayloadValidationError
Represents an error, which occurred due to payload constraint validation.
Object types
kafka: Service
The Kafka service type.
Union types
kafka: CompressionType
Kafka compression types to compress the messages.
kafka: IsolationLevel
kafka:Consumer isolation level type.
kafka: OffsetResetMethod
Represents the different types of offset-reset methods of the Kafka consumer.
kafka: ProducerAcks
kafka:Producer acknowledgement types.
kafka: SecurityProtocol
Represents the supported security protocols for Kafka clients.
Import
import ballerinax/kafka;
Metadata
Version: 3.8.0
License: Apache-2.0
Compatibility
Platform: java11
Ballerina version: 2201.5.0
Keywords: kafka, event streaming, network, messaging