/mq-to-future

将kafka消息转换为CompletableFuture或Mono

Primary LanguageJava

Message Queue消息转换至CompleteFuture的实践

在一些场景下,我们希望消费者消费消息后,异步处理完毕后,能发送回执到生产者。例如,在物联网场景下,服务端发送一条命令到设备,设备执行完毕,需要将执行结果返回给服务端。这时候,我们需要有一种机制来将消息队列转换为异步函数,简化开发工作。

设计

我们使用trace_id来跟踪整个请求到响应的链路,以此识别对应的生产/消费者。大致流程如下图

需要注意的是:

  • Consumer获取到消息后,需要将TraceID取出,并在发送回执消息的时候带上此TraceId
  • Worker需要设计超时机制,因为异步消息总会存在消息丢失的情况,未在指定时间内收到回执消息,需要向请求方发送Future<Timerout>类似的返回值以回应。