背景(在什么场景什么情况会出现),过程(复现过程),难点(解决这个问题的难点),选择(为什么这么解决),解决方向(先不提供代码),复盘。 按这个思路 写一下Rocketmq顺序消费
面试官你好,我是黄志慎,是广东工业大学研二的学生。
我现在主要开发方向是JAVA,同时也了解一些其他语言比如python、go。
我同时也有 mysql数据库,rocketmq,redis 这些中间件和docker的工作经验。
我之前在小鹏汇天自动驾驶大数据部门实习,我参与了“数据管理与监控系统”项目的开发,这个项目作为一个数据平台,对数据进行解析处理供自动驾驶其他部门使用,
涵盖数据展示平台(JAVA)和数据处理中心(Python)和数据备份及推送系统(Python),
数据展示平台提供数据解析、展示等业务需求,数据处理中心作为处理模块负责多种解析需求。
数据备份及推送系统负责数据检测、备份。
我主要做了其中一些需求,
1.使用mq对各个模块之间解耦。
2.业务需求开发,实现消息插队功能,解决消息重复消费问题
3.协助角色权限管理设计
4.容器化和CI/CD:配合jenkins 和docker实现自动部署。
编写了Dockerfile定义了模块容器镜像构建过程,使用Docker-compose编写部署指令,使其可以在不同环境中快速部署和运行。通过配置Jenkins实现了持续集成与持续部署(CI/CD)。
背景:
面向切面编程,在不修改源代码的情况下,对功能进行增强。比如日志记录、事务管理、性能统计和安全检查等。
过程:
切面,切点,通知
实现切面类,实现通知(before, after方法),标注切点。
选择:
做程序增强,跟业务解耦。
复盘:
性能影响
场景:
引擎,索引,优化,锁
索引
结构,分类,设计,性能分析,使用
结构:B+树,叶子双链表
分类:聚簇索引、二级索引
性能分析:频率、慢日志、explain解释
使用:左 范 实 联合索引 覆盖 前缀索引
锁
全局 表级 行级
行级 (通过索引项加锁) 共享锁、排他锁(for up)
角色生产消费类型
角色:生产者 消费者 NameServer broker(message queue) Topic(tag)
生产消息: 同步、异步、单向
消费消息:广播、集群
消息类型:顺序、延迟、
消费者可能多次接收同一条消息。
背景:
1.生产者没收到broker的应答
2.broker没收到消费者的应答
难点:
确保消费幂等性,要实现这点需要在应用层面做好设计。
选择:
将消息状态储存在数据库中,比如-1,0,1表示任务 创建,消费中,消费完成。然后我们只消费状态为-1的消息。这样保证消息幂等。
复盘:
对于数据量小没问题,数据量大的话对数据库压力很大。所以应对高并发场景的话,可以将插入数据库更新操作,替换成Redis(Map<唯一ID,状态>)。
场景:
在某些极端情况下,比如Broker宕机等,未持久化的消息可能会丢失。
保证严格顺序消费在高并发时具有挑战性。
背景:
在诸如支付、交易处理等业务场景中,需要严格保证消息的处理顺序来维持业务的正确性。
难点:
生产者插入到messageQueue不能乱,消费者消费同一个messageQueue顺序不能乱。
选择:
生产者对顺序消息通过唯一ID路由到同一队列。而且此时要对生产消息的要求是 同步消息。单队列采用单个消费线程模型避免并发问题。
解决方向:
需要合理配置每个Queue对应单个Consumer Group来确保均匀分摊负载。
RocketMQ支持的延迟级别是固定的,并不能精确地控制到特定秒数。
批量发送效率问题:大批量发送可能会引起性能瓶颈或网络拥堵。
事务消息:虽然提供了事务性消息,但其机制相对复杂需要用户谨慎处理以避免问题。
分布式事务的一致性问题:在进行跨服务分布式事务时保证最终一致性较为复杂。
消息堆积:生产者产生速度与消费者处理速度不匹配导致消费延迟。
死信队列管理: 死信处理需要人工干预,管理较为繁琐
背景:
多线程度独立对象
难点:
管理ThreadLocal变量的生命周期:不正确地管理ThreadLocal可以导致内存泄漏。
场景:
工作流程:
远程获取 clone/fetch/pull,
切换分支 branch/checkout,
保存分支:add/commit,
合并分支:fetch,merge
推送分支:push
背景:
需要对接口做耗时统计
选择:
然后他也不属于业务,算是对接口做一个方法增强嘛,
这时候Spring AOP做切面,实现切面类,实现通知(before, after方法),标注切点。
ThreadLocal记录线程入口时间,
before就开始设置当前时间,after就开始计算时间。
计算完移除ThreadLocal。
知识点:ThreadLocal、AOP、
数据展示平台
数据解析需求,转录视频/裁减,解析...
数据备份及推送系统
扫描线程,负责新数据检测,有新数据则待生成处理任务插入检测表。
执行线程,扫描检测表,生成解析任务插入解析任务表,备份数据,发送解析消息,上传minio
数据处理中心
循环线程,拉取mq消息,通过反射,执行指定的解析任务并防止消息重复。
背景:
除了自动检测的数据解析任务,有时候也有用户临时需要的数据解析任务。
难点:
RocketMQ无法支持消息优先级调度。
选择:
通过两个topic来模拟优先级队列,就分普通队列和优先级队列。用户在数据展示平台点击插队功能,就在优先级队列中加入消息,如果解析任务表没有该任务就插入解析任务表,因为普通队列可能已经有这个任务了。
然后数据处理中心会从优先级队列先拉取,如果没有再从普通队列去拉取,然后通过反射,执行指定的解析任务,然后通过查询解析任务表任务状态来实现幂等机制,解决消息重复消费问题。
知识点:消息重复
背景:
在旧系统中,数据展示平台(Java)直接以请求方式调用数据处理中心(Python),导致模块间耦合度高,这影响了系统的维护性和扩展性。
过程:
既然是偶合度高,就想到让模块做解耦,MQ就有削峰、异步、解耦功能。所以就是通过MQ来实现异步处理数据,实现模块解耦。
选择:
消息队列选择: 其实MQ主流就那三个,RocketMQ,RabbitMQ,kafka,我们就选择了RocketMQ,还是基于团队成员技术栈熟悉哪个技术。
MQ搭建: 容器化快速搭建本地、测试环境MQ。团队现在本地和测试都是往容器化方向进行快速测试验证。生产的化还是使用运维团队环境。
定义消息队列格式:任务ID,方法,参数
生产者: 配置nameServer地址,生产者Group 把消息封装到hashMap, 然后发送到指定Topic,
MQ面板配置Topic
消费者:配置消费者Group
消费者:
原来是数据展示平台通过请求方式去调用数据处理中心,现在用RocketMQ的话就是数据展示平台发送MQ消息到broker,RocketMQ生产消息类型有(同步、异步、单向),这边就使用异步保证消息发送成功,然后还要指定消息格式(任务id,函数,参数...)。
用户-角色-权限,用户角色表,角色权限表。
1.部分容器化+Jenkins。构建一个可运行容器环境,代码和数据通过挂载方式,容器构建之后比较稳定。
2.完全容器化+Jenkins,把代码和挂载对象都放容器里,Jenkins每次都要构建新的容器,推送到容器平台。
3.完全容器化+Jenkins+k8s。
1.配置流水线,仓库、勾子、脚本JenkinsFile
2.dockerFile: 基础镜像,RUN,COPY,ENV 配环境变量,配镜像源,安装各种包,测试阶段每个阶段打基础镜像
docker-compose:容器卷,端口,初始化脚本