XIoT.EventBus是一个基于新生命X组件的分布式事件总线,可以跨应用触发事件。基于发布/订阅模式,消息的传递可以通过activemq, redis,rabbitmq,kafka, rocketmq等实现,支持控制台和web应用。
目前,已经实现了activemq、rabbitmqg。
本组件可能简化对消息发布和订阅的操作处理工作,让开发人员将精力集中在业务逻辑的处理上,发布消息时只需要通过IEventBus.Publish(topic, EventMessage),订阅消息时使用IEventBus.Subscribe(topic, EventHandler)即可。其中,EventHandler为订阅端接收到消息后的处理器。
/// <summary>
/// 发布消息
/// </summary>
/// <param name="topic">消息主题</param>
/// <param name="message">消息体,注意所有发布的消息均须为EventMessage的子类,方便事件总线底层进行通讯处理。</param>
/// <param name="priority">消息优先级</param>
void Publish(String topic, EventMessage message, MQPriority priority = MQPriority.Normal);
/// <summary>
/// 异步发布消息
/// </summary>
/// <param name="topic">消息主题</param>
/// <param name="message">消息体</param>
/// <param name="priority">消息优先级</param>
/// <returns></returns>
Task PublishAsync(String topic, EventMessage message, MQPriority priority = MQPriority.Normal);
/// <summary>
/// 订阅消息
/// </summary>
/// <param name="topic">消息主题</param>
/// <param name="handler">消息事件处理器,用于在接收到指定主题的消息时对该消息进行相关业务处理工作,<seealso cref="EventHandler"/></param>
void Subscribe(String topic, EventHandler handler);
/// <summary>
/// 异步订阅消息
/// </summary>
/// <param name="topic">消息主题</param>
/// <param name="handler">消息事件处理器</param>
/// <returns></returns>
Task SubscribeAsync(String topic, EventHandler handler);
/// <summary>
/// 取消消息订阅
/// </summary>
/// <param name="topics">主题列表</param>
void Unsubscribe(IEnumerable<String> topics);
/// <summary>
/// 异步取消消息订阅
/// </summary>
/// <param name="topics">消息列表</param>
/// <returns></returns>
Task UnsubscribeAsync(IEnumerable<String> topics);
/// <summary>
/// 取消该事件总线上所有的消息订阅
/// </summary>
void UnsubscribeAll();
/// <summary>
/// 异步取消该事件总线上所有的消息订阅
/// </summary>
/// <returns></returns>
Task UnsubscribeAllAsync();
发布消息时,系统只需要创建一个指定的消息服务事件处理总线,然后通过上面提供的消息发布接口即可实现消息发布,在本实现中无论采用哪种消息服务器,都默认实现了消息的持久化,使得在系统出现异常的时候不丢失消息。
IEventBus evtbus = new ActiveMQEventBus();
var topic = "XIoT/EventBus/Test";
evtbus.Publish(topic, new EventMessage() {
Action = "Test",
Body = "学无先后达者为师"
});
其中,发布和订阅的消息体负载Payload采用String属性,方便通过Json序列化方式对业务需要传输的实体进行封装,也是利用这个特性实现不同消息的发布和订阅机制。Action为业务定义的动作属性。
订阅消息前首先要定义好用于处理消息的事件处理器(即业务端的处理代码),继承EventHandler类,实现Handle(EventMessage evtArgs)方法,在订阅的时候底层框架会自动处理消息监听和事件分发工作。
public class MyEventBusHandler : EventHandler
{
public override void Handle(EventMessage evtArgs)
{
Console.WriteLine($"{evtArgs.EventTime} - {(String)evtArgs.Body} 接收到消息, 动作名称:{evtArgs.Action}");
}
}
IEventBus evtbus = new ActiveMQEventBus();
var topics = new List<String>() { "XIoT/EventBus/Test" };
var handler = new MyEventBusHandler();
evtbus.Subscribe(topics, handler);
1.X组件
2.ActiveMQ
3.RabbitMQ
4.EasyNETQ