Module trigger.asb
ballerinax/trigger.asb Ballerina library
Overview
The ballerinax/trigger.asb module supports asynchronous message listening from Azure Service Bus via the Ballerina language.
This module supports version 7.13.1 of the Service Bus SDK.
The source code on GitHub is located here.
The primary wire protocol for Service Bus is Advanced Message Queuing Protocol (AMQP) 1.0, an open ISO/IEC standard.
Prerequisites
Before using this connector in your Ballerina application, complete the following:
- Create an Azure account and a subscription. If you don't have an Azure subscription, sign up for a free Azure account.
- Create a Service Bus namespace. If you don't have a Service Bus namespace, learn how to create your Service Bus namespace.
- Create a messaging entity, such as a queue, topic, or subscription. If you don't have these items, learn how to
- Obtain tokens
  Shared Access Signature (SAS) authentication credentials are required to communicate with Azure Service Bus.
  - Connection String
  Obtain the authorization credentials:
  - For Service Bus Queues
  - For Service Bus Topics and Subscriptions
Quickstart
To use the Azure Service Bus listener in your Ballerina application, update the .bal file as follows:
Enabling Azure SDK Logs
To enable Azure logs in a Ballerina module, set the environment variable ASB_CLOUD_LOGS to ACTIVE. You can do this by adding the following line to your shell script or by running the export command in your terminal (to deactivate, remove the variable value):
export ASB_CLOUD_LOGS=ACTIVE
Enabling Internal Connector Logs
To enable internal connector logs in a Ballerina module, set the log level in the Config.toml file using the custom configuration record below. The logLevel value is the desired log level (DEBUG, INFO, WARN, ERROR, FATAL, or OFF; the default is ERROR):
[ballerinax.trigger.asb.options]
logLevel="OFF"
Step 1: Import listener
Import the ballerinax/trigger.asb module as shown below.
import ballerinax/trigger.asb;
Step 2: Create a new listener endpoint
asb:ListenerConfig configuration = {
    connectionString: "CONNECTION_STRING"
};

listener asb:Listener asbListener = new (configuration);
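The connection string is a secret, so hardcoding it is best kept to quick tests. As a minimal sketch, assuming Ballerina's standard configurable variables, the value can be supplied at run time instead. The variable name connectionString is an illustrative choice and not something the module defines.

```ballerina
import ballerinax/trigger.asb;

// Required configurable (illustrative name): the program will not start
// unless a value is supplied, for example through Config.toml.
configurable string connectionString = ?;

asb:ListenerConfig configuration = {
    connectionString: connectionString
};

listener asb:Listener asbListener = new (configuration);
```

With this layout, a Config.toml entry such as connectionString = "<your connection string>" in the package root would typically provide the value, which keeps the secret out of the .bal file.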
Step 3: Define a consumer service
- Now you can define one or more consumer services and attach them to the defined listener endpoint. Messages are then delivered automatically as they arrive, rather than having to be explicitly requested. Multiple consumer services can be bound to one Ballerina Azure Service Bus asb:Listener.
- Listener configurations, such as the queue or topic name to listen to and the message receive mode, are configured in the asb:ServiceConfig annotation of the service.

// All options are shown together for illustration only;
// configure either queueName or topicName (with subscriptionName).
@asb:ServiceConfig {
    queueName: "MyQueue",
    peekLockModeEnabled: true,
    topicName: "myTopic",
    subscriptionName: "abc",
    maxConcurrency: 1,
    prefetchCount: 10,
    maxAutoLockRenewDuration: 300,
    logLevel: ERROR
}
service asb:Service on asbListener {
    remote function onMessage(asb:Message message) {
        // process message
    }
}

- queueName: Name of the queue the service should listen to. Either queueName or topicName should be configured.
- topicName: Name of the topic the service should listen to. Either queueName or topicName should be configured.
- peekLockModeEnabled: ASB has two message receive modes. Setting this to true selects PEEK_LOCK mode. Setting it to false, or leaving it unset, selects RECEIVE_AND_DELETE mode. You can find more information about receive modes here.
- subscriptionName: Name of the subscription to listen to if a topicName is specified.
- maxConcurrency: Number of messages dispatched to the service in parallel (default 1, meaning sequential processing).
- prefetchCount: Number of messages prefetched by the underlying subscriber for efficiency (default 0, meaning prefetch is off).
- maxAutoLockRenewDuration: Amount of time, in seconds, to continue auto-renewing the lock (default 300 seconds). Setting the duration to zero disables auto-renewal. This setting is ignored when peekLockModeEnabled is false.
- logLevel: Desired log level for the underlying SDK (DEBUG, INFO, WARN, ERROR, FATAL, or OFF; the default is ERROR).
- Implement the logic for processing a message in the onMessage remote function.
- Implement the logic for handling errors that occur while receiving a message in the onError remote function.
- The following is a complete example of how to listen to messages from an Azure Service Bus queue called MyQueue sequentially using the Ballerina ASB listener.

Listen to Messages from the Azure Service Bus
import ballerina/lang.value;
import ballerina/log;
import ballerinax/trigger.asb;

asb:ListenerConfig configuration = {
    connectionString: "CONNECTION_STRING"
};

listener asb:Listener asbListener = new (configuration);

@asb:ServiceConfig {
    queueName: "MyQueue",
    peekLockModeEnabled: true,
    maxConcurrency: 1,
    prefetchCount: 10,
    maxAutoLockRenewDuration: 300
}
service asb:MessageService on asbListener {

    // Invoked for each message delivered from the queue.
    isolated remote function onMessage(asb:Message message, asb:Caller caller) returns error? {
        // Write your message processing logic here
        log:printInfo("Message received from queue: " + message.toBalString());
        // Settle the message so it is removed from the queue (PEEK_LOCK mode).
        _ = check caller.complete(message);
    }

    // Invoked when an error occurs while receiving a message.
    isolated remote function onError(asb:ErrorContext context, error 'error) returns error? {
        // Write your error handling logic here
    }
}
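The example above always completes the message. As a hedged sketch of settling a message differently when processing fails, the variant below abandons it instead, so that the lock is released and Service Bus can deliver it again. It assumes that abandon on asb:Caller takes the message in the same way as complete does in the example above. processMessage is a hypothetical helper standing in for your own logic.

```ballerina
import ballerina/log;
import ballerinax/trigger.asb;

asb:ListenerConfig configuration = {
    connectionString: "CONNECTION_STRING"
};

listener asb:Listener asbListener = new (configuration);

@asb:ServiceConfig {
    queueName: "MyQueue",
    peekLockModeEnabled: true
}
service asb:MessageService on asbListener {

    isolated remote function onMessage(asb:Message message, asb:Caller caller) returns error? {
        error? result = processMessage(message);
        if result is error {
            // Assumption: abandon accepts the message like complete does.
            // Abandoning releases the lock so the message can be redelivered.
            log:printError("Processing failed; abandoning message", 'error = result);
            _ = check caller.abandon(message);
            return;
        }
        // Processing succeeded: settle the message so it leaves the queue.
        _ = check caller.complete(message);
    }

    isolated remote function onError(asb:ErrorContext context, error 'error) returns error? {
        log:printError("Error while receiving a message", 'error = 'error);
    }
}

// Hypothetical helper, not part of the module: replace with real processing.
isolated function processMessage(asb:Message message) returns error? {
    log:printInfo("Processing message: " + message.toBalString());
}
```

Settlement only applies in PEEK_LOCK mode, which is why peekLockModeEnabled is set to true here.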
NOTE: You can complete, abandon, deadLetter, defer, and renewLock messages using the asb:Caller instance. If you want to handle errors that occur when processing messages, use the MessageServiceErrorHandling service type.

Use the bal run command to compile and run the Ballerina program.
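For a topic subscription, the same service shape should apply with topicName and subscriptionName in place of queueName. The following is a minimal sketch under that assumption, reusing the sample names myTopic and abc from the configuration description. It also sets peekLockModeEnabled to false to illustrate RECEIVE_AND_DELETE mode, in which no settlement call is made.

```ballerina
import ballerina/log;
import ballerinax/trigger.asb;

asb:ListenerConfig configuration = {
    connectionString: "CONNECTION_STRING"
};

listener asb:Listener asbListener = new (configuration);

@asb:ServiceConfig {
    topicName: "myTopic",
    subscriptionName: "abc",
    // false selects RECEIVE_AND_DELETE mode
    peekLockModeEnabled: false
}
service asb:MessageService on asbListener {

    isolated remote function onMessage(asb:Message message, asb:Caller caller) returns error? {
        // No complete/abandon call here: in RECEIVE_AND_DELETE mode the
        // message is removed from the subscription when it is received.
        log:printInfo("Message received from topic: " + message.toBalString());
    }

    isolated remote function onError(asb:ErrorContext context, error 'error) returns error? {
        log:printError("Error while receiving a message", 'error = 'error);
    }
}
```

RECEIVE_AND_DELETE trades delivery guarantees for simplicity: if the service fails while handling a message, that message is not redelivered, so PEEK_LOCK is usually the safer choice when every message must be processed.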