rmq demo

本地docker 启动。MQ简单逻辑后续其实可以做在FaaS中,支持弹性扩缩容。

deploy

demos

* enum org.apache.rocketmq.common.consumer.ConsumeFromWhere
    CONSUME_FROM_LAST_OFFSET: 从最后的偏移量开始消费
    CONSUME_FROM_FIRST_OFFSET:从最小偏移量开始消费
    CONSUME_FROM_TIMESTAMP:从某个时间开始消费 
* 控制台 -> topic -> 重置消费位点 -> 设置 订阅组 和 时间点 -> 提交即可
三个服务: 商品服务、订单服务、用户积分服务。
用户创单后,就算商品库存没有正常扣减、用户积分没有对应增加 也不管了,保证用户下单时候的核心事务执行成功就行,后续再进行商品库存扣减和用户积分增加修复, 最大限度地保证下单成功。
用于: 核心事务和非核心事务解耦(不能说用户积分服务异常就导致用户无法创单)

transaction_msg transaction_msg_vs

  • [cutoff] 日志格式
  • OMS(OpenMessaging)
  • 消息轨迹
new DefaultMQProducer("ProducerGroupName",true);
new DefaultMQPushConsumer("CID_JODIE_1",true);

// 指一条消息从生产者发送到消息队列RocketMQ版服务端,再到消费者消费,整个过程中的各个相关节点的时间、状态等数据汇聚而成的完整链路信息。
// 类似traceId排查问题用的
  • 优化消费速度
* 提高消费并行度
    * 加机器
    * 多线程 consumeThreadMin、consumeThreadMax
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("topic_name");
        consumer.setConsumeThreadMax(100); // 默认是20
        consumer.setConsumeThreadMax(50); // 默认是20
* 批量方式消费
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("topic_name");
    consumer.setConsumeMessageBatchMaxSize(10); // 默认是1
* 跳过非重要消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
            long offset = messages.get(0).getQueueOffset();
            String maxOffset = messages.get(0).getProperty(MessageConst.PROPERTY_MAX_OFFSET);
            long diff = Long.parseLong(maxOffset) - offset;
            if (diff > 100000) {
                // 消息堆积情况的特殊处理...
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            // 正常消费过程...
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }
* 优化每条消息消费过程
  • 轻消息队列
  • 消息幂等
  • 一个生产者对应多个topic,一个消费者对应多个topic
  • 消费限速:
    • Sentinel 为 Apache RocketMQ 保驾护航
    • 消费者参数控制消费速度 (但是只能控制单实例的):
      • 总的消费最大QPS = 实例数量 * setPullBatchSize * setConsumeThreadMax / setPullInterval * 1000
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("batch_msg"); // consumerGroup:batch_msg
    consumer.setConsumeMessageBatchMaxSize(1); // 调用MessageListener处理的地方一次传入List<MessageExt>的数量
    consumer.setPullBatchSize(1); // 一次从Broker的一个Message Queue获取消息的数量(默认32个)
    consumer.setConsumeThreadMax(1); // consumer最大线程数
    consumer.setConsumeThreadMin(1); // consumer最小线程数
    consumer.setPullInterval(1000); // // 每次拉取的间隔,单位为毫秒

abstract biz framework

Feature:

  1. 告警监控
  2. producer/consumer 支持重试
  3. 多次重试失败支持存储到DB
  4. 支持切面对业务代码无侵入添加逻辑
  5. 自动分区sharding队列

TODO: 待完善

这里以RMQ为例,应当也无缝切换支持kafka。

ref