ballerina-platform/ballerina-library

[Proposal] Introduce async message consumption functionality to Ballerina Azure service bus connector

Summary

Asynchronous message consumption is a standard practice in message-broker-based systems. Hence, the
Ballerina Azure Service Bus connector should support asynchronous message consumption.

Goals

  • Introduce async message consumption functionality to the Ballerina Azure service bus connector

Motivation

In applications that rely on message brokers, the ability to process messages asynchronously is crucial for
ensuring scalability, reliability, and responsiveness. While the Azure Service Bus Java SDK supports asynchronous
message consumption, this capability is currently absent in the Ballerina connector. Therefore, it is essential to
integrate this functionality into the Ballerina Azure Service Bus connector.

In Ballerina, asynchronous message consumption corresponds to a listener-based programming model. Consequently,
the Ballerina Azure Service Bus connector must support listener-based message consumption capability.

Description

As mentioned in the Goals section, the purpose of this proposal is to introduce async message consumption
functionality to the Ballerina Azure Service Bus connector.

The key functionalities expected from this change are as follows:

  • A listener implementation that supports asynchronous message consumption from an Azure Service Bus entity (topic or queue)
  • A service type that supports message processing and error handling
  • A caller implementation that supports the ack/nack functionalities of the underlying client

Following is an example code segment with the proposed solution:

import ballerinax/asb;

listener asb:Listener asbListener = check new (
    connectionString = "<connection-string>",
    entityConfig = {
        queueName: "<queue-name>"
    },
    autoComplete = false
);

service asb:Service on asbListener {

    isolated remote function onMessage(asb:Message message, asb:Caller caller) returns error? {
        // implement the message processing logic here
    }

    isolated remote function onError(asb:MessageRetrievalError 'error) returns error? {
        // implement error handling logic here
    }
}

API additions

The following configuration records will be introduced to the package:

# Represents Azure service bus listener configuration.
#
# + autoComplete - Enables auto-complete and auto-abandon of received messages 
# + prefetchCount - The number of messages to prefetch  
# + maxConcurrency - Max concurrent messages that this listener should process
public type ListenerConfiguration record {|
    *asb:ASBServiceReceiverConfig;
    boolean autoComplete = true;
    int prefetchCount = 0;
    int maxConcurrency = 1;
|};

# Options to specify when abandoning an `asb:Message` received via `asb:ReceiveMode#PEEK_LOCK`.
#
# + propertiesToModify - Message properties to modify
public type AbandonOptions record {|
    map<anydata> propertiesToModify?;
|};

# Options to specify when sending an `asb:Message` received via `asb:ReceiveMode#PEEK_LOCK` to the dead-letter queue.
#
# + deadLetterReason - The deadletter reason
# + deadLetterErrorDescription - The deadletter error description
# + propertiesToModify - Message properties to modify
public type DeadLetterOptions record {|
    string deadLetterReason?;
    string deadLetterErrorDescription?;
    map<anydata> propertiesToModify?;
|};

# Options to specify when deferring an `asb:Message` received via `asb:ReceiveMode#PEEK_LOCK`.
#
# + propertiesToModify - Message properties to modify
public type DeferOptions record {|
    map<anydata> propertiesToModify?;
|};

The following error types and error details records will be introduced to the package:

# Error type to capture the errors occurred while retrieving messages in Azure service bus listener.
public type MessageRetrievalError distinct (asb:Error & error<ErrorContext>);

# Represents message retrieval error context.
#
# + entityPath - The entity path of the error source  
# + className - The name of the originating class    
# + namespace -  The namespace of the error source  
# + errorSource - The error source, such as a function or action name   
# + reason - The reason for the error
public type ErrorContext record {
    string entityPath;
    string className;
    string namespace;
    string errorSource;
    string reason;
};

The following service object and caller type will be introduced to the package:

# The ASB service type.
public type Service distinct service object {
    // isolated remote function onMessage(asb:Message message, asb:Caller caller) returns error?;

    // isolated remote function onError(asb:MessageRetrievalError 'error) returns error?;
};

# Represents an ASB caller, which can be used to mark messages as complete, abandon, deadLetter, or defer.
public type Caller distinct client object {

    # Complete message from queue or subscription based on messageLockToken. Declares the message processing to be 
    # successfully completed, removing the message from the queue.
    isolated remote function complete(asb:Message message) returns asb:Error?;

    # Abandon message from queue or subscription based on messageLockToken. Abandon processing of the message for 
    # the time being, returning the message immediately back to the queue to be picked up by another (or the same) 
    # receiver.
    isolated remote function abandon(asb:Message message, *AbandonOptions options) returns asb:Error?;

    # Dead-Letter the message & moves the message to the Dead-Letter Queue based on messageLockToken. Transfer 
    # the message from the primary queue into a special "dead-letter sub-queue".
    isolated remote function deadLetter(asb:Message message, *DeadLetterOptions options) returns asb:Error?;

    # Defer the message in a Queue or Subscription based on messageLockToken.  It prevents the message from being 
    # directly received from the queue by setting it aside such that it must be received by sequence number.
    isolated remote function defer(asb:Message message, *DeferOptions options) returns int|asb:Error;
};
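As an illustration of how the proposed `asb:Caller` could be used, the following is a minimal sketch of a service that settles each message explicitly. It assumes the signatures exactly as listed in this section. The listener name `orderListener`, the helper `processOrder`, and the dead-letter reason strings are hypothetical and not part of the proposal.

```ballerina
import ballerina/log;
import ballerinax/asb;

// autoComplete is disabled so that the service settles each message itself.
listener asb:Listener orderListener = check new (
    connectionString = "<connection-string>",
    entityConfig = {
        queueName: "<queue-name>"
    },
    autoComplete = false
);

service asb:Service on orderListener {

    isolated remote function onMessage(asb:Message message, asb:Caller caller) returns error? {
        anydata body = message.body;
        if body is string {
            error? result = processOrder(body);
            if result is () {
                // Success: settle the message so that it is removed from the queue.
                check caller->complete(message);
            } else {
                // Failure: release the lock so that the message can be redelivered.
                check caller->abandon(message);
            }
        } else {
            // Unexpected payload type: move the message to the dead-letter sub-queue.
            check caller->deadLetter(message, deadLetterReason = "InvalidPayload",
                deadLetterErrorDescription = "Expected a string payload");
        }
    }

    isolated remote function onError(asb:MessageRetrievalError 'error) returns error? {
        log:printError("message retrieval failed", 'error);
    }
}

// Hypothetical processing step: rejects blank payloads and accepts everything else.
isolated function processOrder(string payload) returns error? {
    if payload.trim().length() == 0 {
        return error("empty payload");
    }
}
```

Keeping the settlement calls in one place makes the mapping from outcome to action explicit: complete on success, abandon on a processing failure, dead-letter on a payload that can never be processed.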

Runtime errors and validations

Attaching multiple services to a listener

A listener is directly associated with a single native client, which in turn is tied to a specific Azure Service Bus (ASB) entity, either a queue or a topic. Consequently, linking multiple services to the same listener does not make sense. Therefore, if multiple services are attached to the same listener, a runtime error will be thrown. In the future, this should be validated during compilation by a compiler plugin.
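Under this rule, consuming from two entities would mean declaring two listeners, each with a single service. The sketch below assumes that `entityConfig` accepts `topicName` and `subscriptionName` for a topic subscription, as in the existing receiver configuration. The listener names are hypothetical.

```ballerina
import ballerina/log;
import ballerinax/asb;

// One listener per entity: this one is bound to a queue.
listener asb:Listener queueListener = check new (
    connectionString = "<connection-string>",
    entityConfig = {
        queueName: "<queue-name>"
    }
);

// A separate listener bound to a topic subscription (field names are an assumption).
listener asb:Listener subscriptionListener = check new (
    connectionString = "<connection-string>",
    entityConfig = {
        topicName: "<topic-name>",
        subscriptionName: "<subscription-name>"
    }
);

// Exactly one service is attached to each listener.
service asb:Service on queueListener {

    isolated remote function onMessage(asb:Message message) returns error? {
        log:printInfo("queue message: " + message.body.toString());
    }

    isolated remote function onError(asb:MessageRetrievalError 'error) returns error? {
        log:printError("queue retrieval failed", 'error);
    }
}

service asb:Service on subscriptionListener {

    isolated remote function onMessage(asb:Message message) returns error? {
        log:printInfo("subscription message: " + message.body.toString());
    }

    isolated remote function onError(asb:MessageRetrievalError 'error) returns error? {
        log:printError("subscription retrieval failed", 'error);
    }
}
```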

Using autoComplete mode

When autoComplete mode is enabled, explicit acknowledgment or rejection of messages (ack/nack) by the caller is unnecessary. Therefore, if autoComplete is active and the developer defines the onMessage method with an asb:Caller parameter, a runtime error will be thrown. In the future, this should be validated during compilation by a compiler plugin.
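A minimal sketch of a service that is compatible with autoComplete mode is shown here. The `onMessage` method omits the `asb:Caller` parameter, so settlement is left entirely to the listener. The listener name is hypothetical, and the assumption that returning an error from `onMessage` leads to the message being abandoned is inferred from the `autoComplete` field description rather than stated in the proposal.

```ballerina
import ballerina/log;
import ballerinax/asb;

// autoComplete is enabled, so the listener settles messages on behalf of the service.
listener asb:Listener autoCompleteListener = check new (
    connectionString = "<connection-string>",
    entityConfig = {
        queueName: "<queue-name>"
    },
    autoComplete = true
);

service asb:Service on autoCompleteListener {

    // No asb:Caller parameter: declaring one here would trigger the runtime error
    // described above.
    isolated remote function onMessage(asb:Message message) returns error? {
        log:printInfo("message received: " + message.body.toString());
        // Assumption: a normal return leads to auto-complete and a returned error
        // leads to auto-abandon.
    }

    isolated remote function onError(asb:MessageRetrievalError 'error) returns error? {
        log:printError("message retrieval failed", 'error);
    }
}
```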

Further improvements

  • Introduce payload data-binding support for the onMessage method of the asb:Service
  • Introduce compiler plugin validation for the asb:Service type

@ayeshLK The overall proposal looks good to me. As a further improvement, we can probably check whether it is possible to support payload binding directly in the listener's onMessage() method. With that, users will be able to define their own Message records (maybe as an inclusion of asb:Message) with the expected body type, and the listener will automatically data-bind into that type upon retrieving the message.

  • Do we need to pass asb:Message to the caller functions? Can't we omit it and retrieve the context from the caller? Then the user can just call complete once the message is successfully processed.

    service asb:Service on asbListener {
    
        isolated remote function onMessage(asb:Message message, asb:Caller caller) returns error? {
            // implement the message processing logic here
            check caller->complete();
        }
    
        ......
    }
  • Shall we have the caller as the first parameter, like below? We normally follow this order in other listeners.

          isolated remote function onMessage(asb:Caller caller, asb:Message message) returns error? {
              // implement the message processing logic here
          }
  • Since AbandonOptions and DeferOptions only wrap an anydata map, we can use an open record for them, and DeadLetterOptions can be defined as below:

    # Options to specify when sending an `asb:Message` received via `asb:ReceiveMode#PEEK_LOCK` to the dead-letter queue.
    #
    # + deadLetterReason - The deadletter reason
    # + deadLetterErrorDescription - The deadletter error description
    public type DeadLetterOptions record {|
        string deadLetterReason?;
        string deadLetterErrorDescription?;
        anydata...;
    |};
  • Do we need to pass asb:Message to the caller functions? Can't we omit it and retrieve the context from the caller? Then the user can just call complete once the message is successfully processed.
    ```ballerina
    service asb:Service on asbListener {

        isolated remote function onMessage(asb:Message message, asb:Caller caller) returns error? {
            // implement the message processing logic here
            check caller->complete();
        }
    
        ......
    }
    ```
    

+1

* Shall we have the `caller` as the first parameter, like below? We normally follow this order in other listeners.
  ```ballerina
        isolated remote function onMessage(asb:Caller caller, asb:Message message) returns error? {
            // implement the message processing logic here
        }
  ```

Do we need to preserve the parameter order here? I think the runtime APIs support dynamically changing the parameter order. @dilanSachi do we have this behavior in Kafka?

* Since `AbandonOptions` and `DeferOptions` only wrap an anydata map, we can use an open record for them, and `DeadLetterOptions` can be defined as below:
  ```ballerina
  # Options to specify when sending an `asb:Message` received via `asb:ReceiveMode#PEEK_LOCK` to the dead-letter queue.
  #
  # + deadLetterReason - The deadletter reason
  # + deadLetterErrorDescription - The deadletter error description
  public type DeadLetterOptions record {|
      string deadLetterReason?;
      string deadLetterErrorDescription?;
      anydata...;
  |};
  ```

I think, as you pointed out, we can remove the AbandonOptions and DeferOptions records. But the DeadLetterOptions record needs the additional deadLetterReason and deadLetterErrorDescription fields. So we might need to keep the propertiesToModify field rather than opening the record to anydata. @daneshk wdyt?

In the proposed asb:Caller API the defer method has the following signature,

public type Caller distinct client object {

    // other methods

    isolated remote function defer(*record {|anydata...;|} propertiesToModify) returns int|asb:Error;
};

This API was proposed to align with the asb:MessageReceiver functionality, following a similar API design. However, it appears that the underlying Java client does not return any value when the API is invoked. In asb:MessageReceiver, we manually return the sequenceNumber field that is available in the asb:Message.

Upon reviewing the Azure Service Bus documentation [1], it is clear that developers can retrieve a deferred message from an Azure Service Bus queue or topic using the sequenceNumber. In this case, since the developer already has access to the asb:Message, and therefore the sequenceNumber, there is no need to return any value other than asb:Error or ().

Consequently, our original design is not feasible. I believe the following API should be a more appropriate approach to implement this functionality.

public type Caller distinct client object {

    // other methods

    isolated remote function defer(*record {|anydata...;|} propertiesToModify) returns asb:Error?;
};

[1] - https://learn.microsoft.com/en-us/azure/service-bus-messaging/message-deferral#retrieving-deferred-messages
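To illustrate the point about the sequence number, the following is a minimal sketch that reads `sequenceNumber` from the `asb:Message` before deferring, using the revised `defer` signature above. It assumes `sequenceNumber` is readable as an `int?` from the message record. The listener name is hypothetical, and how the number is stored for later retrieval is left open.

```ballerina
import ballerina/log;
import ballerinax/asb;

listener asb:Listener deferListener = check new (
    connectionString = "<connection-string>",
    entityConfig = {
        queueName: "<queue-name>"
    },
    autoComplete = false
);

service asb:Service on deferListener {

    isolated remote function onMessage(asb:Message message, asb:Caller caller) returns error? {
        // The sequence number is already available on the message, so defer()
        // does not need to return it.
        int? sequenceNumber = message.sequenceNumber;
        if sequenceNumber is () {
            return error("message has no sequence number; not deferring it");
        }
        check caller->defer();
        // A real application would persist this value so that the deferred
        // message can be retrieved later by its sequence number.
        log:printInfo("message deferred", sequenceNumber = sequenceNumber);
    }

    isolated remote function onError(asb:MessageRetrievalError 'error) returns error? {
        log:printError("message retrieval failed", 'error);
    }
}
```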

@ayeshLK I agree with your points. I also don't see any specific reason to return the sequence number, given that users can already access it from the message itself. And we should ideally apply the proposed behaviour to our existing receiver client to make it consistent with the listener's caller functionality, IMO.

Using autoComplete mode

When autoComplete mode is enabled, explicit acknowledgment or rejection of messages (ack/nack) by the caller is unnecessary. Therefore, if autoComplete is active and the developer defines the onMessage method with an asb:Caller parameter, a runtime error will be thrown. In the future, this should be validated during compilation by a compiler plugin.

asb:Caller is not used only for ack/nack functionality. It also enables users to defer messages and move them to the dead-letter queue. Therefore, if we restrict the use of asb:Caller based on whether autoComplete is enabled, users would be able to use the defer and deadLetter functionalities only when autoComplete is disabled. Consequently, this validation should not be implemented, as it unnecessarily limits the functionality.

@daneshk / @NipunaRanasinghe please note.

I revisited the behavior of the ASB client when attempting to mark a message as deferred or dead-lettered while the autoComplete mode is active. Both the defer and deadLetter actions change the message's state, leading to potential conflicts when combined with autoComplete mode. Therefore, it is advisable to avoid using manual deferral and dead-lettering when autoComplete is enabled. Thus, the earlier suggestion to restrict the use of asb:Caller with autoComplete mode turned on is indeed appropriate.

This has been implemented in PR [1], and the proposal document was added to the repository in PR [2].

[1] - ballerina-platform/module-ballerinax-azure-service-bus#220
[2] - ballerina-platform/module-ballerinax-azure-service-bus#222