文章阅读请前先参考看一下 https://www.cnblogs.com/hudean/p/13858285.html 安装RabbitMQ消息队列软件与了解C#中如何使用RabbitMQ 和 https://www.cnblogs.com/Keep-Ambition/p/8038885.html 添加一个用户并可以远程访问,
消息队列的作用:跨服务通信、服务之间解耦,削峰、异步,其实还有一个作用是提高接收者性能
RabbitMQ 官方网站:https://www.rabbitmq.com/
RabbitMQ 中文文档网址:http://rabbitmq.mr-ping.com/
本文代码GitHub 地址是: https://github.com/hudean/MQDemo
一、初衷
- 开发难度大,各系统间分别隔离,需要关注消息中间件的各种复杂繁琐的配置,关注不同的消息则需要对接不同的消息队列
- 维护成本高,各系统或团队需要分别管理消息中间件、处理各种服务异常、(消息中间件的高可用、业务的高可用等)
- 管理难度大,没法对消息的生产和消费进行业务管理,也不方便对消息中的敏感数据进行权限管理
- 扩展成本高,无法统一消息系统扩展功能,如路由、延时、重试、消费确认等 总结消息队列是一个面向技术的接入,重点关注消息队列的配置、接口对接;而消息总线则是通过屏蔽部署、分组和通信等技术细节,实现一个面向业务的接入,重点关注要接收什么消息。
二、定义
事件总线是实现基于事件驱动模式的方式之一,事件发送者将事件消息发送到一个事件总线上,事件订阅者向事件总线订阅和接收事件,而后再处理接收到的事件。固然,订阅者不只能够接收和消费事件,它们自己也能够建立事件,并将它们发送到事件总线上。
事件总线是对发布-订阅模式的一种实现。它是一种集中式事件处理机制,容许不一样的组件之间进行彼此通讯而又不须要相互依赖,达到一种解耦的目的。
如前所述,使用基于事件的通信时,当值得注意的事件发生时,微服务会发布事件,例如更新业务实体时。 其他微服务订阅这些事件。 微服务收到事件时,可以更新其自己的业务实体,这可能会导致发布更多事件。 这是最终一致性概念的本质。 通常通过使用事件总线实现来执行此发布/订阅系统。 事件总线可以设计为包含 API 的接口,该 API 是订阅和取消订阅事件和发布事件所需的。 它还可以包含一个或多个基于跨进程或消息通信的实现,例如支持异步通信和发布/订阅模型的消息队列或服务总线。
可以使用事件来实现跨多个服务的业务事务,这可提供这些服务间的最终一致性。 最终一致事务由一系列分布式操作组成。 在每个操作中,微服务会更新业务实体,并发布可触发下一个操作的事件。 下面的图 6-18 显示了通过事件总线发布了 PriceUpdated 事件,因此价格更新传播到购物篮和其他微服务。
图 6-18。 基于事件总线的事件驱动的通信
本部分介绍如何使用通用事件总线接口(如图 6-18 所示)实现这种与 .NET 的通信。 存在多种可能的实现,每种实现使用不同的技术或基础结构,例如 RabbitMQ、Azure 服务总线或任何其他第三方开源或商用服务总线。
三、集成事件
集成事件用于跨多个微服务或外部系统保持域状态同步。 此功能可通过在微服务外发布集成事件来实现。 将事件发布到多个接收方微服务(订阅到集成事件的尽可能多个微服务)时,每个接收方微服务中的相应事件处理程序会处理该事件。
集成事件基本上是数据保持类,如以下示例所示:
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Text.Json.Serialization; 6 using System.Threading.Tasks; 7 8 namespace EventBus.Events 9 { 10 /// <summary> 11 /// 集成事件 12 /// </summary> 13 public record IntegrationEvent 14 { 15 public IntegrationEvent() 16 { 17 Id = Guid.NewGuid(); 18 CreationDate = DateTime.UtcNow; 19 } 20 [JsonConstructor] 21 public IntegrationEvent(Guid id, DateTime createDate) 22 { 23 Id = id; 24 CreationDate = createDate; 25 } 26 27 [JsonInclude] 28 public Guid Id { get; private init; } 29 30 [JsonInclude] 31 public DateTime CreationDate { get; private init; } 32 } 33 }
1 public class ProductPriceChangedIntegrationEvent : IntegrationEvent 2 { 3 public int ProductId { get; private set; } 4 public decimal NewPrice { get; private set; } 5 public decimal OldPrice { get; private set; } 6 7 public ProductPriceChangedIntegrationEvent(int productId, decimal newPrice, 8 decimal oldPrice) 9 { 10 ProductId = productId; 11 NewPrice = newPrice; 12 OldPrice = oldPrice; 13 } 14 }
事件总线
事件总线可实现发布/订阅式通信,无需组件之间相互显式识别,如图 6-19 所示。
图 6-19。 事件总线的发布/订阅基础知识
上图显示了微服务 A 发布到事件总线,这会分发到订阅微服务 B 和 C,发布服务器无需知道订阅服务器。 事件总线与观察者模式和发布-订阅模式相关。
观察者模式
在观察者模式中,主对象(称为可观察对象)将相关信息(事件)告知其他感兴趣的对象(称为观察者)。
发布-订阅(发布/订阅)模式
发布/订阅模式的用途与观察者模式相同:某些事件发生时,需要告知其他服务。 但观察者模式与发布/订阅模式之间存在重要区别。 在观察者模式中,直接从可观察对象广播到观察者,因此它们“知道”彼此。 但在发布/订阅模式中,存在称为中转站、消息中转站或事件总线的第三个组件,发布服务器和订阅服务器都知道第三个组件。 因此,使用发布/订阅模式时,发布服务器和订阅服务器通过所述的事件总线或消息中转站精确分离。
中转站或事件总线
如何实现发布服务器和订阅服务器之间的匿名? 一个简单方法是让中转站处理所有通信。 事件总线是一个这样的中转站。
事件总线通常由两部分组成:
-
抽象或接口。
-
一个或多个实现。
在图 6-19 中,从应用程序角度看,会发现事件总线实际上是一个发布/订阅通道。 实现此异步通信的方式可能会有差异。 它可以具有多个实现,以便你进行交换,具体取决于环境要求(例如,生产和开发环境)。
在图 6-20 中,可看到事件总线的抽象,包含基于 RabbitMQ、Azure 服务总线或其他事件/消息中转站等基础结构消息技术的多个实现。
图 6- 20。 事件总线的多个实现
最好通过接口定义事件总线,以便它可使用多种技术(例如 RabbitMQ、Azure 服务总线等)来实现。 但是,如前所述,仅当需要由你的抽象支持的基本事件总线功能时,才适合使用你自己的抽象(事件总线接口)。 如果需要更丰富的服务总线功能,应使用你喜欢的商用服务总线提供的 API 和抽象,而不是你自己的抽象。
定义事件总线接口
首先,让我们了解一下事件总线接口的一些实现代码和可能的实现。 接口应是通用和简单的,如下所示接口。
1 using EventBus.Events; 2 3 namespace EventBus.Abstractions 4 { 5 /// <summary> 6 /// 事件总线接口 7 /// </summary> 8 public interface IEventBus 9 { 10 /// <summary> 11 /// 发布 12 /// </summary> 13 /// <param name="event"></param> 14 void Publish(IntegrationEvent @event); 15 16 /// <summary> 17 /// 订阅 18 /// </summary> 19 /// <typeparam name="T"></typeparam> 20 /// <typeparam name="TH"></typeparam> 21 void Subscribe<T, TH>() 22 where T : IntegrationEvent 23 where TH : IIntegrationEventHandler<T>; 24 25 /// <summary> 26 /// 动态订阅 27 /// </summary> 28 /// <typeparam name="TH"></typeparam> 29 /// <param name="eventName"></param> 30 void SubscribeDynamic<TH>(string eventName) 31 where TH : IDynamicIntegrationEventHandler; 32 33 /// <summary> 34 /// 取消动态订阅 35 /// </summary> 36 /// <typeparam name="TH"></typeparam> 37 /// <param name="eventName"></param> 38 void UnsubscribeDynamic<TH>(string eventName) 39 where TH : IDynamicIntegrationEventHandler; 40 41 /// <summary> 42 /// 取消订阅 43 /// </summary> 44 /// <typeparam name="T"></typeparam> 45 /// <typeparam name="TH"></typeparam> 46 void Unsubscribe<T, TH>() 47 where TH : IIntegrationEventHandler<T> 48 where T : IntegrationEvent; 49 } 50 }
借助 RabbitMQ 的事件总线实现,微服务可订阅事件、发布事件和接收事件,如图 6-21 所示。
图 6-21。 事件总线的 RabbitMQ 实现
RabbitMQ 充当消息发布服务器和订阅者之间的中介,处理分发。 在代码中,EventBusRabbitMQ 类实现了泛型 IEventBus 接口。 此实现基于依赖项注入,以便可以从此开发/测试版本交换到生产版本。
1 public class EventBusRabbitMQ : IEventBus, IDisposable 2 { 3 // Implementation using RabbitMQ API 4 //... 5 }
示例开发/测试事件总线的 RabbitMQ 实现是样板代码。 它必须处理与 RabbitMQ 服务器的连接,并提供用于将消息事件发布到队列的代码。 它还必须为每个事件类型实现收集集成事件处理程序的字典;这些事件类型可以对每个接收器微服务具有不同的实例化和不同的订阅,如图 6-21 所示。
四、使用 RabbitMQ 实现一个简单的发布方法
下面的代码是 RabbitMQ 的事件总线实现的简化版,用以展示整个方案。 你真的不必以这种方式处理连接。 要查看完整的实现,在后面
1 public class EventBusRabbitMQ : IEventBus, IDisposable 2 { 3 // Member objects and other methods ... 4 // ... 5 6 public void Publish(IntegrationEvent @event) 7 { 8 var eventName = @event.GetType().Name; 9 var factory = new ConnectionFactory() { HostName = _connectionString }; 10 using (var connection = factory.CreateConnection()) 11 using (var channel = connection.CreateModel()) 12 { 13 channel.ExchangeDeclare(exchange: _brokerName, 14 type: "direct"); 15 string message = JsonConvert.SerializeObject(@event); 16 var body = Encoding.UTF8.GetBytes(message); 17 channel.BasicPublish(exchange: _brokerName, 18 routingKey: eventName, 19 basicProperties: null, 20 body: body); 21 } 22 } 23 }
五、使用 RabbitMQ API 实现订阅代码
与发布代码一样,下面的代码是 RabbitMQ 事件总线实现的简化部分。
public class EventBusRabbitMQ : IEventBus, IDisposable { // Member objects and other methods ... // ... public void Subscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T> { var eventName = _subsManager.GetEventKey<T>(); var containsKey = _subsManager.HasSubscriptionsForEvent(eventName); if (!containsKey) { if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } using (var channel = _persistentConnection.CreateModel()) { channel.QueueBind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName); } } _subsManager.AddSubscription<T, TH>(); } }
每个事件类型都有一个相关的通道,以获取 RabbitMQ 中的事件。 然后,可以根据需要在每个通道和事件类型中拥有尽可能多的事件处理程序。
订阅方法接受一个 IIntegrationEventHandler 对象,该对象相当于当前微服务中的回调方法,以及其相关的 IntegrationEvent 对象。 然后,代码将该事件处理程序添加到事件处理程序列表,每个客户端微服务的每个集成事件类型都可具有事件处理程序。 如果客户端代码尚未订阅事件,该代码将为事件类型创建一个通道,以便在从任何其他服务中发布事件时,它可以从 RabbitMQ 以推送方式接收事件。
六、使用 RabbitMQ 完整实现事件总线代码
结构图如下:
动态集成事件处理器接口
1 using System.Threading.Tasks; 2 3 namespace EventBus.Abstractions 4 { 5 /// <summary> 6 /// 动态集成事件处理器接口 7 /// </summary> 8 public interface IDynamicIntegrationEventHandler 9 { 10 Task Handle(dynamic eventData); 11 } 12 }
事件总线接口
1 using EventBus.Events; 2 3 namespace EventBus.Abstractions 4 { 5 /// <summary> 6 /// 事件总线接口 7 /// </summary> 8 public interface IEventBus 9 { 10 /// <summary> 11 /// 发布 12 /// </summary> 13 /// <param name="event"></param> 14 void Publish(IntegrationEvent @event); 15 16 /// <summary> 17 /// 订阅 18 /// </summary> 19 /// <typeparam name="T"></typeparam> 20 /// <typeparam name="TH"></typeparam> 21 void Subscribe<T, TH>() 22 where T : IntegrationEvent 23 where TH : IIntegrationEventHandler<T>; 24 25 /// <summary> 26 /// 动态订阅 27 /// </summary> 28 /// <typeparam name="TH"></typeparam> 29 /// <param name="eventName"></param> 30 void SubscribeDynamic<TH>(string eventName) 31 where TH : IDynamicIntegrationEventHandler; 32 33 /// <summary> 34 /// 取消动态订阅 35 /// </summary> 36 /// <typeparam name="TH"></typeparam> 37 /// <param name="eventName"></param> 38 void UnsubscribeDynamic<TH>(string eventName) 39 where TH : IDynamicIntegrationEventHandler; 40 41 /// <summary> 42 /// 取消订阅 43 /// </summary> 44 /// <typeparam name="T"></typeparam> 45 /// <typeparam name="TH"></typeparam> 46 void Unsubscribe<T, TH>() 47 where TH : IIntegrationEventHandler<T> 48 where T : IntegrationEvent; 49 } 50 }
集成事件处理器接口
1 using EventBus.Events; 2 using System; 3 using System.Collections.Generic; 4 using System.Linq; 5 using System.Text; 6 using System.Threading.Tasks; 7 8 namespace EventBus.Abstractions 9 { 10 /// <summary> 11 /// 集成事件处理器接口 12 /// </summary> 13 /// <typeparam name="TIntegrationEvent">TIntegrationEvent泛型</typeparam> 14 public interface IIntegrationEventHandler<in TIntegrationEvent> : IIntegrationEventHandler 15 where TIntegrationEvent : IntegrationEvent 16 { 17 Task Handle(TIntegrationEvent @event); 18 } 19 20 /// <summary> 21 /// 集成事件处理器 22 /// </summary> 23 public interface IIntegrationEventHandler 24 { 25 } 26 }
集成事件
1 using System.Text; 2 using System.Text.Json.Serialization; 3 using System.Threading.Tasks; 4 5 namespace EventBus.Events 6 { 7 /// <summary> 8 /// 集成事件 9 /// </summary> 10 public record IntegrationEvent 11 { 12 public IntegrationEvent() 13 { 14 Id = Guid.NewGuid(); 15 CreationDate = DateTime.UtcNow; 16 } 17 [JsonConstructor] 18 public IntegrationEvent(Guid id, DateTime createDate) 19 { 20 Id = id; 21 CreationDate = createDate; 22 } 23 24 [JsonInclude] 25 public Guid Id { get; private init; } 26 27 [JsonInclude] 28 public DateTime CreationDate { get; private init; } 29 } 30 }
GenericTypeExtensions
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 7 namespace EventBus.Extensions 8 { 9 public static class GenericTypeExtensions 10 { 11 public static string GetGenericTypeName(this Type type) 12 { 13 var typeName = string.Empty; 14 if (type.IsGenericType) 15 { 16 var genericTypes = string.Join(",", type.GetGenericArguments().Select(t => t.Name).ToArray()); 17 typeName = $"{type.Name.Remove(type.Name.IndexOf('`'))}<{genericTypes}>"; 18 } 19 else 20 { 21 typeName = type.Name; 22 } 23 24 return typeName; 25 } 26 27 /// <summary> 28 /// 获取通用类型名称 29 /// </summary> 30 /// <param name="object"></param> 31 /// <returns></returns> 32 public static string GetGenericTypeName(this object @object) 33 { 34 return @object.GetType().GetGenericTypeName(); 35 } 36 } 37 }
事件总线订阅管理器接口
1 using EventBus.Abstractions; 2 using EventBus.Events; 3 using System; 4 using System.Collections.Generic; 5 using System.Linq; 6 using System.Text; 7 using System.Threading.Tasks; 8 using static EventBus.InMemoryEventBusSubscriptionsManager; 9 10 namespace EventBus 11 { 12 /// <summary> 13 /// 事件总线订阅管理器接口 14 /// </summary> 15 public interface IEventBusSubscriptionsManager 16 { 17 bool IsEmpty { get; } 18 event EventHandler<string> OnEventRemoved; 19 20 /// <summary> 21 /// 添加动态订阅 22 /// </summary> 23 /// <typeparam name="TH"></typeparam> 24 /// <param name="eventName"></param> 25 void AddDynamicSubscription<TH>(string eventName) 26 where TH : IDynamicIntegrationEventHandler; 27 28 /// <summary> 29 /// 添加订阅 30 ///
全部评论
请发表评论