定时任务扫描到奖励表,发送到mq,用户服务订阅这个服务增加积分
订单-受益人-奖励-状态
- 阶段奖励表状态更新为正在发放,
- 向mq发送事务消息header={order_id tx_id} body={奖励记录}
- mq收到消息会有回调方法executeLocalTransaction(),在里面 update 状态为已发送,然后本地事务表插入一条记录(tx_id desc 方便回查),给mq发消息说commit/rollback
- 如果未收到 刚刚的commit/rollback,回调方法checkLocalTransaction,就主动根据tx_id查本地事务表,如果有这条表示事务成功,没有就表示事务失败
用户积分服务订阅这个MQ,收到消息拿出来,找到对应的用户增加积分,和收支记录
为了防止重复消费,需要根据order_id来判断幂等性,如果收支记录有这笔订单的积分那么直接返回
消息表和奖励表在一个库,用户表在另外一个库
- 创建消息,存入mysql的消息表(待确认) (这一步代表预发送消息)
- 更新奖励表的发奖励的状态为已发
- 更新此消息(发送中), 放入消息队列 (这一步就是正在的发送消息保证投递的可靠性)
第二阶段消费消息
拿到消息,根据消息转化成消息对象,
根据消息对象来幂等性判断,新增积分的增加记录,并给用户加上去,然后消息表删除这条消息
如果1 2出现问题,那么直接回滚,等重新做这一步, 如果3 mq出问题了,消息为发送中,但是mq挂了,所以需要通过定时器扫描发送中的消息,重发。 只有用户积分加上去了,删除掉消息表这条消息,才能保证不继续重发这条消息,可靠性得以保证
上面任何一个环节都有可能出问题,所以引入定时器去定时查询,防止数据库和消息中间件的不一致
基于模板方法模式
模板方法checkMessages()
1.分页查询发送中和待确认的消息记录,并放入集合 protected abstract PageResult getPageResult(PageQuery pageQuery);
key=messageId 根据序列自动生成工具生成的
value=message对象{messageId,messageBody,messageDataType,consumerQueue,messageSendTimes,alreadyDead(死?活),status(发送重待确认)}
2.处理消息processMessage(Map<String, ReliableMessage> messages);
重点说一下处理消息
- 判断发送次数
- 如果超过最大发送次数直接退出,标记为死亡
- 重发消息
- 根据这个消息body包含的订单信息,查询该订单表中的状态,如果订单表成功那么修改状态为发送中并重发消息。
- 如果订单失败,那么直接删除这一条消息记录