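This project is a thin wrapper around RocketMQ for everyday project development, and it is simple to use. common-rocketmq targets JDK 8. Its structure is shown below:
It contains 5 classes and one XML configuration file.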
rocketmq
|---common--------MessageData (message transfer DTO)
|
|---producer------RocketMQProducer (producer interface)
|             |
|             |---RocketMQProducerImpl (implementation of the producer interface)
|---consumer------RocketMQConsumer (consumer)
              |
              |---ConsumerService (interface that consumer business logic must implement)
common-rocketmq.xml
Demo on GitHub: https://github.com/xinyi-lt/common-rocketmq-demo
1. Maven dependency
<dependency>
<groupId>com.sxzq.lt</groupId>
<artifactId>common-rocketmq</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
2. Producer
- Import the configuration file
<import resource="classpath*:conf/common-rocketmq.xml"/>
- Obtain the producer bean in code
RocketMQProducer producer = applicationContext.getBean(RocketMQProducer.class);
- Send a message
// Message transfer DTO
MessageData<UserInfo> messageData = new MessageData<>();
// Timestamp
messageData.setTimestamp(System.currentTimeMillis());
// The UUID is what idempotency control is based on
messageData.setUuid(UUID.randomUUID().toString());
UserInfo userInfo = new UserInfo();
// UserInfo payload
messageData.setData(userInfo);
logger.info("start to send message data:{}", JSON.toJSONString(messageData));
// Send the message: topic, tag, message key, body
SendResult result = producer.sendMessage(
    new Message(MQConstant.MQ_TOPIC_PRODUCER_DEMO,
        MQConstant.MQ_TAG_PRODUCER_DEMO_FIRST,
        UUID.randomUUID().toString(),
        JSON.toJSONString(messageData).getBytes(Charset.forName("utf-8"))));
logger.info("send message complete result:{} ", result);
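For readers who have not opened the library source, the DTO used above can be pictured roughly as follows. This is only a sketch: the class name `MessageDataSketch` is hypothetical, and its three fields are inferred from the setters called in the snippet (`setTimestamp`, `setUuid`, `setData`), not copied from common-rocketmq, whose actual `MessageData` may differ.

```java
/**
 * Hypothetical stand-in for MessageData; fields are inferred from the
 * setters used in the send example and are an assumption, not the real class.
 */
class MessageDataSketch<T> {
    private long timestamp; // creation time in epoch milliseconds
    private String uuid;    // unique id a consumer can use for idempotency checks
    private T data;         // business payload, for example a UserInfo object

    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }

    public String getUuid() { return uuid; }
    public void setUuid(String uuid) { this.uuid = uuid; }

    public T getData() { return data; }
    public void setData(T data) { this.data = data; }
}
```

Because the payload type is generic, the same envelope can carry any business object while the timestamp and UUID stay in a fixed place for logging and deduplication.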
- Send a delayed message
// Build the message
Message msg = new Message(MQConstant.MQ_TOPIC_PRODUCER_DEMO,
    MQConstant.MQ_TAG_PRODUCER_DEMO_FIRST,
    msgKey,
    JSON.toJSONString(messageData).getBytes(Charset.forName("utf-8")));
// Set the delay level (1-based index into the list below)
// messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
// Level 4 corresponds to a 30s delay
msg.setDelayTimeLevel(4);
// Send the message
SendResult result = producer.sendMessage(msg);
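Since the delay is given as a level rather than a duration, it is easy to pick the wrong number. The sketch below maps a 1-based level to milliseconds using the `messageDelayLevel` list quoted in the comment above. `DelayLevels` is a hypothetical helper, not part of common-rocketmq or RocketMQ, and it assumes the broker still uses that list; a broker configured with a different list would need a different string.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper; assumes the messageDelayLevel list quoted above. */
final class DelayLevels {
    static final String LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    private DelayLevels() {}

    /** Returns the delay in milliseconds for a 1-based delay level. */
    static long toMillis(int level) {
        String[] tokens = LEVELS.split(" ");
        if (level < 1 || level > tokens.length) {
            throw new IllegalArgumentException("delay level out of range: " + level);
        }
        String token = tokens[level - 1];
        // Each token is a number followed by a one-letter unit, e.g. "30s".
        long amount = Long.parseLong(token.substring(0, token.length() - 1));
        char unit = token.charAt(token.length() - 1);
        switch (unit) {
            case 's': return TimeUnit.SECONDS.toMillis(amount);
            case 'm': return TimeUnit.MINUTES.toMillis(amount);
            case 'h': return TimeUnit.HOURS.toMillis(amount);
            default: throw new IllegalStateException("unknown unit: " + unit);
        }
    }
}
```

With this list, `DelayLevels.toMillis(4)` gives 30000, which matches the level used in the example. Arbitrary delays are not expressible this way; only the durations in the list can be chosen.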
3. Consumer
- Import the configuration file
<import resource="classpath*:conf/common-rocketmq.xml"/>
- Implement the ConsumerService interface from common-rocketmq
public class DemoConsumerService implements ConsumerService {
    @Override
    public boolean consume(Message message) {
        // Consumer business logic goes here; consumeResult holds its outcome
        boolean consumeResult = true;
        return consumeResult;
    }
}
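The producer section says idempotency is controlled through the UUID carried in the message. One way a ConsumerService implementation might honor that is to remember which UUIDs it has already handled and skip repeats. The sketch below is an assumption about how this could be done, not something common-rocketmq provides: `UuidDeduplicator` is a hypothetical class, and it keeps state in memory only, so it does not survive a restart or cover several consumer instances (a shared store would be needed for that).

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper; not part of common-rocketmq. */
final class UuidDeduplicator {
    // Thread-safe set of UUIDs that have already been accepted.
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    /** Returns true the first time a UUID is offered, false for every repeat. */
    boolean firstTime(String uuid) {
        return seen.add(uuid);
    }

    /** Forgets a UUID so that a message whose processing failed can be retried. */
    void forget(String uuid) {
        seen.remove(uuid);
    }
}
```

Inside `consume`, the implementation would parse the body back into a MessageData, call `firstTime(uuid)`, and treat a `false` result as a redelivered message that needs no further work. If the business logic throws, calling `forget(uuid)` before reporting failure keeps a later retry from being mistaken for a duplicate.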
- Register DemoConsumerService as a Spring-managed bean
<bean id="demoConsumerService" class="com.sxzq.lt.demo.consumer.DemoConsumerService"/>
- Create a consumer instance for this business logic
<bean id="demoConsumer" class="com.sxzq.lt.rocketmq.consumer.RocketMQConsumer"
      init-method="start" destroy-method="shutdown">
    <!-- consumerGroup -->
    <constructor-arg value="MQ_CONSUMER_GROUP_DEMO_1" index="0"></constructor-arg>
    <!-- topic -->
    <constructor-arg value="MQ_TOPIC_PRODUCER_DEMO" index="1"></constructor-arg>
    <!-- topic's subExpression -->
    <constructor-arg value="MQ_TAG_PRODUCER_DEMO_1" index="2"></constructor-arg>
    <property name="namesrvAddr" value="127.0.0.1:9876"></property>
    <property name="consumeFromWhere" ref="CONSUME_FROM_FIRST_OFFSET"/>
    <!-- Consumer business logic implementation -->
    <property name="consumerHandler" ref="demoConsumerService"/>
    <!-- Clustering mode; clustering consumption is the default, so this property can be omitted -->
    <property name="messageModel" ref="CLUSTERING"/>
</bean>