This package provides the capability to send and receive messages by connecting to the RabbitMQ server.
RabbitMQ gives your applications a common platform to send and receive messages and a safe place for messages to live until they are received. RabbitMQ is one of the most popular open-source message brokers. It is lightweight and easy to deploy on-premises and in the cloud.
First, you need to set up the connection with the RabbitMQ server. The following ways can be used to connect to a RabbitMQ server.
    // Connect using the default host and port.
    rabbitmq:Client rabbitmqClient = check new(rabbitmq:DEFAULT_HOST, rabbitmq:DEFAULT_PORT);

    // Connect using a custom host and port.
    rabbitmq:Client rabbitmqClient = check new("localhost", 5672);

    // Connect using a host, a port, and additional configurations.
    rabbitmq:ConnectionConfiguration config = {
        username: "ballerina",
        password: "password"
    };
    rabbitmq:Client rabbitmqClient = check new("localhost", 5672, config);
The rabbitmq:Client can now be used to send and receive messages as described in the subsequent sections.
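To show where the connection code sits in a program, here is a minimal sketch of a complete entry point. It assumes a broker is reachable at the default host and port. The close() call is an assumption about the client API that does not appear in the snippets above; check it against the API documentation of the version you use.

```ballerina
import ballerina/io;
import ballerinax/rabbitmq;

public function main() returns error? {
    // Connect with the default host and port; `check` returns the error
    // from main if the broker cannot be reached.
    rabbitmq:Client rabbitmqClient = check new(rabbitmq:DEFAULT_HOST, rabbitmq:DEFAULT_PORT);
    io:println("Connected to RabbitMQ");

    // Assumption: the client exposes close() to release the connection.
    check rabbitmqClient->close();
}
```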
Client applications work with exchanges and queues, which are the high-level building blocks of AMQP. These must be declared before they can be used. The following code declares an exchange and a queue and then binds them together.
    check rabbitmqClient->exchangeDeclare("MyExchange", rabbitmq:DIRECT_EXCHANGE);
    check rabbitmqClient->queueDeclare("MyQueue");
    check rabbitmqClient->queueBind("MyQueue", "MyExchange", "routing-key");
This sample code declares an exchange named MyExchange of the type rabbitmq:DIRECT_EXCHANGE and a queue named MyQueue with the default queue settings. Next, the queueBind function is called to bind the queue to the exchange with the given routing key.
    check rabbitmqClient->exchangeDeclare("MyExchange", rabbitmq:DIRECT_EXCHANGE);
    check rabbitmqClient->queueDeclare("MyQueue", {
        durable: true,
        exclusive: false,
        autoDelete: false
    });
    check rabbitmqClient->queueBind("MyQueue", "MyExchange", "routing-key");
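This sample code declares an exchange of the type rabbitmq:DIRECT_EXCHANGE and a durable, non-exclusive, non-auto-delete queue named MyQueue, and then binds the queue to the exchange.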
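As an illustration of how routing keys separate traffic on a direct exchange, the sketch below binds two queues to one exchange with different keys. A direct exchange delivers a message only to queues whose binding key exactly matches the message's routing key. The exchange, queue, and key names are hypothetical; only calls already shown above are used.

```ballerina
import ballerinax/rabbitmq;

public function main() returns error? {
    rabbitmq:Client rabbitmqClient = check new(rabbitmq:DEFAULT_HOST, rabbitmq:DEFAULT_PORT);

    // One direct exchange shared by both queues (hypothetical names).
    check rabbitmqClient->exchangeDeclare("OrdersExchange", rabbitmq:DIRECT_EXCHANGE);

    check rabbitmqClient->queueDeclare("CreatedOrders");
    check rabbitmqClient->queueDeclare("CancelledOrders");

    // Messages published with "order.created" reach only CreatedOrders;
    // messages published with "order.cancelled" reach only CancelledOrders.
    check rabbitmqClient->queueBind("CreatedOrders", "OrdersExchange", "order.created");
    check rabbitmqClient->queueBind("CancelledOrders", "OrdersExchange", "order.cancelled");
}
```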
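Queues and exchanges can be deleted, and queues can be purged, as follows.

    // Delete a queue.
    check rabbitmqClient->queueDelete("MyQueue");

    // Delete a queue only if it is empty (second flag set to true).
    check rabbitmqClient->queueDelete("MyQueue", false, true);

    // Delete a queue only if it is unused (first flag set to true).
    check rabbitmqClient->queueDelete("MyQueue", true, false);

    // Delete an exchange.
    check rabbitmqClient->exchangeDelete("MyExchange");

    // Purge a queue, removing all of its messages.
    check rabbitmqClient->queuePurge("MyQueue");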
To publish a message to an exchange, use the publishMessage() function as follows:
    // Name of a previously declared queue, used here as the routing key.
    string queueName = "MyQueue";
    string message = "Hello from Ballerina";
    check rabbitmqClient->publishMessage({ content: message.toBytes(), routingKey: queueName });
Other properties of the message, such as routing headers, can be set by using the BasicProperties record with the appropriate values.
    rabbitmq:BasicProperties props = {
        replyTo: "reply-queue"
    };
    string message = "Hello from Ballerina";
    check rabbitmqClient->publishMessage({ content: message.toBytes(), routingKey: queueName, properties: props });
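The publishing snippets above set only a routing key. The following sketch shows one way to publish through a named exchange so that the binding created earlier decides which queue receives the message. The exchange field on the message record is an assumption about the record's shape that is not confirmed by the snippets above; verify it against the API documentation before relying on it.

```ballerina
import ballerinax/rabbitmq;

public function main() returns error? {
    rabbitmq:Client rabbitmqClient = check new(rabbitmq:DEFAULT_HOST, rabbitmq:DEFAULT_PORT);

    // Declare and bind the entities, as in the earlier declaration example.
    check rabbitmqClient->exchangeDeclare("MyExchange", rabbitmq:DIRECT_EXCHANGE);
    check rabbitmqClient->queueDeclare("MyQueue");
    check rabbitmqClient->queueBind("MyQueue", "MyExchange", "routing-key");

    string message = "Hello from Ballerina";
    check rabbitmqClient->publishMessage({
        content: message.toBytes(),
        // Must match the binding key used in queueBind for a direct exchange.
        routingKey: "routing-key",
        // Assumption: the message record accepts an exchange field.
        exchange: "MyExchange"
    });
}
```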
The most efficient way to receive messages is to set up a subscription using a Ballerina RabbitMQ rabbitmq:Listener and any number of consumer services. The messages will then be delivered automatically as they arrive rather than having to be explicitly requested. Multiple consumer services can be bound to one Ballerina RabbitMQ rabbitmq:Listener. The queue to which the service is listening is configured in the rabbitmq:ServiceConfig annotation of the service or else as the name of the service.
Listen to incoming messages with the onMessage remote method:

    listener rabbitmq:Listener channelListener = new(rabbitmq:DEFAULT_HOST, rabbitmq:DEFAULT_PORT);

    @rabbitmq:ServiceConfig {
        queueName: "MyQueue"
    }
    service rabbitmq:Service on channelListener {
        remote function onMessage(rabbitmq:Message message) {
        }
    }
Listen to incoming messages and reply to them with the onRequest remote method:

    listener rabbitmq:Listener channelListener = new(rabbitmq:DEFAULT_HOST, rabbitmq:DEFAULT_PORT);

    @rabbitmq:ServiceConfig {
        queueName: "MyQueue"
    }
    service rabbitmq:Service on channelListener {
        remote function onRequest(rabbitmq:Message message) returns string {
            return "Hello Back!";
        }
    }
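Since multiple consumer services can be bound to one listener, a minimal sketch of that arrangement may help. Two services share the same rabbitmq:Listener, each configured with its own queue. The queue names are hypothetical, and only constructs already shown above are used.

```ballerina
import ballerina/io;
import ballerinax/rabbitmq;

// A single listener shared by both services.
listener rabbitmq:Listener channelListener = new(rabbitmq:DEFAULT_HOST, rabbitmq:DEFAULT_PORT);

// Consumes from the hypothetical queue "OrdersQueue".
@rabbitmq:ServiceConfig {
    queueName: "OrdersQueue"
}
service rabbitmq:Service on channelListener {
    remote function onMessage(rabbitmq:Message message) {
        io:println("Order event received");
    }
}

// Consumes from the hypothetical queue "PaymentsQueue".
@rabbitmq:ServiceConfig {
    queueName: "PaymentsQueue"
}
service rabbitmq:Service on channelListener {
    remote function onMessage(rabbitmq:Message message) {
        io:println("Payment event received");
    }
}
```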
The rabbitmq:Message record received can be used to retrieve its contents.
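As a sketch of retrieving the contents, the service below decodes the payload back into a string. It assumes the received record carries the same content and routingKey fields that the publishing examples set, and that the payload arrives as a byte array (the counterpart of message.toBytes() on the publishing side). It also assumes onMessage may return error?. These are assumptions, so check them against the API documentation of your version.

```ballerina
import ballerina/io;
import ballerinax/rabbitmq;

listener rabbitmq:Listener channelListener = new(rabbitmq:DEFAULT_HOST, rabbitmq:DEFAULT_PORT);

@rabbitmq:ServiceConfig {
    queueName: "MyQueue"
}
service rabbitmq:Service on channelListener {
    // Assumption: onMessage is allowed to return error?.
    remote function onMessage(rabbitmq:Message message) returns error? {
        // Widen to anydata so the type test is valid regardless of how
        // the content field is declared in the record.
        anydata content = message.content;
        if content is byte[] {
            // Decode the UTF-8 bytes back into the original string.
            string text = check string:fromBytes(content);
            io:println("Received: ", text);
        }
        io:println("Routing key: ", message.routingKey);
    }
}
```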
Message consumption supports two acknowledgement modes: auto acknowledgements and client acknowledgements. Client acknowledgements are further divided into positive and negative acknowledgements. The default acknowledgement mode is auto-ack (messages are acknowledged immediately after they are consumed). The following examples show the usage of positive and negative acknowledgements.
WARNING: To ensure the reliability of receiving messages, use the client-ack mode.
    listener rabbitmq:Listener channelListener = new(rabbitmq:DEFAULT_HOST, rabbitmq:DEFAULT_PORT);

    @rabbitmq:ServiceConfig {
        queueName: "MyQueue",
        autoAck: false
    }
    service rabbitmq:Service on channelListener {
        remote function onMessage(rabbitmq:Message message, rabbitmq:Caller caller) {
            // Positively acknowledge the message.
            rabbitmq:Error? result = caller->basicAck();
        }
    }
    listener rabbitmq:Listener channelListener = new(rabbitmq:DEFAULT_HOST, rabbitmq:DEFAULT_PORT);

    @rabbitmq:ServiceConfig {
        queueName: "MyQueue",
        autoAck: false
    }
    service rabbitmq:Service on channelListener {
        // The caller parameter is required to acknowledge messages manually.
        remote function onMessage(rabbitmq:Message message, rabbitmq:Caller caller) {
            // Negatively acknowledge the message without re-queuing it.
            rabbitmq:Error? result = caller->basicNack(true, requeue = false);
        }
    }
Negatively acknowledged (rejected) messages can be re-queued by setting requeue to true.
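A minimal sketch of the re-queue case follows. It mirrors the negative acknowledgement example but passes requeue = true and logs any failure of the acknowledgement itself. The use of the ballerina/log module and passing false for the first argument (taken from the example above to control whether multiple messages are affected) are choices made for this illustration.

```ballerina
import ballerina/log;
import ballerinax/rabbitmq;

listener rabbitmq:Listener channelListener = new(rabbitmq:DEFAULT_HOST, rabbitmq:DEFAULT_PORT);

@rabbitmq:ServiceConfig {
    queueName: "MyQueue",
    autoAck: false
}
service rabbitmq:Service on channelListener {
    remote function onMessage(rabbitmq:Message message, rabbitmq:Caller caller) {
        // Reject the message and ask the broker to put it back on the queue.
        rabbitmq:Error? result = caller->basicNack(false, requeue = true);
        if result is rabbitmq:Error {
            log:printError("Failed to negatively acknowledge the message", 'error = result);
        }
    }
}
```

Note that a consumer that always re-queues a message it cannot process will receive it again, so real handlers usually re-queue only for transient failures.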
To report bugs, request new features, start new discussions, view project boards, etc., go to the Ballerina standard library parent repository.
ballerinax/rabbitmq
sha-256: c6d49d6aad12b1c29f44080a0e74eb3554cac75de816972937bd0deefb192a9e
License: Apache-2.0
Created date: 01 June, 2023
Platform: java11
Ballerina version: 2201.5.0
Keywords: service, client, messaging, network, pubsub