/rocketmq-spring-boot-starter

Spring Boot starter for RocketMQ

Primary LanguageJavaApache License 2.0Apache-2.0

spring boot starter for RocketMQ Build Status Coverage Status

Maven CentralGitHub release

项目介绍

Rocketmq 是由阿里巴巴团队开发并捐赠给apache团队的优秀消息中间件,承受过历年双十一大促的考验。

你可以通过本项目轻松的集成Rocketmq到你的SpringBoot项目中。 本项目主要包含以下特性

  • 同步发送消息
  • 异步发送消息
  • 广播发送消息
  • 有序发送和消费消息
  • 发送延时消息
  • 消息tag和key支持
  • 自动序列化和反序列化消息体
  • 消息的实际消费方IP追溯
  • 发送事务消息(NEW)
  • ...
  • 发送即忘消息(可能由于直接抛弃所有异常导致消息静默丢失,弃用)
  • 拉取方式消费(配置方式复杂,位点可能发生偏移,弃用)

简单入门实例

1. 添加maven依赖:
<dependency>
    <groupId>com.maihaoche</groupId>
    <artifactId>spring-boot-starter-rocketmq</artifactId>
    <version>0.1.0</version>
</dependency>
2. 添加配置:
spring:
    rocketmq:
      name-server-address: 172.21.10.111:9876
      # 可选, 如果无需发送消息则忽略该配置
      producer-group: local_pufang_producer
      # 发送超时配置毫秒数, 可选, 默认3000
      send-msg-timeout: 5000
      # 追溯消息具体消费情况的开关,默认打开
      #trace-enabled: false
      # 是否启用VIP通道,默认打开
      #vip-channel-enabled: false
3. 程序入口添加注解开启自动装配

在springboot应用主入口添加@EnableMQConfiguration注解开启自动装配:

@SpringBootApplication
@EnableMQConfiguration
class DemoApplication {
}
4. 构建消息体

通过我们提供的Builder类创建消息对象,详见wiki

MessageBuilder.of(new MSG_POJO()).topic("some-msg-topic").build();
5. 创建发送方

详见wiki

@MQProducer
public class DemoProducer extends AbstractMQProducer{
}
6. 创建消费方

详见wiki支持springEL风格配置项解析,如存在suclogger-test-cluster配置项,会优先将topic解析为配置项对应的值。

@MQConsumer(topic = "${suclogger-test-cluster}", consumerGroup = "local_sucloger_dev")
public class DemoConsumer extends AbstractMQPushConsumer {

    @Override
    public boolean process(Object message, Map extMap) {
        // extMap 中包含messageExt中的属性和message.properties中的属性
        System.out.println(message);
        return true;
    }
}
7. 发送消息:
// 注入发送者
@Autowired
private DemoProducer demoProducer;
    
...
    
// 发送
demoProducer.syncSend(msg)
    

发送事务消息###

Since 0.1.0

5.1 事务消息发送方#####
@MQTransactionProducer(producerGroup = "${camaro.mq.transactionProducerGroup}")
public class DemoTransactionProducer extends AbstractMQTransactionProducer {

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // executeLocalTransaction
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // LocalTransactionState.ROLLBACK_MESSAGE
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}