AliwareMQ/mq-demo

1.8.4.Final~2.0.2.Final消费者注册无效

Closed this issue · 3 comments

@AllArgsConstructor
@Slf4j
public class HomeCardCallbackListener implements MessageListener {

    private final HomeCardMapper homeCardMapper;

    private final MsgRecordService msgRecordService;

    private final AliRocketMQProperties aliRocketMQProperties;

    @Override
    @Transactional
    public Action consume(Message message, ConsumeContext context) {

        log.info("callback msg -> {}", message);

        try {
            String callbackContent = new String(message.getBody());
            CardCallbackRequestDTO request = JSON.parseObject(callbackContent, CardCallbackRequestDTO.class);

            //更新发送记录的回执
            MsgRecord msgRecord = msgRecordService.findByMessageId(request.getMessageId());
            msgRecord.setCallback(callbackContent);
            msgRecord.setUpdateTime(new Date());
            msgRecordService.updateById(msgRecord);

            //如果回调成功且是创建卡片,更新卡片创建状态
            if (Objects.equals(0, request.getCode()) && Objects.equals(request.getSendTag(), aliRocketMQProperties.getCreateTag())) {
                Long planId = Long.valueOf(request.getReportId());
                HomeCard homeCard = homeCardMapper.selectById(planId);
                homeCard.setStatus(HomeCardStatusEnum.OK.getValue());
                homeCard.setUpdateTime(new Date());
                homeCardMapper.updateById(homeCard);
            }
            return Action.CommitMessage;
        } catch (Exception e) {
            log.error("consume failed ->", e);
            return Action.ReconsumeLater;
        }
    }
}```
```java
    @PostConstruct
    public void startConsumer() {
        Properties properties = mqProperties.getProperties();
        properties.setProperty(PropertyKeyConst.GROUP_ID, mqProperties.getGroupId());
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe(mqProperties.getTopic(), mqProperties.getCallbackTag(),
                new HomeCardCallbackListener(homeCardMapper, msgRecordService, mqProperties));
        consumer.start();
    }

或者设置listener为bean

@Component
@AllArgsConstructor
@Slf4j
public class HomeCardCallbackListener implements MessageListener {

    private final HomeCardMapper homeCardMapper;

    private final MsgRecordService msgRecordService;

    private final AliRocketMQProperties aliRocketMQProperties;

    @Override
    @Transactional
    public Action consume(Message message, ConsumeContext context) {

        log.info("callback msg -> {}", message);

        try {
            String callbackContent = new String(message.getBody());
            CardCallbackRequestDTO request = JSON.parseObject(callbackContent, CardCallbackRequestDTO.class);

            //更新发送记录的回执
            MsgRecord msgRecord = msgRecordService.findByMessageId(request.getMessageId());
            msgRecord.setCallback(callbackContent);
            msgRecord.setUpdateTime(new Date());
            msgRecordService.updateById(msgRecord);

            //如果回调成功且是创建卡片,更新卡片创建状态
            if (Objects.equals(0, request.getCode()) && Objects.equals(request.getSendTag(), aliRocketMQProperties.getCreateTag())) {
                Long planId = Long.valueOf(request.getReportId());
                HomeCard homeCard = homeCardMapper.selectById(planId);
                homeCard.setStatus(HomeCardStatusEnum.OK.getValue());
                homeCard.setUpdateTime(new Date());
                homeCardMapper.updateById(homeCard);
            }
            return Action.CommitMessage;
        } catch (Exception e) {
            log.error("consume failed ->", e);
            return Action.ReconsumeLater;
        }
    }
}
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean buildConsumer(AliRocketMQProperties mqProperties, HomeCardCallbackListener listener) {
        ConsumerBean consumerBean = new ConsumerBean();
        //配置文件
        Properties properties = mqProperties.getProperties();
        properties.setProperty(PropertyKeyConst.GROUP_ID, mqProperties.getGroupId());
        //将消费者线程数固定为20个 20为默认值
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
        consumerBean.setProperties(properties);
        //订阅关系
        Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
        Subscription subscription = new Subscription();
        subscription.setTopic(mqProperties.getTopic());
        subscription.setExpression(mqProperties.getCallbackTag());
        subscriptionTable.put(subscription, listener);
        //订阅多个topic如上面设置
        consumerBean.setSubscriptionTable(subscriptionTable);
        return consumerBean;
    }

均不可以正常注册消费者,且没有报错日志,生产者是正常的

@PostConstruct@bean均是在@configuration下的配置类中的

springboot版本2.4.3

已找到原因,没有在阿里云控制台中创建groupId导致