找回密码
 立即注册
首页 业界区 安全 使用 .NET 实现 EventBus(事件总线)设计模式 ...

使用 .NET 实现 EventBus(事件总线)设计模式

胆饬 6 天前
EventBus(事件总线)是一种设计模式,它允许应用程序的不同部分之间进行松散耦合的通信。发布者(Publisher)将事件发布到总线,而订阅者(Subscriber)监听总线上的特定事件类型,并在事件发生时执行相应的操作。发布者和订阅者之间不需要直接引用。
核心概念:

  • 事件 (Event): 表示系统中发生的事情,通常是一个简单的 POCO (Plain Old CLR Object) 类。
  • 发布者 (Publisher): 创建并发送事件到 EventBus 的组件。
  • 订阅者 (Subscriber): 注册对特定类型事件感兴趣,并在事件发布时接收通知的组件。
  • 事件总线 (EventBus): 负责管理订阅和将事件路由到相应订阅者的中心枢纽。
设计思路:

  • 接口 (IEventBus): 定义 EventBus 的核心功能:订阅、取消订阅、发布。
  • 实现 (EventBus):

    • 使用一个数据结构(如 Dictionary)来存储事件类型与对应的订阅者列表。
    • 键 (Key) 是事件的 Type。
    • 值 (Value) 是一个委托列表(如 List> 或 List,后者需要进行类型转换)。
    • 需要考虑线程安全,因为订阅、取消订阅和发布可能在不同线程上发生。使用 lock 或 ConcurrentDictionary 等机制。

  • 泛型方法: 使用泛型使订阅和发布类型安全。
1. 定义事件 (Event)
  1. // 事件可以是任何类,通常包含事件相关的数据
  2. public class OrderCreatedEvent
  3. {
  4.     public int OrderId { get; }
  5.     public DateTime Timestamp { get; }
  6.     public OrderCreatedEvent(int orderId)
  7.     {
  8.         OrderId = orderId;
  9.         Timestamp = DateTime.UtcNow;
  10.     }
  11.     public override string ToString()
  12.     {
  13.         return $"订单已创建: ID={OrderId}, 时间={Timestamp}";
  14.     }
  15. }
  16. public class UserLoggedInEvent
  17. {
  18.     public string Username { get; }
  19.     public DateTime LoginTime { get; }
  20.     public UserLoggedInEvent(string username)
  21.     {
  22.         Username = username;
  23.         LoginTime = DateTime.UtcNow;
  24.     }
  25.     public override string ToString()
  26.     {
  27.         return $"用户已登录: 用户名={Username}, 时间={LoginTime}";
  28.     }
  29. }
复制代码
2. 定义 EventBus 接口 (IEventBus)
  1. using System;
  2. public interface IEventBus
  3. {
  4.     /// <summary>
  5.     /// 订阅指定类型的事件
  6.     /// </summary>
  7.     /// <typeparam name="TEvent">事件类型</typeparam>
  8.     /// <param name="handler">事件处理委托</param>
  9.     void Subscribe<TEvent>(Action<TEvent> handler) where TEvent : class;
  10.     /// <summary>
  11.     /// 取消订阅指定类型的事件
  12.     /// </summary>
  13.     /// <typeparam name="TEvent">事件类型</typeparam>
  14.     /// <param name="handler">之前订阅时使用的同一个事件处理委托实例</param>
  15.     void Unsubscribe<TEvent>(Action<TEvent> handler) where TEvent : class;
  16.     /// <summary>
  17.     /// 发布一个事件,所有订阅了该事件类型的处理程序都将被调用
  18.     /// </summary>
  19.     /// <typeparam name="TEvent">事件类型</typeparam>
  20.     /// <param name="event">要发布的事件实例</param>
  21.     void Publish<TEvent>(TEvent @event) where TEvent : class;
  22. }
复制代码
3. 实现 EventBus (EventBus)
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Threading; // 用于演示或简单场景,实际可能用 Task
  5. public class EventBus : IEventBus
  6. {
  7.     // 存储订阅信息的字典
  8.     // Key: 事件类型 (Type)
  9.     // Value: 处理该事件类型的委托列表 (List of Action<TEvent>),这里用 List<object> 存储以便处理不同泛型类型
  10.     private readonly Dictionary<Type, List<object>> _subscribers;
  11.     // 用于确保线程安全的锁对象
  12.     private readonly object _lock = new object();
  13.     public EventBus()
  14.     {
  15.         _subscribers = new Dictionary<Type, List<object>>();
  16.     }
  17.     public void Subscribe<TEvent>(Action<TEvent> handler) where TEvent : class
  18.     {
  19.         // 获取事件类型
  20.         Type eventType = typeof(TEvent);
  21.         lock (_lock)
  22.         {
  23.             // 如果该事件类型还没有订阅者,则创建新列表
  24.             if (!_subscribers.ContainsKey(eventType))
  25.             {
  26.                 _subscribers[eventType] = new List<object>();
  27.             }
  28.             // 添加处理委托到列表中 (确保不重复添加同一个委托实例)
  29.             if (!_subscribers[eventType].Contains(handler))
  30.             {
  31.                  _subscribers[eventType].Add(handler);
  32.             }
  33.         }
  34.     }
  35.     public void Unsubscribe<TEvent>(Action<TEvent> handler) where TEvent : class
  36.     {
  37.         Type eventType = typeof(TEvent);
  38.         lock (_lock)
  39.         {
  40.             // 检查是否存在该事件类型的订阅列表,以及列表中是否包含该处理委托
  41.             if (_subscribers.TryGetValue(eventType, out var handlers))
  42.             {
  43.                 handlers.Remove(handler);
  44.                 // 如果移除后列表为空,可以选择从字典中移除该事件类型(可选,优化内存)
  45.                 if (handlers.Count == 0)
  46.                 {
  47.                     _subscribers.Remove(eventType);
  48.                 }
  49.             }
  50.         }
  51.     }
  52.     public void Publish<TEvent>(TEvent @event) where TEvent : class
  53.     {
  54.         Type eventType = typeof(TEvent);
  55.         List<object> handlersToInvoke = null;
  56.         lock (_lock)
  57.         {
  58.             // 检查是否有该事件类型的订阅者
  59.             if (_subscribers.TryGetValue(eventType, out var handlers))
  60.             {
  61.                 // 复制列表以避免在迭代时修改集合(如果在处理程序中取消订阅)
  62.                 // 并且在锁外执行委托调用,减少锁的持有时间
  63.                 handlersToInvoke = handlers.ToList();
  64.             }
  65.         }
  66.         // 在锁外部调用处理程序,避免长时间持有锁
  67.         if (handlersToInvoke != null)
  68.         {
  69.             foreach (var handlerObj in handlersToInvoke)
  70.             {
  71.                 // 将 object 转换回具体的 Action<TEvent>
  72.                 if (handlerObj is Action<TEvent> handler)
  73.                 {
  74.                     try
  75.                     {
  76.                         // 调用处理委托
  77.                         // 实际应用中可能需要考虑:
  78.                         // 1. 异步执行 (handler( @event).ConfigureAwait(false)等)
  79.                         // 2. 错误处理 (try-catch 并记录日志)
  80.                         handler(@event);
  81.                     }
  82.                     catch (Exception ex)
  83.                     {
  84.                         // 处理或记录订阅者处理事件时发生的异常
  85.                         Console.WriteLine($"处理事件 {@event.GetType().Name} 时出错: {ex.Message}");
  86.                         // 根据策略决定是否继续通知其他订阅者
  87.                     }
  88.                 }
  89.             }
  90.         }
  91.     }
  92. }
复制代码
4. 示例:发布者和订阅者
  1. // === 发布者 ===
  2. public class OrderService
  3. {
  4.     private readonly IEventBus _eventBus;
  5.     public OrderService(IEventBus eventBus)
  6.     {
  7.         _eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus));
  8.     }
  9.     public void CreateOrder(int orderId)
  10.     {
  11.         Console.WriteLine($"订单服务: 正在创建订单 {orderId}...");
  12.         // ... 执行创建订单的逻辑 ...
  13.         Console.WriteLine($"订单服务: 订单 {orderId} 创建成功.");
  14.         // 创建事件实例
  15.         var orderCreatedEvent = new OrderCreatedEvent(orderId);
  16.         // 发布事件到 EventBus
  17.         Console.WriteLine($"订单服务: 发布 {nameof(OrderCreatedEvent)} 事件.");
  18.         _eventBus.Publish(orderCreatedEvent);
  19.     }
  20. }
  21. public class AuthenticationService
  22. {
  23.      private readonly IEventBus _eventBus;
  24.     public AuthenticationService(IEventBus eventBus)
  25.     {
  26.         _eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus));
  27.     }
  28.      public void Login(string username)
  29.     {
  30.         Console.WriteLine($"认证服务: 用户 {username} 尝试登录...");
  31.         // ... 认证逻辑 ...
  32.         Console.WriteLine($"认证服务: 用户 {username} 登录成功.");
  33.         var loggedInEvent = new UserLoggedInEvent(username);
  34.         Console.WriteLine($"认证服务: 发布 {nameof(UserLoggedInEvent)} 事件.");
  35.         _eventBus.Publish(loggedInEvent);
  36.     }
  37. }
  38. // === 订阅者 ===
  39. public class EmailService : IDisposable // 实现 IDisposable 以便在服务生命周期结束时取消订阅
  40. {
  41.     private readonly IEventBus _eventBus;
  42.     // 保存委托实例,以便取消订阅时使用
  43.     private readonly Action<OrderCreatedEvent> _orderCreatedHandler;
  44.     public EmailService(IEventBus eventBus)
  45.     {
  46.         _eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus));
  47.         // 将处理方法赋值给字段
  48.         _orderCreatedHandler = HandleOrderCreated;
  49.         // 订阅事件
  50.         _eventBus.Subscribe(_orderCreatedHandler); // 使用保存的委托实例
  51.         Console.WriteLine("邮件服务: 已订阅 OrderCreatedEvent.");
  52.     }
  53.     private void HandleOrderCreated(OrderCreatedEvent @event)
  54.     {
  55.         Console.WriteLine($"邮件服务: 收到订单创建事件. 准备发送邮件通知 (订单ID: {@event.OrderId})...");
  56.         // 模拟发送邮件
  57.         Thread.Sleep(50); // 模拟耗时
  58.         Console.WriteLine($"邮件服务: 订单 {@event.OrderId} 的创建通知邮件已发送.");
  59.     }
  60.     public void Dispose()
  61.     {
  62.         // 在对象销毁时取消订阅,防止内存泄漏
  63.         _eventBus.Unsubscribe(_orderCreatedHandler); // 使用保存的委托实例取消订阅
  64.         Console.WriteLine("邮件服务: 已取消订阅 OrderCreatedEvent.");
  65.     }
  66. }
  67. public class InventoryService
  68. {
  69.     private readonly IEventBus _eventBus;
  70.     public InventoryService(IEventBus eventBus)
  71.     {
  72.         _eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus));
  73.         // 可以直接使用 Lambda 表达式或方法组,但取消订阅会稍微麻烦些
  74.         // 如果需要取消订阅,最好像 EmailService 那样保存委托实例
  75.         _eventBus.Subscribe<OrderCreatedEvent>(HandleOrderCreated);
  76.          Console.WriteLine("库存服务: 已订阅 OrderCreatedEvent.");
  77.     }
  78.     private void HandleOrderCreated(OrderCreatedEvent @event)
  79.     {
  80.         Console.WriteLine($"库存服务: 收到订单创建事件. 正在为订单 {@event.OrderId} 预留库存...");
  81.         // 模拟库存操作
  82.         Thread.Sleep(100); // 模拟耗时
  83.         Console.WriteLine($"库存服务: 订单 {@event.OrderId} 的库存已预留.");
  84.     }
  85.      // 注意:如果用 Lambda 或方法组直接订阅且需要取消订阅,需要保存那个委托实例
  86.      // public void StopListening() { _eventBus.Unsubscribe<OrderCreatedEvent>(HandleOrderCreated); } // 这样可能无法正确取消匿名方法
  87. }
  88. public class AuditService
  89. {
  90.      private readonly IEventBus _eventBus;
  91.     public AuditService(IEventBus eventBus)
  92.     {
  93.         _eventBus = eventBus;
  94.         // 订阅多种事件
  95.         _eventBus.Subscribe<OrderCreatedEvent>(LogEvent);
  96.         _eventBus.Subscribe<UserLoggedInEvent>(LogEvent);
  97.          Console.WriteLine("审计服务: 已订阅 OrderCreatedEvent 和 UserLoggedInEvent.");
  98.     }
  99.     // 一个通用的日志记录方法,处理不同类型的事件
  100.     private void LogEvent<TEvent>(TEvent @event) where TEvent : class
  101.     {
  102.          Console.WriteLine($"审计服务: 记录到事件 - 类型: {@event.GetType().Name}, 内容: {@event.ToString()}");
  103.     }
  104.     // 可选:如果需要取消订阅,同样需要保存委托实例
  105. }
复制代码
5. 运行示例
  1. using System;
  2. public class Program
  3. {
  4.     public static void Main(string[] args)
  5.     {
  6.         // 1. 创建 EventBus 实例 (通常通过依赖注入容器管理,这里手动创建)
  7.         IEventBus eventBus = new EventBus();
  8.         // 2. 创建订阅者实例,并将 EventBus 注入
  9.         using (var emailService = new EmailService(eventBus)) // 使用 using 确保 Dispose 被调用以取消订阅
  10.         {
  11.             var inventoryService = new InventoryService(eventBus);
  12.             var auditService = new AuditService(eventBus);
  13.             Console.WriteLine("\n--- 初始化完成, 准备发布事件 ---\n");
  14.             // 3. 创建发布者实例
  15.             var orderService = new OrderService(eventBus);
  16.             var authService = new AuthenticationService(eventBus);
  17.             // 4. 发布者执行操作并发布事件
  18.             orderService.CreateOrder(101);
  19.             Console.WriteLine("\n----------------------------\n");
  20.             authService.Login("Alice");
  21.             Console.WriteLine("\n--- 事件处理完成 ---\n");
  22.             // 模拟 EmailService 停止工作(离开 using 块会自动调用 Dispose 取消订阅)
  23.         } // emailService.Dispose() 会在这里被调用
  24.         Console.WriteLine("\n--- EmailService 已停止并取消订阅 ---\n");
  25.         // 再次创建订单,EmailService 不应再收到通知
  26.         var orderService2 = new OrderService(eventBus); // 可以复用之前的,这里新建仅为演示
  27.         orderService2.CreateOrder(102);
  28.         Console.WriteLine("\n--- 程序结束 ---");
  29.         Console.ReadKey();
  30.     }
  31. }
复制代码
输出示例:
  1. 邮件服务: 已订阅 OrderCreatedEvent.
  2. 库存服务: 已订阅 OrderCreatedEvent.
  3. 审计服务: 已订阅 OrderCreatedEvent 和 UserLoggedInEvent.
  4. --- 初始化完成, 准备发布事件 ---
  5. 订单服务: 正在创建订单 101...
  6. 订单服务: 订单 101 创建成功.
  7. 订单服务: 发布 OrderCreatedEvent 事件.
  8. 邮件服务: 收到订单创建事件. 准备发送邮件通知 (订单ID: 101)...
  9. 库存服务: 收到订单创建事件. 正在为订单 101 预留库存...
  10. 审计服务: 记录到事件 - 类型: OrderCreatedEvent, 内容: 订单已创建: ID=101, 时间=...
  11. 邮件服务: 订单 101 的创建通知邮件已发送.
  12. 库存服务: 订单 101 的库存已预留.
  13. ----------------------------
  14. 认证服务: 用户 Alice 尝试登录...
  15. 认证服务: 用户 Alice 登录成功.
  16. 认证服务: 发布 UserLoggedInEvent 事件.
  17. 审计服务: 记录到事件 - 类型: UserLoggedInEvent, 内容: 用户已登录: 用户名=Alice, 时间=...
  18. --- 事件处理完成 ---
  19. 邮件服务: 已取消订阅 OrderCreatedEvent.
  20. --- EmailService 已停止并取消订阅 ---
  21. 订单服务: 正在创建订单 102...
  22. 订单服务: 订单 102 创建成功.
  23. 订单服务: 发布 OrderCreatedEvent 事件.
  24. 库存服务: 收到订单创建事件. 正在为订单 102 预留库存...
  25. 审计服务: 记录到事件 - 类型: OrderCreatedEvent, 内容: 订单已创建: ID=102, 时间=...
  26. 库存服务: 订单 102 的库存已预留.
  27. --- 程序结束 ---
复制代码
关键点和考虑:

  • 线程安全: 上述实现使用了简单的 lock 来保护 _subscribers 字典。对于高并发场景,可能需要更精细的锁策略或使用 ConcurrentDictionary(但仍需注意其内部列表的线程安全)。在 Publish 方法中,先复制订阅者列表再在锁外调用委托,是为了减少锁的持有时间,提高并发性能,并防止在处理程序中修改订阅列表导致迭代错误。
  • 错误处理: 订阅者的处理程序中可能抛出异常。Publish 方法应该能够处理这些异常(例如,记录日志),并决定是否继续通知其他订阅者。
  • 异步处理: 如果订阅者的处理逻辑是 I/O 密集型或耗时的,应该考虑使用异步委托 (Func) 和异步发布机制,以避免阻塞发布线程。
  • 取消订阅: 非常重要,尤其是在订阅者的生命周期比 EventBus 短的情况下(例如 UI 组件、临时服务)。否则会导致内存泄漏(EventBus 持有对已不再需要的订阅者的引用)。使用 IDisposable 是一种常见的管理取消订阅的方式。确保 Unsubscribe 时传入的是 Subscribe 时使用的同一个委托实例。
  • 弱引用: 对于某些场景(如 UI 框架),如果订阅者可能被垃圾回收而没有显式取消订阅,可以使用 WeakReference 来持有委托,但这会增加实现的复杂性。
  • 依赖注入: 在实际应用中,IEventBus 通常注册为单例(Singleton)服务,并通过依赖注入(DI)容器注入到需要它的发布者和订阅者中。
  • 事件继承: 当前设计是基于精确的事件类型匹配。如果需要支持订阅基类事件或接口事件来接收所有派生类/实现类的事件,Publish 方法的逻辑需要相应调整(遍历事件类型的继承链或接口)。
这个简单的 EventBus 实现提供了一个基础框架,你可以根据具体项目的需求进行扩展和优化。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册