_ _ _____ ___ ___ _____
| | | | / ___/ / |/ | / _ \
| |_| | | |___ / /| /| | | | | |
| _ | \___ \ / / |__/ | | | | | |
| | | | ___| | / / | | | |_| |_
|_| |_| /_____/ /_/ |_| \_______|
- 基于Netty来实现的消息中间件
- 使用Server作为服务端
- 生产者发送消息到服务端后将消息投递到多个MessageQueue中,消费者订阅一个或者多个Queue
- 消费者每次取拉取10条消息后放入队列,异步消费
- 消费后会提交消费记录到本地,定时任务会定时的同步到服务端
- 在重启后,消费记录也会同步到消费者端
- 集群模式
- 负载均衡
项目位置:https://github.com/MortyCode/HsMq.git
字段 |
Length |
HeadLength |
head |
DataLength |
data |
解释 |
总消息体长度 |
消息头长度 |
消息体 |
|
|
长度 |
int(4个字节) |
int(4个字节) |
HeadLength的值 |
int(4个字节) |
DataLength的值 |
- 解码 LengthObjectDecode
- 编码 LengthObjectEncode
private static final Map<OperationEnum, BaseExecutor<?>> enumExecutorMap ;
....策略初始化操作
public static HsEecodeData executor(HsDecodeData hsDecodeData){
HsReq<?> hsReq = (HsReq<?>) hsDecodeData.getData();
OperationEnum operationEnum = hsReq.getOperationEnum();
if (operationEnum==null){
... 异常处理
}
BaseExecutor<?> baseExecutor = enumExecutorMap.get(operationEnum);
return baseExecutor.executorReq(hsReq);
}
- 这里有一个细节,这里有两个方法,executor 方法的泛型是 T 标识一个确定的类型,executor0 方法泛型的是 ? 标识一个不确定的类型 .
- 因为从HsDecodeData获取的对象是Object类型,并不知道此时类型是什么,所以convertReq的意义就是校验请求对象的数据是不是处理器需要的数据类型,并且转换。
public abstract class BaseExecutor<T> {
public static MessageStore messageStore = new MessageStore();
public abstract HsResp<?> executor(HsReq<T> hsReq);
public abstract HsReq<T> convertReq(HsReq<?> hsReq);
public HsEecodeData executorReq(HsReq<?> hsReq){
HsReq<T> fixedHsReq = convertReq(hsReq);
if (fixedHsReq==null){
return HsEecodeData.typeError();
}
HsResp<?> hsResp = executor(fixedHsReq);
hsResp.setReqType(hsReq.getOperation());
hsResp.setReqId(hsReq.getReqId());
return HsEecodeData.resp(hsResp);
}
}
- SendMessageExecutor 接受发送消息处理
- PullExecutor 拉去消息处理
- CommitOffsetExecutor 提交偏移量处理
- TopicDataExecutor 获取Topic的信息处理
2.2. 消息存储 SendMessageExecutor
- 消息体首先存储到一个公共存储消息位置,目前是在 mq_1文件之中
- 然后将返回的消息偏移量等信息存储到消息队列中,队列数为初始化的时候指定, 位置为/queue/目录下
- 然后消息消费的时候,会从queue中拉取消息
public HsResp<?> saveMessage(SendMessage sendMessage){
....
MessageDurability messageDurability = MessageStorage.saveMessage(sendMessage);
boolean push = ConsumerQueueManger.pushConsumerQueue(sendMessage, messageDurability);
.....
return resp;
}
- |
Length |
Data |
Length |
Data |
...... |
解释 |
下一条数据长度 |
消息内容 |
下一条数据长度 |
消息内容 |
...... |
占用 |
int(4个字节) |
Length长度 |
int(4个字节) |
Length长度 |
...... |
public class SendMessage implements Serializable {
private static final long serialVersionUID = -20210610L;
private String msgId;
private String topic;
private String tag;
private String key;
private String body;
}
- 使用RandomAccessFile来进行存储,返回消息位点,以及偏移量
- RandomAccessFile可以在任意位置进行快速的读写
public static synchronized MessageDurability save(String fileName,Object object)
throws IOException, InterruptedException{
RandomAccessFile rws = new RandomAccessFile(fileName, "rw");
FileChannel fileChannel = rws.getChannel();
byte[] bytes = ObjectByteUtils.toByteArray(object);
if (bytes==null){
throw new FileException("文件转化异常:object not can cast bytes");
}
ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length+4);
byteBuffer.putInt(bytes.length);
byteBuffer.put(bytes);
byteBuffer.flip();
MessageDurability messageDurability = new MessageDurability();
messageDurability.setLength(byteBuffer.limit());
messageDurability.setOffset(fileChannel.size());
fileChannel.position(fileChannel.size());
fileChannel.write(byteBuffer);
fileChannel.force(true);
fileChannel.close();
return messageDurability;
}
- |
Offset |
Length |
TagHashcode |
Index |
...... |
解释 |
偏移量 |
消息长度 |
消息tagHashCode |
这个队列里面的第几个消息 |
...... |
占用 |
long(8个字节) |
int(4个字节) |
int(4个字节) |
long(8个字节) |
...... |
- 消息在存储到 mq_n 之后,会将消息分配到 消息队列之后,然后消费者在拉取消息的时候,会指定queueId来进行拉取数据。
- 拉取消息的话,会首先读取queue的信息,读取出指定偏移量的n条数据的信息,然后去 mq_n 去查询
public List<PullMessage> pullMessage(Pull pull){
//读取n条数据
List<MessageDurability> data =
MessageDurabilityStorage.readMessageQueue(pull.getQueueId(),
pull.getTopic(),
pull.getOffset());
if (data.size()==0){
return null;
}
//然后在去根据元信息读取消息
return MessageStorage.readMessages(data);
}
2.4 消息消费位点保存 CommitOffsetExecutor
- 消费位点存储使用JSON格式存储,格式为offSetMap下面的key代表消费者,Value代表每个队列的消费点
{
"offSetMap": {
"CConsumer": {
"0": 358,
"1": 363,
"2": 363,
"3": 359
},
"AConsumer": {
"0": 331,
"1": 336,
"2": 335,
"3": 332
},
"RConsumer": {
"0": 486,
"1": 492,
"2": 492,
"3": 487
},
"DConsumer": {
"0": 358,
"1": 363,
"2": 363,
"3": 359
},
"BConsumer": {
"0": 350,
"1": 355,
"2": 355,
"3": 351
}
}
}
2.4 Topic信息返回 TopicDataExecutor
messageQueueData.setTopic(data.getTopic());
messageQueueData.setConsumerKey(data.getConsumerKey());
messageQueueData.setQueueSize(topicListener.getQueueSize());
messageQueueData.setOffSetMap(QueueOffsetStorage.getOffSetMap(data.getTopic(),data.getConsumerGroup()));
- 消息发送目前很简单,组装数据后直接调用netty的writeAndFlush,添加了一个 Listener 以及一个超时的判断。
- 目前实现比较简单,因为这里其实并不是MQ系统的重点
public static SendMessageResult sendMsg(SendMessage sendMessage){
SendMessageResult sendMessageResult = new SendMessageResult();
try {
HsEecodeData hsEecodeData = new HsEecodeData();
....
resultMap.put(hsReq.getReqId(), sendMessageResult);
MessageClient.channelFuture
.channel()
.writeAndFlush(hsEecodeData)
.addListener((future)->{
if (future.isSuccess()) {
sendMessageResult.setSendDone(true);
} else {
sendMessageResult.setSendDone(false);
sendMessageResult.setRespDesc("消息发送失败");
}
});
long nanosTimeout = TimeUnit.SECONDS.toNanos(3);
final long deadline = System.nanoTime() + nanosTimeout;
while (true) {
if (nanosTimeout<0){
sendMessageResult.setRespDesc("发送超时");
sendMessageResult.setMessageResult(-1);
break;
}
if (sendMessageResult.getMessageResult() != null) {
break;
}
nanosTimeout = deadline - System.nanoTime();
}
return sendMessageResult;
} catch (Exception e) {
sendMessageResult.setMessageResult(-2);
sendMessageResult.setSendDone(false);
e.printStackTrace();
}
return sendMessageResult;
}
Map<String, Map<String,AbstractConsumer>>
{
"consumerGroup": {
"Topic": Consumer,
"Topic": Consumer,
},
"consumerGroup": {
"Topic": Consumer,
"Topic": Consumer,
},
}
系统内有多个消费组,一个消费组内有多个Topic对应的消费者
- 消息管理器 ConsumerMessageQueue
- 注册拉取消息任务
- 注册执行器任务,异步的从消息管理器中拉取数据进行消费
- 注册定时任务,定时向服务端提交消费偏移量
public static void registeredConsumer(String topic,String consumerGroup){
ThreadPoolExecutor executor = ExecutorService.getExecutor();
//创建消费者
ConsumerMessageQueue consumerMessageQueue = new ConsumerMessageQueue(topic,consumerGroup);
//注册到管理器中
consumerMessageQueueMap.put(consumerKey(topic,consumerGroup),consumerMessageQueue);
//注册拉取消息任务
executor.execute(new PullMessageTask(channelFuture , consumerGroup, consumerMessageQueue));
//注册执行器任务
executor.execute(new ExecutorMessageTask(channelFuture ,consumerMessageQueue));
//定时任务
channelFuture.channel().eventLoop().scheduleWithFixedDelay(()->{
//定时任务,定时向服务端提交消费偏移量
new Thread(new CommitOffsetTask(channelFuture,consumerMessageQueue)).start();
},1, 3L, TimeUnit.SECONDS);
}
- 首先向服务发出获取对应topic对应消费组的 TopicData 的请求
public static void initConsumerQueue(String consumerGroup){
consumerMessageQueueMap.forEach((consumerKey,queue)->{
HsEecodeData hsEecodeData = new HsEecodeData();
.....
hsReq.setOperation(OperationEnum.TopicData.getOperation());
hsEecodeData.setData(hsReq);
channelFuture.channel().writeAndFlush(hsEecodeData).sync();
});
}
- 然后根据服务端的返回同步消费数据
- 目前是为所有的队列都创建消息队列,然后把获取的queueId , offset 对应的信息初始化到 消费者里面,这样消费者就可以根据偏移量去服务端拉取数据
public void initQueue(MessageQueueData messageQueueData){
Integer queueSize = messageQueueData.getQueueSize();
for (int i=0;i<queueSize;i++){
queueMap.put(i,new ConcurrentLinkedQueue<>());
}
Map<Integer, Long> serverOffSetMap = messageQueueData.getOffSetMap();
if (serverOffSetMap!=null&&serverOffSetMap.size()>0){
offSetMap.putAll(serverOffSetMap);
lastMessageMap.putAll(serverOffSetMap);
}
}