SpringBoot集成RocketMQ消息中间件的Starter插件
<!-- 引入rocketmq-starter -->
<dependency>
<groupId>com.feizi.starter</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
spring.rocketmq.name-server=127.0.0.1:9876
spring.rocketmq.producer-config.group=my_group
@Resource
private RocketMqSendTemplate rocketMqSendTemplate;
//封装MQ消息,可以指定topic主题,tag子主题,和key唯一识别码
Message message = MessageUtils.buildMessage("feizi_topic", "tagA", "key001", new User(1, "feizi", 25));
注意:上面的User
类定义在starter里面是为了方便测试,实际应用的时候直接定义在业务模块里面就可以了
//发送MQ消息
SendResult sendResult = rocketMqSendTemplate.syncSend(message, 3);
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class MqProducerApplication implements CommandLineRunner {
private static final Logger LOGGER = LoggerFactory.getLogger(MqProducerApplication.class);
/**
* MQ消息消费者topic定义
*/
private static final String MQ_TOPIC = "feizi_topic";
@Resource
private RocketMqSendTemplate rocketMqSendTemplate;
public static void main(String[] args) {
SpringApplication.run(MqProducerApplication.class, args);
}
@Override
public void run(String... strings) throws Exception {
LOGGER.info("================start生产者发送MQ消息");
/*############################first###############################*/
//封装MQ消息(first)
Message message1 = MessageUtils.buildMessage(MQ_TOPIC, "tagA", "key001", new User(1, "feizi", 25));
sendMessage(message1);
/*############################second###############################*/
//封装MQ消息(second)
Message message2 = MessageUtils.buildMessage(MQ_TOPIC, "tagB", "key002", new User(2, "hello", 26));
sendMessage(message2);
LOGGER.info("================end生产者发送MQ消息");
}
/*发送MQ消息*/
private void sendMessage(Message message){
/* 发送MQ消息, 如果失败,则尝试发送3次 */
SendResult sendResult = rocketMqSendTemplate.syncSend(message, 3);
if(null != sendResult){
/**
* TODO 发送结果,发送完毕执行业务操作
*/
LOGGER.info("================sendResult: " + sendResult);
}
}
}
server.port=8084
spring.application.name=mq-producer
spring.rocketmq.name-server=127.0.0.1:9876
spring.rocketmq.producer-config.group=my_group
<!-- 引入rocketmq-starter -->
<dependency>
<groupId>com.feizi.starter</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
spring.rocketmq.name-server=127.0.0.1:9876
@Component
public class MyConsumer1 implements RocketMqConsumerListener<Message>{
}
注意:需要配置@Component
注解,以便能被spring扫描到.
@Component
@RocketMqConsumer(topic = "feizi_topic", selectorExpress = "tagA", consumerGroup = "my_consumer_group1")
public class MyConsumer1 implements RocketMqConsumerListener<Message> {
}
@Component
@RocketMqConsumer(topic = "feizi_topic", selectorExpress = "tagA", consumerGroup = "my_consumer_group1")
public class MyConsumer1 implements RocketMqConsumerListener<Message> {
private static final Logger LOGGER = LoggerFactory.getLogger(MyConsumer1.class);
@Override
public boolean consume(Message message) {
LOGGER.info("==========================MyConsumer1 start=====================");
if(Objects.isNull(message)){
//接收到空消息,也表明此次消费成功
return true;
}
LOGGER.info("MyConsumer1 received message: {}, topic: {}, tags: {}, keys: {}",
message, message.getTopic(), message.getTags(), message.getKeys());
//取出消息体
byte[] messageBody = message.getBody();
if(Objects.isNull(messageBody)){
//接收到空消息,也表明此次消费成功
return true;
}
/* 将byte数组类型转字符串 */
String messageStr = StringUtils.byteArr2Str(messageBody);
MessageData<User> messageData = JsonUtils.jsonStr2Obj(messageStr, new TypeReference<MessageData<User>>(){});
LOGGER.info("幂等控制UUID: {}, 消息产生时间戳: {}",
messageData.getUuid(), messageData.getTimestamp());
//json字符串转Obj
User user = JsonUtils.jsonObj2Obj(messageData.getData(), new TypeReference<User>(){});
if(Objects.isNull(user)){
//接收到空消息,也表明此次消费成功
return true;
}
LOGGER.info("消息内容: {}", user);
/**
* TODO 具体的业务逻辑
*/
LOGGER.info("==========================MyConsumer1 end=====================");
return true;
}
}
注意:对于注解RocketMqConsumerListener的泛型参数T,可以指定为Message类型,也可以指定MessageData消息封装类类型,同时还可以直接指定String类型,建议统一指定为Message
类型,因为这样我们可以直接拿到topic
主题,tag
子主题和key
唯一识别码。
- 生产者端(发送消息端):
github地址: https://github.com/hu1991die/mq-producer
git clone地址: git@github.com:hu1991die/mq-producer.git
- 消费者端(接收消息端):
github地址: https://github.com/hu1991die/mq-consumer
git clone地址: git@github.com:hu1991die/mq-consumer.git