spring-boot-starter-request-reply

Description

This Spring Boot Starter provides request-reply functionality for Spring Cloud Stream Binders.

Spring Cloud Version Compatibility

Consult the table below to determine which version you need to use:

Spring Cloud | spring-cloud-stream-starter-request-reply | Spring Boot | sol-jcsmp
2023.0.2     | 5.1.4                                     | 3.3.0       | 10.24.0
2023.0.2     | 5.1.3                                     | 3.3.0       | 10.23.0
2023.0.1     | 5.1.2                                     | 3.2.5       | 10.23.0

Usage

Dependencies

To use the request/reply functionality, add the following dependency to your Maven pom:

<dependency>
    <groupId>community.solace.spring.cloud</groupId>
    <artifactId>spring-cloud-stream-starter-request-reply</artifactId>
    <version>5.1.4</version>
</dependency>

To use the starter together with a binder, you also need the dependency for that binder.

Usage

For requester

To send a request, define a topic pattern that matches the topic you send your requests to. The pattern selects the binding used for these requests. The binding in turn defines the binder, contentType, ...
and the reply topic to which the replier should send the response.

spring.cloud.stream.requestreply.bindingMapping[n].binding must:

  • match an entry in spring.cloud.function.definition
  • match spring.cloud.stream.bindings.XX-in-0, where you define the binder, contentType, ...

spring.cloud.stream.requestreply.bindingMapping[n].topicPatterns[m]:

  • Is a list of RegEx patterns that are matched against the destination of your requests.
  • If no pattern matches when you call requestAndAwaitReplyToTopic()/requestReplyToTopic(), an IllegalArgumentException is thrown.
  • If you only use requestAndAwaitReplyToBinding()/requestReplyToBinding(), you don't need this configuration.

Remember to also list this binding in spring.cloud.function.definition, otherwise you will not receive responses. You do not need to create a bean for it; the library generates one.

Using dynamic topics
spring:
  cloud:
    function:
      definition: requestReplyRepliesDemo
    stream:
      requestreply:
        bindingMapping:
          - binding: requestReplyRepliesDemo
            replyTopic: requestReply/response/solace/{StagePlaceholder}/@project.artifactId@_${HOSTNAME}_${replyTopicWithWildcards|uuid}
            topicPatterns:
              - requestReply/request/.*
      bindings:
        requestReplyRepliesDemo-in-0:
          destination: ${replyTopicWithWildcards|requestReplyRepliesDemo|*}
          contentType: "application/json"
          binder: solace 
        requestReplyRepliesDemo-out-0:
          binder: solace
single response
        SensorReading response = requestReplyService.requestAndAwaitReplyToTopic(
                reading,
                "requestReply/request/last_value/temperature/celsius/" + location,
                SensorReading.class,
                Duration.ofSeconds(30)
        );
multiple responses

Introduction into Flux/project reactor

        Flux<SensorReading> responses = requestReplyService.requestReplyToTopicReactive(
                reading,
                "requestReply/request/last_value/temperature/celsius/" + location,
                SensorReading.class,
                Duration.ofSeconds(30)
        );
Using static topics
spring:
  cloud:
    function:
      definition: requestReplyRepliesDemo
    stream:
      requestreply:
        bindingMapping:
          - binding: requestReplyRepliesDemo
            replyTopic: requestReply/response/solace/{StagePlaceholder}/@project.artifactId@_${HOSTNAME}_${replyTopicWithWildcards|uuid}
      bindings:
        requestReplyRepliesDemo-in-0:
          destination: ${replyTopicWithWildcards|requestReplyRepliesDemo|*}
          contentType: "application/json"
          binder: solace 
        requestReplyRepliesDemo-out-0:
          destination: requestReply/request/last_value/temperature/celsius/livingroom
          binder: solace
single response
        SensorReading response = requestReplyService.requestAndAwaitReplyToBinding(
                request,
                "requestReplyRepliesDemo",
                SensorReading.class,
                Duration.ofSeconds(30)
        );
multiple responses

Introduction into Flux/project reactor

        Flux<SensorReading> responses = requestReplyService.requestReplyToBindingReactive(
                request,
                "requestReplyRepliesDemo",
                SensorReading.class,
                Duration.ofSeconds(30)
        );

Full example

How everything is related

When using requestAndAwaitReplyToTopic / requestReplyToTopic, the topic used in the code is matched against all spring.cloud.stream.requestreply.bindingMapping[].topicPatterns; the first hit is used.
[Figure: topic to pattern]
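
The first-hit lookup described above can be pictured with a small, self-contained sketch. It is not the library's implementation: the class TopicPatternResolver and its methods are hypothetical, and it assumes that a pattern has to match the whole topic and that entries are tried in configuration order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

/** Illustrative sketch only; a hypothetical stand-in, not the library's code. */
final class TopicPatternResolver {
    // Binding name -> compiled patterns, kept in configuration order.
    private final Map<String, List<Pattern>> patternsByBinding = new LinkedHashMap<>();

    /** Registers the topicPatterns of one bindingMapping entry. */
    void register(String binding, List<String> topicPatterns) {
        List<Pattern> compiled = new ArrayList<>();
        for (String regex : topicPatterns) {
            compiled.add(Pattern.compile(regex));
        }
        patternsByBinding.put(binding, compiled);
    }

    /** Returns the binding of the first pattern that matches the whole topic (assumption). */
    String resolve(String topic) {
        for (Map.Entry<String, List<Pattern>> entry : patternsByBinding.entrySet()) {
            for (Pattern pattern : entry.getValue()) {
                if (pattern.matcher(topic).matches()) {
                    return entry.getKey();
                }
            }
        }
        // Mirrors the documented behaviour: no matching pattern is an error.
        throw new IllegalArgumentException("No binding mapping matches topic: " + topic);
    }
}
```

With the configuration shown earlier, resolving requestReply/request/last_value/temperature/celsius/livingroom would yield requestReplyRepliesDemo, while a topic outside requestReply/request/ would raise the IllegalArgumentException.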

The spring.cloud.stream.requestreply.bindingMapping[].binding of the matching section is matched against spring.cloud.stream.bindings[], and its -out-0 binding is always used.
This is required to let the requestReply service know which binder, contentType, ... to use.
[Figure: binding to -out-0]

The spring.cloud.stream.requestreply.bindingMapping[].replyTopic of the matching section is placed in the outbound message to let the foreign service know where you expect the answer.
In general, you want to use a unique topic here.
Best practice is to put into the topic:

  • The HOSTNAME, to make debugging a little easier.
  • A UUID, to get a unique inbox. Do not use Spring's ${random.uuid} here, because it generates a new UUID each time it is evaluated, so the topic you send would no longer match the topic you listen on. Prefer ${replyTopicWithWildcards|uuid}, which yields a static UUID generated at process start.

[Figure: reply topic sending]

The requestReply service iterates over spring.cloud.stream.requestreply.bindingMapping and creates a bean that consumes messages received on -in-0.destination using the given binder.
[Figure: consuming topic]

As soon as you use the {StagePlaceholder} feature, you cannot listen on:
requestReply/response/solace/{StagePlaceholder}/pub_sub_sending_K353456_315fd96b-b981-417b-be99-3be065c6611d
because the foreign side replaces {StagePlaceholder} with, for example, p-arcs before responding.
Therefore you need to listen on: requestReply/response/solace/*/pub_sub_sending_K353456_315fd96b-b981-417b-be99-3be065c6611d
This is done by: ${replyTopicWithWildcards|requestReplyRepliesDemo|*}
It takes the spring.cloud.stream.requestreply.bindingMapping[].replyTopic of the section whose binding matches the first parameter
and replaces every {someThing} with the wildcard given as the second parameter (here *).
[Figure: reply topic replace wildcard]
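
The placeholder-to-wildcard substitution can be sketched in a few lines of plain Java. This is only an illustration of the idea, not the library's code: ReplyTopicWildcards and toSubscription are hypothetical names, and the sketch assumes that Spring placeholders such as ${HOSTNAME} have already been resolved, so that only {someThing} tokens remain.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative sketch only; a hypothetical helper, not part of the library. */
final class ReplyTopicWildcards {
    // Matches one {someThing} token; nested braces are not supported.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\{[^{}]+\\}");

    /** Replaces every {someThing} token in the reply topic with the given wildcard. */
    static String toSubscription(String replyTopic, String wildcard) {
        // quoteReplacement keeps characters like '$' in the wildcard literal.
        return PLACEHOLDER.matcher(replyTopic).replaceAll(Matcher.quoteReplacement(wildcard));
    }
}
```

Applied to the reply topic from the example above with the wildcard *, the {StagePlaceholder} level becomes *, which is the topic the requester has to subscribe to.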

For replier

In general, you do not need this library to answer a message. Instead, you can simply send the response to the topic given in the reply-to header and echo back all requested headers.

However, the methods RequestReplyMessageHeaderSupportService.wrap, RequestReplyMessageHeaderSupportService.wrapList and RequestReplyMessageHeaderSupportService.wrapFlux from this library help create the response with the message headers set properly, and substitute variables in dynamic topics.

By default, the wrapping function sets the correlationId and reply destination headers of the message. For multiple responses, the totalReplies and replyIndex headers are set as well. Additional headers can be copied from the request by setting spring.cloud.stream.requestReply.copyHeadersOnWrap accordingly, e.g.:

spring.cloud.stream.requestReply.copyHeadersOnWrap=encoding,yetAnotherHeader
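
Conceptually, copying configured headers means taking only the listed names from the request and adding them to the response headers. The following sketch illustrates that idea with plain maps; HeaderCopySketch and copySelected are hypothetical names, and how the library actually merges headers (for example, which side wins on a conflict) is an assumption here.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative sketch only; not the library's header handling. */
final class HeaderCopySketch {
    /**
     * Returns the response headers plus every listed header that is present on the request.
     * Assumption: a copied request header overrides a response header of the same name.
     */
    static Map<String, Object> copySelected(Map<String, Object> requestHeaders,
                                            Map<String, Object> responseHeaders,
                                            Collection<String> headerNames) {
        Map<String, Object> merged = new LinkedHashMap<>(responseHeaders);
        for (String name : headerNames) {
            Object value = requestHeaders.get(name);
            if (value != null) {
                merged.put(name, value);
            }
        }
        return merged;
    }
}
```

With the setting above, an encoding header on the request would appear on the response, while request headers that are not listed would not be copied.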

The wrapping functions can be used as in the examples below:

single response
public class PingPongConfig {
  @Bean
  public Function<Message<SensorRequest>, Message<SensorReading>> responseToRequest(
          RequestReplyMessageHeaderSupportService headerSupport
  ) {
    return headerSupport.wrap((request) -> {
      SensorReading response = new SensorReading();
      ...

      return response;
    });
  }
}

Full example

multiple responses, functional
public class PingPongConfig {
  @Bean
  public Function<Message<SensorRequest>, List<Message<SensorReading>>> responseMultiToRequestKnownSizeSolace(
          RequestReplyMessageHeaderSupportService headerSupport
  ) {
    return headerSupport.wrapList((request) -> {
      List<SensorReading> responses = new ArrayList<>();
      ...
  
      return responses;
    }, "responseMultiToRequestKnownSizeSolace-out-0");
  }
}

Full example

multiple responses, reactive
public class PingPongConfig {
  @Bean
  public Function<Flux<Message<SensorRequest>>, Flux<Message<SensorReading>>> responseMultiToRequestRandomSizeSolace(
          RequestReplyMessageHeaderSupportService headerSupport
  ) {
    return headerSupport.wrapFlux((request, responseSink) -> {
      try {
        while (...) {
          responseSink.next(response);
        }
        responseSink.complete();
      } catch (Exception e) {
        responseSink.error(new IllegalArgumentException("Business error message", e));
      }
    }, "responseMultiToRequestRandomSizeSolace-out-0");
  }
}

Full example

error handling

You might want to forward errors to the requester. To do so, you only need to pass the exception classes (one or more) that should be forwarded to the requester.

public class PingPongConfig {
  @Bean
  public Function<Message<SensorRequest>, Message<SensorReading>> responseToRequest(
          RequestReplyMessageHeaderSupportService headerSupport
  ) {
    return headerSupport.wrap((request) -> {
      SensorReading response = new SensorReading();
      ...

      return response;
    }, MyBusinessException.class, SomeOtherException.class);
  }
}

Errors from input class validation and JSON parsing cannot be returned to the requester out of the box. Therefore you need to do the parsing and validation on your own, like:

public class PingPongConfig {
  @Qualifier("mvcValidator")
  private final Validator validator;

  private final ObjectMapper objectMapper;
    
  @Bean
  public Function<Message<String>, Message<SensorReading>> responseToRequest(
          RequestReplyMessageHeaderSupportService headerSupport
  ) {
    return headerSupport.wrap((rawRequest) -> {
      SensorReading request = objectMapper.readValue(rawRequest.getPayload(), SensorReading.class);
      final DataBinder db = new DataBinder(request);
      db.setValidator(validator);
      db.validate();
      
        
      SensorReading response = new SensorReading();
      ...

      return response;
    }, MyBusinessException.class, SomeOtherException.class);
  }
}
Variable replacement

Placeholders in reply topics need to be replaced. For example, "{StagePlaceholder}" needs to be replaced by the replier's stage to let the requester know who answered. This is helpful when debugging how messages flow.

Example:

spring:
  cloud:
    function:
      definition: requestReplyRepliesDemo
    stream:
      requestreply:
        variableReplacements:
          "{StagePlaceholder}": ${RCS_ENV_ROLE}-${RCS_CLUSTER}

Those variableReplacements will be applied to request and reply topics.
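
The effect of variableReplacements can be illustrated as a plain literal substitution over the topic string. This is a sketch under assumptions, not the library's code: TopicVariableReplacer is a hypothetical class, and the value p-arcs stands in for whatever ${RCS_ENV_ROLE}-${RCS_CLUSTER} resolves to in your environment.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative sketch only; a hypothetical stand-in for the configured replacements. */
final class TopicVariableReplacer {
    // Placeholder (including braces) -> replacement value.
    private final Map<String, String> replacements;

    TopicVariableReplacer(Map<String, String> replacements) {
        this.replacements = new LinkedHashMap<>(replacements);
    }

    /** Replaces every configured placeholder literally; unknown placeholders stay untouched. */
    String apply(String topic) {
        String result = topic;
        for (Map.Entry<String, String> entry : replacements.entrySet()) {
            result = result.replace(entry.getKey(), entry.getValue());
        }
        return result;
    }
}
```

A replier configured with "{StagePlaceholder}" mapped to p-arcs would turn requestReply/response/solace/{StagePlaceholder}/inbox into requestReply/response/solace/p-arcs/inbox before sending the reply.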

API

RequestReplyService

The request/reply functionality provided by this starter can be utilized by autowiring the RequestReplyService which offers the following methods:

  • A requestAndAwaitReplyToTopic(Q request, String requestDestination, Class<A> expectedResponseClass, Duration timeoutPeriod) sends the given request to the given request destination, awaits the response and maps it to the provided class as a return value. If there is a -out-0.destination configured, it will simply be ignored.
  • A requestAndAwaitReplyToBinding(Q request, String bindingName, Class<A> expectedResponseClass, Duration timeoutPeriod) sends the given request to the destination configured for the -out-0 of this binding, awaits the response and maps it to the provided class as a return value.
  • CompletableFuture<A> requestReplyToTopic(Q request, String requestDestination, Class<A> expectedClass, Duration timeoutPeriod) sends the given request to the given request destination and returns a future that is completed with the response, mapped to the provided class, once it is received. Use this only in the rare edge case where you need to run multiple request/reply exchanges in parallel from the same thread.
  • CompletableFuture<A> requestReplyToBinding(Q request, String bindingName, Class<A> expectedClass, Duration timeoutPeriod) sends the given request to the destination configured for the -out-0 of this binding and returns a future that is completed with the response, mapped to the provided class, once it is received. Use this only in the rare edge case where you need to run multiple request/reply exchanges in parallel from the same thread.
  • Flux<A> requestReplyToTopicReactive(Q request, String requestDestination, Class<A> expectedClass, Duration timeoutPeriod) sends the given request to the given request destination and returns a flux that emits the responses, mapped to the provided class, as they are received. Use this if you expect multiple responses to a single request. For example, if your response type would be an array, it is best practice to send the elements as individual messages to avoid overly large messages.
  • Flux<A> requestReplyToBindingReactive(Q request, String bindingName, Class<A> expectedClass, Duration timeoutPeriod) sends the given request to the destination configured for the -out-0 of this binding and returns a flux that emits the responses, mapped to the provided class, as they are received. Use this if you expect multiple responses to a single request. For example, if your response type would be an array, it is best practice to send the elements as individual messages to avoid overly large messages.

All methods can be used in any combination.
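
The difference between the blocking requestAndAwaitReply... methods and the CompletableFuture-returning requestReply... methods can be pictured with plain JDK futures. The sketch below does not call the library; AwaitReplySketch is a hypothetical helper, and treating the blocking variant as "future plus timeout plus join" is an assumption made for illustration.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Illustrative sketch only; uses JDK futures instead of the real RequestReplyService. */
final class AwaitReplySketch {
    /**
     * Blocks until the reply arrives or the timeout elapses.
     * On timeout, join() throws a CompletionException whose cause is a TimeoutException.
     */
    static <A> A await(CompletableFuture<A> reply, Duration timeout) {
        return reply.orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS).join();
    }

    /**
     * Awaits several requests that are already in flight.
     * Because all futures were created before the first join, the requests run in parallel.
     */
    static <A> List<A> awaitAll(List<CompletableFuture<A>> replies, Duration timeout) {
        List<A> results = new ArrayList<>();
        for (CompletableFuture<A> reply : replies) {
            results.add(await(reply, timeout));
        }
        return results;
    }
}
```

The future-based variant pays off only when several requests are started first and awaited afterwards; for a single request the blocking method is simpler.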

Request with responses

When you use the requestReplyToTopicReactive method, you can receive multiple responses to your request.

Example for blocking

A blocking request reply where you receive a list of answers.

    @GetMapping(value = "/temperature/last_hour/{location}")
    public List<SensorReading> requestMultiReplySample(
            @PathVariable("location") final String location
    ) {
        MyRequest request = new MyRequest();
        request.setLocation(location);

        return requestReplyService.requestReplyToTopicReactive(
                        request,
                        "last_hour/temperature/celsius/" + location,
                        SensorReading.class,
                        Duration.ofSeconds(30)
                )
                .collectList()
                .block();
    }
Example for non-blocking

A non-blocking request reply where you handle each answer as it arrives.

    @GetMapping(value = "/temperature/last_hour/{location}")
    public void requestMultiReplySample(
            @PathVariable("location") final String location
    ) {
        MyRequest request = new MyRequest();
        request.setLocation(location);

        requestReplyService.requestReplyToTopicReactive(
                        request,
                        "last_hour/temperature/celsius/" + location,
                        SensorReading.class,
                        Duration.ofSeconds(30)
                )
                .subscribe(
                        sensorReading -> log.info("Got an answer: " + sensorReading),
                        throwable -> log.error("The request was finished with error", throwable),
                        () -> log.info("The request was finished")
                );
    }
receive many answers

If your request is a org.springframework.messaging.Message, you can decide if every response should be transported as a single message or if a couple of messages should be grouped together.

Message grouping can be enabled via: requestMsg.setHeader(SpringHeaderParser.GROUPED_MESSAGES, true);

If your request is anything else, the request/reply library adds GROUPED_MESSAGES=true automatically.

Unless you require a separate header for each reply, it is recommended to use grouped messages instead of sending individual messages.
Grouped messages improve reply speed by reducing the message header overhead and conserving broker resources.

Messages are grouped together until one of the following rules applies:

  • The grouped message exceeds the 1MB limit
  • The group contains more than 10_000 single items
  • The first message in the group is more than 0.5sec old (or a different value configured by the replier)
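
The three flush rules can be read as a single predicate over the current group. The sketch below is only an illustration of those rules, not the library's implementation: GroupFlushPolicy is a hypothetical class, and interpreting 1MB as 1024 * 1024 bytes is an assumption.

```java
import java.time.Duration;

/** Illustrative sketch only; thresholds taken from the rules above. */
final class GroupFlushPolicy {
    static final long MAX_BYTES = 1024L * 1024L;            // assumption: 1MB = 1024 * 1024 bytes
    static final int MAX_ITEMS = 10_000;
    static final Duration MAX_AGE = Duration.ofMillis(500); // default; the replier may configure another value

    /** Returns true as soon as any one of the three limits is exceeded. */
    static boolean shouldFlush(long groupedBytes, int itemCount, Duration ageOfFirstItem) {
        return groupedBytes > MAX_BYTES
                || itemCount > MAX_ITEMS
                || ageOfFirstItem.compareTo(MAX_AGE) > 0;
    }
}
```

A replier that produces many small items quickly would typically hit the item or size limit, while a slow producer would flush on the age limit.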

RequestReplyMessageHeaderSupportService

If a service only responds to requests, the library is still useful: RequestReplyMessageHeaderSupportService offers helper methods that wrap response functions and properly set the message's destination header, which can then be picked up through Spring Cloud Function. For example:

    @Bean
    @Autowired
    public Function<Message<String>, Message<String>> reverse(RequestReplyMessageHeaderSupportService headerSupport) {
        return headerSupport.wrap((value) -> new StringBuilder(value).reverse().toString());
    }

Extensibility

The Request/Reply Spring Boot Starter has been designed to work with both the Solace Binder and the TestSupportBinder, but can be extended to work with other binders as well.

To adapt the starter to another binder, the following beans must be provided.

Message and MessageHeader parsers

When receiving messages, the application must be able to determine the correlationId, destination and replyTo properties attached to a message. If a binder does not adhere to Spring messaging standards, or requires differing headers for performance optimizations, additional message parsers (or related message header parsers) must be provided. For proper prioritization, the bean should be annotated with the @Order annotation. (see @Order in Spring @Baeldung)

The Starter includes the following message parser interfaces:

  • MessageCorrelationIdParser - root interface for parsing the correlation id from an incoming message
    • MessageHeaderCorrelationIdParser - extended interface for parsing the correlation id from an incoming message's MessageHeaders
  • MessageDestinationParser - root interface for parsing the destination from an incoming message
    • MessageHeaderDestinationParser - extended interface for parsing the destination from an incoming message's MessageHeaders
  • MessageReplyToParser - root interface for parsing the reply destination from an incoming message
    • MessageHeaderReplyToParser - extended interface for parsing the reply destination from an incoming message's MessageHeaders
  • MessageTotalRepliesParser - root interface for parsing the total replies from an incoming multi response message
    • MessageHeaderTotalRepliesParser - extended interface for parsing the total replies from an incoming multi response message's MessageHeaders

The starter also includes the following message parser implementations:

  • SolaceHeaderParser (Order 200) implements MessageHeaderCorrelationIdParser, MessageHeaderDestinationParser, MessageHeaderReplyToParser for the Solace binder
  • SpringCloudStreamHeaderParser (Order 10000) implements MessageHeaderDestinationParser, MessageTotalRepliesParser for standard Spring Cloud Stream headers
  • SpringIntegrationHeaderParser (Order 20000) implements MessageHeaderCorrelationIdParser for standard Spring Integration headers
  • BinderHeaderParser (Order 30000) implements MessageHeaderDestinationParser for standard Spring Cloud Stream Binder headers
  • SpringHeaderParser (Order 40000) implements MessageHeaderReplyToParser for Spring Framework message header standards
  • HttpHeaderParser (Order LOWEST_PRECEDENCE) implements MessageHeaderCorrelationIdParser according to HTTP header standard
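
How an order value can prioritize several parsers is sketched below with plain Java. The interface HeaderParser, the class OrderedHeaderParsers and the header name correlationId are hypothetical simplifications, not the starter's real interfaces, and "the first parser that yields a value wins" is an assumption. The sketch follows Spring's convention that a lower order value means higher precedence.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Illustrative sketch only; hypothetical types, not the starter's parser interfaces. */
final class OrderedHeaderParsers {
    /** Hypothetical, simplified parser: an order value plus one lookup method. */
    interface HeaderParser {
        int order();
        Optional<String> correlationId(Map<String, Object> headers);
    }

    private final List<HeaderParser> parsers;

    OrderedHeaderParsers(List<HeaderParser> parsers) {
        this.parsers = new ArrayList<>(parsers);
        // Lower order value = higher precedence, as with Spring's @Order.
        this.parsers.sort(Comparator.comparingInt(HeaderParser::order));
    }

    /** Asks the parsers in precedence order; the first value found wins (assumption). */
    Optional<String> correlationId(Map<String, Object> headers) {
        for (HeaderParser parser : parsers) {
            Optional<String> result = parser.correlationId(headers);
            if (result.isPresent()) {
                return result;
            }
        }
        return Optional.empty();
    }

    /** Builds a parser that reads the correlation id from one named header. */
    static HeaderParser byHeaderName(int priority, String headerName) {
        return new HeaderParser() {
            @Override
            public int order() {
                return priority;
            }

            @Override
            public Optional<String> correlationId(Map<String, Object> headers) {
                return Optional.ofNullable(headers.get(headerName)).map(Object::toString);
            }
        };
    }
}
```

In this picture, a binder-specific parser with a low order value is consulted before the generic fallbacks, which is why a custom parser for another binder should be given an order lower than the generic ones.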

Known issues and Open Points

Statefulness

Since message relations are kept in memory, this starter is neither fail-safe nor scalable.

More precisely:

  • If one instance of the service sends a message and another one receives the response, the two cannot be related.
  • If a service dies, all relations are forgotten and replies can no longer be related to their requests, potentially resulting in message loss.
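
Why in-memory relations do not survive across instances can be shown with a minimal correlation store. This is a sketch of the general technique, not the starter's internals: InMemoryCorrelationStore is a hypothetical class that keeps pending requests in a map keyed by correlation id.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative sketch only; each instance holds its own, non-shared state. */
final class InMemoryCorrelationStore {
    // Correlation id -> future of the pending request; lives only in this process.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a pending request and returns its freshly generated correlation id. */
    String register(CompletableFuture<String> reply) {
        String correlationId = UUID.randomUUID().toString();
        pending.put(correlationId, reply);
        return correlationId;
    }

    /** Completes the matching request; returns false if this instance does not know the id. */
    boolean complete(String correlationId, String payload) {
        CompletableFuture<String> reply = pending.remove(correlationId);
        if (reply == null) {
            return false;
        }
        reply.complete(payload);
        return true;
    }
}
```

If instance A registers a request and the reply is delivered to instance B, B's store returns false and the reply is dropped; the same happens after a restart of A, because the map is empty again.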

External Links

Compatibility

Tested with:

SpringBoot | SpringCloudStream
2.6.6      | 2021.0.1
2.6.6      | 2021.0.3
3.2.5      | 2023.0.1