This Spring Boot Starter provides request-reply functionality for Spring Cloud Stream Binders.
Consult the table below to determine which version you need to use:
Spring Cloud | spring-cloud-stream-starter-request-reply | Spring Boot | sol-jcsmp |
---|---|---|---|
2023.0.2 | 5.1.4 | 3.3.0 | 10.24.0 |
2023.0.2 | 5.1.3 | 3.3.0 | 10.23.0 |
2023.0.1 | 5.1.2 | 3.2.5 | 10.23.0 |
To use the request/reply functionality, add the following section to your Maven pom:
<dependency>
    <groupId>community.solace.spring.cloud</groupId>
    <artifactId>spring-cloud-stream-starter-request-reply</artifactId>
    <version>5.1.4</version>
</dependency>
To use the starter in conjunction with a binder, you also need the dependency for that binder.
If you want to send a request, you have to define a topic pattern that matches the topic you send your requests to.
This pattern determines the binding that is used for these requests.
With the binding you define the binder, the contentType, ...
and the response address the replier should send the response to.
spring.cloud.stream.requestreply.bindingMapping[n].binding must:
- match an entry in spring.cloud.function.definition
- match spring.cloud.stream.bindings.XX-in-0, where you define the binder, contentType, ...
spring.cloud.stream.requestreply.bindingMapping[n].topicPatterns[m]:
- Is a list of regex patterns that are matched against the destination of your requests.
- If no pattern matches when you call requestAndAwaitReplyToTopic() / requestReplyToTopic(), an IllegalArgumentException is thrown.
- If you only want to use requestAndAwaitReplyToBinding() / requestReplyToBinding(), you do not need this configuration.
Remember to also list this binding in spring.cloud.function.definition,
otherwise you will not receive responses.
You do not need to create a bean for it; the library generates one.
spring:
  cloud:
    function:
      definition: requestReplyRepliesDemo
    stream:
      requestreply:
        bindingMapping:
          - binding: requestReplyRepliesDemo
            replyTopic: requestReply/response/solace/{StagePlaceholder}/@project.artifactId@_${HOSTNAME}_${replyTopicWithWildcards|uuid}
            topicPatterns:
              - requestReply/request/.*
      bindings:
        requestReplyRepliesDemo-in-0:
          destination: ${replyTopicWithWildcards|requestReplyRepliesDemo|*}
          contentType: "application/json"
          binder: solace
        requestReplyRepliesDemo-out-0:
          binder: solace
SensorReading response = requestReplyService.requestAndAwaitReplyToTopic(
        reading,
        "requestReply/request/last_value/temperature/celsius/" + location,
        SensorReading.class,
        Duration.ofSeconds(30)
);
For background on the reactive variant, see an introduction to Flux/Project Reactor.
Flux<SensorReading> responses = requestReplyService.requestReplyToTopicReactive(
        reading,
        "requestReply/request/last_value/temperature/celsius/" + location,
        SensorReading.class,
        Duration.ofSeconds(30)
);
spring:
  cloud:
    function:
      definition: requestReplyRepliesDemo
    stream:
      requestreply:
        bindingMapping:
          - binding: requestReplyRepliesDemo
            replyTopic: requestReply/response/solace/{StagePlaceholder}/@project.artifactId@_${HOSTNAME}_${replyTopicWithWildcards|uuid}
      bindings:
        requestReplyRepliesDemo-in-0:
          destination: ${replyTopicWithWildcards|requestReplyRepliesDemo|*}
          contentType: "application/json"
          binder: solace
        requestReplyRepliesDemo-out-0:
          destination: requestReply/request/last_value/temperature/celsius/livingroom
          binder: solace
SensorReading response = requestReplyService.requestAndAwaitReplyToBinding(
        request,
        "requestReplyRepliesDemo",
        SensorReading.class,
        Duration.ofSeconds(30)
);
For background on the reactive variant, see an introduction to Flux/Project Reactor.
Flux<SensorReading> responses = requestReplyService.requestReplyToBindingReactive(
        request,
        "requestReplyRepliesDemo",
        SensorReading.class,
        Duration.ofSeconds(30)
);
When using requestAndAwaitReplyToTopic / requestReplyToTopic:
The topic used in the code is matched against all spring.cloud.stream.requestreply.bindingMapping[].topicPatterns,
and the first hit is used.
The spring.cloud.stream.requestreply.bindingMapping[].binding of the matching section is then looked up in
spring.cloud.stream.bindings[], and its -out-0 binding is always used.
This is required to let the request/reply service know which binder, contentType, ... to use.
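The first-hit lookup described above can be pictured with a small, library-independent sketch. The class `TopicPatternLookup` and its methods are hypothetical names chosen for illustration and are not part of the starter; the sketch also assumes that a pattern has to match the whole topic, which the text above does not state explicitly.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical illustration, not the starter's implementation:
// resolves a request topic to the binding of the first matching mapping.
final class TopicPatternLookup {
    // LinkedHashMap keeps the configuration order, because the first hit wins.
    private final Map<String, List<Pattern>> patternsByBinding = new LinkedHashMap<>();

    void addMapping(String binding, List<String> topicPatterns) {
        List<Pattern> compiled = new ArrayList<>();
        for (String topicPattern : topicPatterns) {
            compiled.add(Pattern.compile(topicPattern));
        }
        patternsByBinding.put(binding, compiled);
    }

    String findBinding(String requestTopic) {
        for (Map.Entry<String, List<Pattern>> mapping : patternsByBinding.entrySet()) {
            for (Pattern pattern : mapping.getValue()) {
                // Assumption: the pattern must match the complete topic.
                if (pattern.matcher(requestTopic).matches()) {
                    return mapping.getKey();
                }
            }
        }
        // Mirrors the documented behaviour when no topicPatterns entry matches.
        throw new IllegalArgumentException("No bindingMapping matches topic: " + requestTopic);
    }
}
```

With the mapping from the configuration above, a request to requestReply/request/last_value/temperature/celsius/livingroom would resolve to requestReplyRepliesDemo, while a topic that matches no pattern ends in an IllegalArgumentException.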
The spring.cloud.stream.requestreply.bindingMapping[].replyTopic
of the matching section is placed in the outbound
message to tell the remote service where you expect the answer.
In general, you want to use a unique topic here.
Best practice is to put:
- the HOSTNAME into the topic, to make debugging a little easier.
- a UUID into the topic, to have a unique inbox.
Do not use Spring's ${random.uuid} here, because it generates a new UUID each time it is evaluated, which is a problem at this point. Prefer ${replyTopicWithWildcards|uuid}
to get a static UUID that is generated once at process start.
The request/reply service iterates over spring.cloud.stream.requestreply.bindingMapping
and creates a bean for each entry to consume the messages received on -in-0.destination
using the given binder.
As soon as you use the {StagePlaceholder} feature, you cannot listen on:
requestReply/response/solace/{StagePlaceholder}/pub_sub_sending_K353456_315fd96b-b981-417b-be99-3be065c6611d
because the remote side replaces {StagePlaceholder} with, for example, p-arcs before responding.
Therefore you need to listen on:
requestReply/response/solace/*/pub_sub_sending_K353456_315fd96b-b981-417b-be99-3be065c6611d
This is done by: ${replyTopicWithWildcards|requestReplyRepliesDemo|*}
It takes the spring.cloud.stream.requestreply.bindingMapping[].replyTopic
of the section whose binding matches the first parameter
and replaces every {someThing}
with the wildcard given as the second parameter (here *).
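As a rough illustration of that replacement step, the following sketch turns a reply topic into the topic to subscribe to. `ReplyTopicWildcards` and `toSubscription` are hypothetical names, and the sketch assumes that Spring placeholders such as ${HOSTNAME} have already been resolved, so that only {someThing} style placeholders remain.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration, not the starter's implementation.
final class ReplyTopicWildcards {
    // Matches one {placeholder} that stays within a single topic level.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\{[^{}/]+\\}");

    // Replaces every {placeholder} in the reply topic with the given wildcard.
    static String toSubscription(String replyTopic, String wildcard) {
        return PLACEHOLDER.matcher(replyTopic)
                .replaceAll(Matcher.quoteReplacement(wildcard));
    }
}
```

Calling `ReplyTopicWildcards.toSubscription(...)` with the reply topic from the example above and `*` as the wildcard yields the requestReply/response/solace/*/... topic shown there.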
In general, you do not need this library to answer a message. Instead, you can simply send the response to the topic defined in the reply-to header and send back all requested headers.
However, the methods RequestReplyMessageHeaderSupportService.wrap, RequestReplyMessageHeaderSupportService.wrapList
and RequestReplyMessageHeaderSupportService.wrapFlux
from this library help you create the response with the message headers set properly
and with variables in dynamic topics substituted.
By default, the wrapping function sets the correlationId and reply destination headers of the message.
For multiple responses, the totalReplies and replyIndex headers are set as well.
Additional headers can be copied from the request by
setting spring.cloud.stream.requestReply.copyHeadersOnWrap
accordingly, e.g.:
spring.cloud.stream.requestReply.copyHeadersOnWrap=encoding,yetAnotherHeader
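To make the effect of copyHeadersOnWrap more concrete, here is a minimal sketch that works on plain maps instead of Spring messages. `HeaderCopySketch` is a hypothetical name, and the details (headers missing from the request are skipped, copied values overwrite existing response headers) are assumptions rather than documented behaviour of the library.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration, not the starter's implementation.
final class HeaderCopySketch {
    // Returns the response headers plus every configured header found on the request.
    static Map<String, Object> copyConfiguredHeaders(Map<String, Object> requestHeaders,
                                                     Map<String, Object> responseHeaders,
                                                     List<String> copyHeadersOnWrap) {
        Map<String, Object> result = new HashMap<>(responseHeaders);
        for (String headerName : copyHeadersOnWrap) {
            Object value = requestHeaders.get(headerName);
            // Assumption: a header that is absent on the request is simply skipped.
            if (value != null) {
                result.put(headerName, value);
            }
        }
        return result;
    }
}
```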
The wrapping can be used as in the examples below:
public class PingPongConfig {
    @Bean
    public Function<Message<SensorRequest>, Message<SensorReading>> responseToRequest(
            RequestReplyMessageHeaderSupportService headerSupport
    ) {
        return headerSupport.wrap((request) -> {
            SensorReading response = new SensorReading();
            ...
            return response;
        });
    }
}

public class PingPongConfig {
    @Bean
    public Function<Message<SensorRequest>, List<Message<SensorReading>>> responseMultiToRequestKnownSizeSolace(
            RequestReplyMessageHeaderSupportService headerSupport
    ) {
        return headerSupport.wrapList((request) -> {
            List<SensorReading> responses = new ArrayList<>();
            ...
            return responses;
        }, "responseMultiToRequestKnownSizeSolace-out-0");
    }
}

public class PingPongConfig {
    @Bean
    public Function<Flux<Message<SensorRequest>>, Flux<Message<SensorReading>>> responseMultiToRequestRandomSizeSolace(
            RequestReplyMessageHeaderSupportService headerSupport
    ) {
        return headerSupport.wrapFlux((request, responseSink) -> {
            try {
                while (...) {
                    responseSink.next(response);
                }
                responseSink.complete();
            } catch (Exception e) {
                responseSink.error(new IllegalArgumentException("Business error message", e));
            }
        }, "responseMultiToRequestRandomSizeSolace-out-0");
    }
}
You might want to forward errors to the requester. To do so, you only need to define 1 to N exception classes that should be forwarded to the requester.
public class PingPongConfig {
    @Bean
    public Function<Message<SensorRequest>, Message<SensorReading>> responseToRequest(
            RequestReplyMessageHeaderSupportService headerSupport
    ) {
        return headerSupport.wrap((request) -> {
            SensorReading response = new SensorReading();
            ...
            return response;
        }, MyBusinessException.class, SomeOtherException.class);
    }
}
Input class validation errors and JSON parsing errors cannot be returned to the requester out of the box. Therefore you need to do the validation on your own, for example:
public class PingPongConfig {
    @Qualifier("mvcValidator")
    private final Validator validator;
    private final ObjectMapper objectMapper;

    @Bean
    public Function<Message<String>, Message<SensorReading>> responseToRequest(
            RequestReplyMessageHeaderSupportService headerSupport
    ) {
        return headerSupport.wrap((rawRequest) -> {
            // Parse and validate manually so that failures surface inside the wrapped function.
            SensorReading request = objectMapper.readValue(rawRequest.getPayload(), SensorReading.class);
            final DataBinder db = new DataBinder(request);
            db.setValidator(validator);
            db.validate();
            SensorReading response = new SensorReading();
            ...
            return response;
        }, MyBusinessException.class, SomeOtherException.class);
    }
}
Placeholders in reply topics need to be replaced. For example, "{StagePlaceholder}" needs to be replaced by the replier's stage to let the requester know who answered. This is helpful when debugging how messages flow.
Example:
spring:
  cloud:
    function:
      definition: requestReplyRepliesDemo
    stream:
      requestreply:
        variableReplacements:
          "{StagePlaceholder}": ${RCS_ENV_ROLE}-${RCS_CLUSTER}
Those variableReplacements will be applied to request and reply topics.
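The substitution itself can be thought of as a plain string replacement per configured entry. The sketch below is only an approximation under that assumption; `VariableReplacementSketch` is a hypothetical name, and the value p-arcs is taken from the earlier example of what a replier might insert.

```java
import java.util.Map;

// Hypothetical illustration, not the starter's implementation.
final class VariableReplacementSketch {
    // Replaces every configured placeholder literally, e.g. "{StagePlaceholder}" -> "p-arcs".
    static String apply(String topic, Map<String, String> variableReplacements) {
        String result = topic;
        for (Map.Entry<String, String> replacement : variableReplacements.entrySet()) {
            // String.replace works on literal text, so the braces need no escaping.
            result = result.replace(replacement.getKey(), replacement.getValue());
        }
        return result;
    }
}
```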
The request/reply functionality provided by this starter can be used by autowiring the RequestReplyService,
which offers the following methods:
- A requestAndAwaitReplyToTopic(Q request, String requestDestination, Class<A> expectedResponseClass, Duration timeoutPeriod)
  sends the given request to the given request destination, awaits the response and maps it to the provided class as a return value. If a -out-0.destination is configured, it is simply ignored.
- A requestAndAwaitReplyToBinding(Q request, String bindingName, Class<A> expectedResponseClass, Duration timeoutPeriod)
  sends the given request to the destination configured for the -out-0 of this binding, awaits the response and maps it to the provided class as a return value.
- CompletableFuture<A> requestReplyToTopic(Q request, String requestDestination, Class<A> expectedClass, Duration timeoutPeriod)
  sends the given request to the given request destination and returns a future that maps the response to the provided class once it is received. Use this only in the rare edge case that you need to run multiple request/replies in parallel in the same thread.
- CompletableFuture<A> requestReplyToBinding(Q request, String bindingName, Class<A> expectedClass, Duration timeoutPeriod)
  sends the given request to the destination configured for the -out-0 of this binding and returns a future that maps the response to the provided class once it is received. Use this only in the rare edge case that you need to run multiple request/replies in parallel in the same thread.
- Flux<A> requestReplyToTopicReactive(Q request, String requestDestination, Class<A> expectedClass, Duration timeoutPeriod)
  sends the given request to the given request destination and returns a flux that maps the responses to the provided class as they are received. Use this if you expect multiple responses to a single request. For example, if your response type would be an array, it is best practice to send the elements as single messages to avoid overly large messages.
- Flux<A> requestReplyToBindingReactive(Q request, String bindingName, Class<A> expectedClass, Duration timeoutPeriod)
  sends the given request to the destination configured for the -out-0 of this binding and returns a flux that maps the responses to the provided class as they are received. Use this if you expect multiple responses to a single request. For example, if your response type would be an array, it is best practice to send the elements as single messages to avoid overly large messages.
All methods can be used in any combination.
When you use the requestReplyToTopicReactive
method, you can receive multiple responses to your request.
A blocking request reply where you receive a list of answers:
@GetMapping(value = "/temperature/last_hour/{location}")
public List<SensorReading> requestMultiReplySample(
        @PathVariable("location") final String location
) {
    MyRequest request = new MyRequest();
    request.setLocation(location);
    return requestReplyService.requestReplyToTopicReactive(
                    request,
                    "last_hour/temperature/celsius/" + location,
                    SensorReading.class,
                    Duration.ofSeconds(30)
            )
            .collectList()
            .block();
}
A non-blocking request reply where you handle each answer as it arrives:
@GetMapping(value = "/temperature/last_hour/{location}")
public void requestMultiReplySample(
        @PathVariable("location") final String location
) {
    MyRequest request = new MyRequest();
    request.setLocation(location);
    requestReplyService.requestReplyToTopicReactive(
                    request,
                    "last_hour/temperature/celsius/" + location,
                    SensorReading.class,
                    Duration.ofSeconds(30)
            )
            .subscribe(
                    sensorReading -> log.info("Got an answer: " + sensorReading),
                    throwable -> log.error("The request was finished with error", throwable),
                    () -> log.info("The request was finished")
            );
}
If your request is an org.springframework.messaging.Message, you can decide whether every response should be
transported as a single message or whether several messages should be grouped together.
Message grouping can be enabled via:
requestMsg.setHeader(SpringHeaderParser.GROUPED_MESSAGES, true);
If your request is of any other type, the request/reply library adds GROUPED_MESSAGES=true
automatically.
Unless you require a separate header for each reply,
it is recommended to use grouped messages instead of sending individual messages.
Grouped messages improve reply speed by reducing the message header overhead and conserving broker resources.
Messages are grouped together until one of the following rules is true:
- The grouped message exceeds the 1MB limit
- The group contains more than 10_000 single items
- The first message in the group is more than 0.5 seconds old (or the replier configured a different value)
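The three rules above amount to a simple "flush now?" decision. The sketch below encodes them as a pure function; `GroupFlushRule` and its constants are hypothetical names, and interpreting 1MB as 1024 * 1024 bytes is an assumption.

```java
import java.time.Duration;

// Hypothetical illustration of the grouping rules, not the starter's implementation.
final class GroupFlushRule {
    static final long MAX_GROUP_BYTES = 1024L * 1024L;            // assumption: 1MB = 1024 * 1024 bytes
    static final int MAX_GROUP_ITEMS = 10_000;
    static final Duration MAX_GROUP_AGE = Duration.ofMillis(500); // default; the replier may configure another value

    // Returns true as soon as any one of the three documented limits is exceeded.
    static boolean shouldFlush(long groupedBytes, int itemCount, Duration ageOfFirstMessage) {
        return groupedBytes > MAX_GROUP_BYTES
                || itemCount > MAX_GROUP_ITEMS
                || ageOfFirstMessage.compareTo(MAX_GROUP_AGE) > 0;
    }
}
```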
If a service only responds to requests, the library can still be used: the RequestReplyMessageHeaderSupportService offers helper methods for receiving services that wrap response functions
and properly set the Message's destination header, which can then be picked up by Spring Cloud Function, e.g.:
@Bean
@Autowired
public Function<Message<String>, Message<String>> reverse(RequestReplyMessageHeaderSupportService headerSupport) {
    return headerSupport.wrap((value) -> new StringBuilder(value).reverse().toString());
}
The Request/Reply Spring Boot Starter has been designed to work with both the Solace Binder and the TestSupportBinder, but it can be extended to work with other binders as well.
To adapt it to another binder, the following beans must be provided.
When receiving messages, the application must be able to determine the correlationId, destination and replyTo properties attached to a message. If a binder does not adhere to Spring messaging standards, or requires different headers for performance optimizations, additional message parsers (or the related message header parsers) must be provided. For proper prioritization, the bean should be annotated with the @Order
annotation (see "@Order in Spring" on Baeldung).
The starter includes the following message parser interfaces:
- MessageCorrelationIdParser - root interface for parsing the correlation id from an incoming message
  - MessageHeaderCorrelationIdParser - extended interface for parsing the correlation id from an incoming message's MessageHeaders
- MessageDestinationParser - root interface for parsing the destination from an incoming message
  - MessageHeaderDestinationParser - extended interface for parsing the destination from an incoming message's MessageHeaders
- MessageReplyToParser - root interface for parsing the reply destination from an incoming message
  - MessageHeaderReplyToParser - extended interface for parsing the reply destination from an incoming message's MessageHeaders
- MessageTotalRepliesParser - root interface for parsing the total replies from an incoming multi response message
  - MessageHeaderTotalRepliesParser - extended interface for parsing the total replies from an incoming multi response message's MessageHeaders
The starter also includes the following message parser implementations:
- SolaceHeaderParser (Order 200) implements MessageHeaderCorrelationIdParser, MessageHeaderDestinationParser, MessageHeaderReplyToParser for the Solace binder
- SpringCloudStreamHeaderParser (Order 10000) implements MessageHeaderDestinationParser, MessageTotalRepliesParser for standard Spring Cloud Stream headers
- SpringIntegrationHeaderParser (Order 20000) implements MessageHeaderCorrelationIdParser for standard Spring Integration headers
- BinderHeaderParser (Order 30000) implements MessageHeaderDestinationParser for standard Spring Cloud Stream Binder headers
- SpringHeaderParser (Order 40000) implements MessageHeaderReplyToParser for Spring Framework message header standards
- HttpHeaderParser (Order LOWEST_PRECEDENCE) implements MessageHeaderCorrelationIdParser according to HTTP header standard
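One plausible reading of these order values is that the parsers are consulted from the lowest order value to the highest, and the first parser that finds a value wins. The sketch below illustrates that idea only; `OrderedParserChain`, `HeaderParser` and the header names used in the usage note are hypothetical, and the starter's real interfaces operate on Spring messages rather than plain maps.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of order-based parser selection, not the starter's implementation.
final class OrderedParserChain {
    // A parser that reads the correlation id from one specific header.
    static final class HeaderParser {
        final int order;
        final String headerName;

        HeaderParser(int order, String headerName) {
            this.order = order;
            this.headerName = headerName;
        }

        String parse(Map<String, Object> headers) {
            Object value = headers.get(headerName);
            return value == null ? null : value.toString();
        }
    }

    private final List<HeaderParser> parsers = new ArrayList<>();

    void register(HeaderParser parser) {
        parsers.add(parser);
        // Lower order value means higher precedence, as with Spring's @Order.
        parsers.sort(Comparator.comparingInt(p -> p.order));
    }

    // Returns the first non-null result in precedence order, or null if no parser matches.
    String correlationId(Map<String, Object> headers) {
        for (HeaderParser parser : parsers) {
            String id = parser.parse(headers);
            if (id != null) {
                return id;
            }
        }
        return null;
    }
}
```

For example, with a parser of order 200 reading an illustrative binder-specific header and a parser of order 20000 reading a generic one, a message that carries both headers would be resolved by the order 200 parser.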
Since message relations are kept in memory, this starter is neither fail-safe nor scalable.
More precisely:
- If one instance of the service sends a message and another one receives the response, the two cannot be correlated.
- If a service dies, all relations are lost and replies can no longer be correlated with their requests, potentially resulting in message loss.
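Why in-memory relations behave this way can be seen in a minimal sketch of a correlation store. `InMemoryCorrelationSketch` is a hypothetical class written for illustration; it assumes pending requests are kept in a map keyed by correlation id, which matches the description above but is not taken from the starter's source.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration: each service instance holds its own pending requests in memory.
final class InMemoryCorrelationSketch {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Called when a request is sent; the future completes once the reply arrives.
    CompletableFuture<String> registerRequest(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    // Called when a reply arrives; returns false if this instance does not know the request.
    boolean onReply(String correlationId, String payload) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future == null) {
            // The request was sent by another instance, or before a restart of this one.
            return false;
        }
        future.complete(payload);
        return true;
    }
}
```

Two instances of this class model two service instances: a reply delivered to the instance that did not register the request finds no pending entry, and a freshly created instance (a restart) starts with an empty map.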
Tested with:
SpringBoot | SpringCloudStream |
---|---|
2.6.6 | 2021.0.1 |
2.6.6 | 2021.0.3 |
3.2.5 | 2023.0.1 |