找回密码
 立即注册
首页 业界区 业界 c#运用ZeroMq发布订阅和RPC函数代理的优点结合成一个新 ...

c#运用ZeroMq发布订阅和RPC函数代理的优点结合成一个新的实用的通讯

陆菊 5 天前
想用ZeroMq的发布订阅者模式,又不想写一大串switch case?
想用RPC函数代理机制,又想多对多进行通讯?
下面就结合二者的优点重新封装一套通讯模块
一、先写ZeroMq的发布订阅这模式

  •  先做个代理,负责分发事件,代码如下:
1.gif
2.gif
  1. 1  // 1. 初始化代理(Proxy)
  2. 2  var xSubSocket = new XSubscriberSocket("@tcp://127.0.0.1:61225");
  3. 3  var xPubSocket = new XPublisherSocket("@tcp://127.0.0.1:52216");
  4. 4  {
  5. 5       _proxy = new Proxy(xSubSocket, xPubSocket);
  6. 6        // 2. 启动代理(异步运行)
  7. 7       var proxyTask = Task.Run(() => _proxy.Start());
  8. 8   }
复制代码
View Code

  • 封装客户端代码
3.gif
4.gif
  1.   1 using Communication.Nmq.Dto;
  2.   2 using System;
  3.   3 using System.Collections.Generic;
  4.   4
  5.   5 namespace Communication.Nmq
  6.   6 {
  7.   7     public class MqClientMain
  8.   8     {
  9.   9         public readonly static MqClientMain Instance = new MqClientMain();
  10. 10         internal Transceiver _client;
  11. 11         private List<MqType> _subscriberList = new();
  12. 12         private List<MqType> _unSubscriberList = new();
  13. 13         private readonly MethodManager _methodManager = new MethodManager();
  14. 14         private bool _isListnerAll;
  15. 15         private MqType _owner;
  16. 16         protected MqClientMain() { }
  17. 17
  18. 18
  19. 19         /// <summary>
  20. 20         /// 函数反射监听(可监听多个)
  21. 21         /// </summary>
  22. 22         /// <param name="targetInstance"></param>
  23. 23         /// <param name="targgetMonitorList"></param>
  24. 24         /// <returns></returns>
  25. 25         public MqClientMain ProxyAddInstanceMethods<InterfaceType>(InterfaceType targetInstance, params MqType[] targgetMonitorList) where InterfaceType : class
  26. 26         {
  27. 27             foreach (MqType targgetMonitor in targgetMonitorList)
  28. 28             {
  29. 29                 if (!_subscriberList.Contains(targgetMonitor))
  30. 30                     _subscriberList.Add(targgetMonitor);
  31. 31             }
  32. 32
  33. 33             _methodManager.AddInstanceMethods(targetInstance);
  34. 34             return this;
  35. 35         }
  36. 36
  37. 37         
  38. 38         /// <summary>
  39. 39         ///额外增加事件(可监听多个)
  40. 40         /// </summary>
  41. 41         /// <param name="targetInstance"></param>
  42. 42         /// <param name="mathName"></param>
  43. 43         /// <param name="targgetMonitor"></param>
  44. 44         /// <returns></returns>
  45. 45         public MqClientMain ProxyAddMethods(object targetInstance, string[] mathName, params MqType[] targgetMonitorList)
  46. 46         {
  47. 47             foreach (MqType targgetMonitor in targgetMonitorList)
  48. 48             {
  49. 49                 if (!_subscriberList.Contains(targgetMonitor))
  50. 50                     _subscriberList.Add(targgetMonitor);
  51. 51             }
  52. 52             _methodManager.AddMethods(mathName, targetInstance);
  53. 53             return this;
  54. 54         }
  55. 55
  56. 56         /// <summary>
  57. 57         /// 开始通讯
  58. 58         /// </summary>
  59. 59         /// <param name="owner">注册者类型(你是谁)</param>
  60. 60         public virtual void Start(MqType owner)
  61. 61         {
  62. 62             if (_client == null)
  63. 63             {
  64. 64                 if (_isListnerAll)
  65. 65                 {
  66. 66                     //监听所有会监听到自己,所以不监听自己
  67. 67                     _subscriberList.Remove(owner);
  68. 68                 }
  69. 69                 _owner = owner;
  70. 70                 _client = new Transceiver(owner, _subscriberList, _unSubscriberList, _methodManager);
  71. 71               
  72. 72
  73. 73             }
  74. 74         }
  75. 75
  76. 76         public void Stop()
  77. 77         {
  78. 78             _client.Dispose();
  79. 79         }
  80. 80
  81. 81         /// <summary>
  82. 82         /// 发布事件
  83. 83         /// </summary>
  84. 84         /// <param name="msg"></param>
  85. 85         public void MqSendMessage(string msg)
  86. 86         {
  87. 87             if (_client != null)
  88. 88             {
  89. 89                 _client.SendMessage(msg);
  90. 90             }
  91. 91         }
  92. 92         
  93. 93         /// <summary>
  94. 94         /// 代理列表
  95. 95         /// </summary>
  96. 96         private Dictionary<Type, object> _proxyList = new();
  97. 97         /// <summary>
  98. 98         /// 获取代理
  99. 99         /// </summary>
  100. 100         /// <typeparam name="T"></typeparam>
  101. 101         /// <returns></returns>
  102. 102         public T GetInterfaceProxy<T>() where T : class
  103. 103         {
  104. 104             if (_client == null)
  105. 105                 return null;
  106. 106             if (!_proxyList.ContainsKey(typeof(T)))
  107. 107                 _proxyList.Add(typeof(T), InterfaceProxy<T>.Create(_client));
  108. 108             return (T)_proxyList[typeof(T)];
  109. 109         }
  110. 110     }
  111. 111 }
复制代码
View Code二、封装一下RPC的函数代理

  • 封装一个接口代理类
5.gif
6.gif
  1. 1  internal class InterfaceProxy<TInterface> : DispatchProxy where TInterface : class
  2. 2  {
  3. 3      private static Transceiver _client;
  4. 4
  5. 5      private static JsonSerializerOptions _options = new JsonSerializerOptions
  6. 6      {
  7. 7          WriteIndented = true,  // 让 JSON 格式更加可读
  8. 8          Converters = { new JsonStringEnumConverter() }  // 使用字符串枚举转换器
  9. 9      };
  10. 10      internal static TInterface Create(Transceiver client)
  11. 11      {
  12. 12          object proxy = Create<TInterface, InterfaceProxy<TInterface>>();
  13. 13          _client = client;
  14. 14          return (TInterface)proxy;
  15. 15      }
  16. 16      protected override object Invoke(MethodInfo targetMethod, object[] args)
  17. 17      {
  18. 18          var message = new ProxyMessage
  19. 19          {
  20. 20              InterfaceType = typeof(TInterface).FullName,
  21. 21              Method = targetMethod.Name,
  22. 22              Parameters = args,
  23. 23          };
  24. 24          _client.SendMessage(System.Text.Json.JsonSerializer.Serialize(message, _options));
  25. 25          return targetMethod.ReturnType;
  26. 26      }
  27. 27  }
复制代码
View Code

  • 复制一份RPC封装的获取类里面的所有方法
7.gif
8.gif
  1.   1  public class MethodManager
  2.   2  {
  3.   3      private readonly string[] instanceMethodsOnObject = new string[4] { "Equals", "GetHashCode", "GetType", "ToString" };
  4.   4
  5.   5      /// <summary>
  6.   6      /// 获取一个线程安全的字典,其中键为字符串(不区分大小写),值为另一个线程安全的字典。
  7.   7      /// 内部字典的键为整数,值为 Method 对象。
  8.   8      /// </summary>
  9.   9      public ConcurrentDictionary<string, ConcurrentDictionary<int, Method>> Methods { get; } = new ConcurrentDictionary<string, ConcurrentDictionary<int, Method>>(StringComparer.OrdinalIgnoreCase);
  10. 10
  11. 11      /// <summary>
  12. 12      /// 根据方法名和参数数量获取方法
  13. 13      /// </summary>
  14. 14      /// <param name="name">方法名</param>
  15. 15      /// <param name="paramCount">参数数量</param>
  16. 16      /// <returns>找到的方法对象,若未找到则返回null</returns>
  17. 17      public Method Get(string name, int paramCount)
  18. 18      {
  19. 19          if (Methods.TryGetValue(name, out var value) && value.TryGetValue(paramCount, out var value2))
  20. 20          {
  21. 21              return value2;
  22. 22          }
  23. 23
  24. 24          if (name != "*")
  25. 25          {
  26. 26              return Get("*", 2);
  27. 27          }
  28. 28
  29. 29          return null;
  30. 30      }
  31. 31
  32. 32      /// <summary>
  33. 33      /// 向方法集合中添加一个方法。
  34. 34      /// 如果指定方法名称不存在于集合中,则创建一个新的ConcurrentDictionary来存储该方法。
  35. 35      /// 根据方法的参数信息,特别是参数类型是否为Context以及是否为可选参数,默认值等信息,
  36. 36      /// 将方法添加到对应的ConcurrentDictionary中,键为参数的索引(不包括Context类型的参数)。
  37. 37      /// </summary>
  38. 38      public void Add(Method method)
  39. 39      {
  40. 40          if (!Methods.ContainsKey(method.Name))
  41. 41          {
  42. 42              Methods.TryAdd(method.Name, new ConcurrentDictionary<int, Method>());
  43. 43          }
  44. 44
  45. 45          ConcurrentDictionary<int, Method> concurrentDictionary = Methods[method.Name];
  46. 46          ParameterInfo[] parameters = method.Parameters;
  47. 47          int num = parameters.Length;
  48. 48          int num2 = 0;
  49. 49          for (int i = 0; i < num; i++)
  50. 50          {
  51. 51              ParameterInfo parameterInfo = parameters[i];
  52. 52              if (typeof(Context).IsAssignableFrom(parameterInfo.ParameterType))
  53. 53              {
  54. 54                  num2 = 1;
  55. 55              }
  56. 56              else if (parameterInfo.IsOptional && parameterInfo.HasDefaultValue)
  57. 57              {
  58. 58                  concurrentDictionary.AddOrUpdate(i - num2, method, (int key, Method value) => method);
  59. 59              }
  60. 60          }
  61. 61
  62. 62          concurrentDictionary.AddOrUpdate(num - num2, method, (int key, Method value) => method);
  63. 63      }
  64. 64
  65. 65      /// <summary>
  66. 66      /// 添加一个方法到集合中,使用指定的方法信息、名称和目标对象。
  67. 67      /// </summary>
  68. 68      /// <param name="methodInfo">方法的信息。</param>
  69. 69      /// <param name="name">方法的名称。</param>
  70. 70      /// <param name="target">方法的目标对象,默认为null。</param>
  71. 71      public void Add(MethodInfo methodInfo, string name, object target = null)
  72. 72      {
  73. 73          Add(new Method(methodInfo, name, target));
  74. 74      }
  75. 75        
  76. 76      /// <summary>
  77. 77      /// 添加一个方法到集合中,使用给定的名称、目标对象和别名。
  78. 78      /// </summary>
  79. 79      /// <param name="name">方法的名称。</param>
  80. 80      /// <param name="target">包含方法的对象。</param>
  81. 81      /// <param name="alias">方法的别名,如果为空则使用方法名称。</param>
  82. 82      public void AddMethod(string name, object target, string alias = "")
  83. 83      {
  84. 84          MethodInfo[] methods = target.GetType().GetTypeInfo().GetMethods(BindingFlags.Instance | BindingFlags.Public);
  85. 85          if (string.IsNullOrEmpty(alias))
  86. 86          {
  87. 87              alias = name;
  88. 88          }
  89. 89
  90. 90          MethodInfo[] array = methods;
  91. 91          foreach (MethodInfo methodInfo in array)
  92. 92          {
  93. 93              if (methodInfo.Name.Equals(name, StringComparison.OrdinalIgnoreCase))
  94. 94              {
  95. 95                  Add(methodInfo, alias, target);
  96. 96              }
  97. 97          }
  98. 98      }
  99. 99
  100. 100      /// <summary>
  101. 101      /// 向目标对象添加方法。
  102. 102      /// </summary>
  103. 103      /// <param name="names">方法名称数组。</param>
  104. 104      /// <param name="target">目标对象,方法将添加到该对象上。</param>
  105. 105      /// <param name="ns">可选的命名空间前缀,用于区分不同来源的方法。</param>
  106. 106      public void AddMethods(string[] names, object target, string ns = "")
  107. 107      {
  108. 108          foreach (string text in names)
  109. 109          {
  110. 110              if (string.IsNullOrEmpty(ns))
  111. 111              {
  112. 112                  AddMethod(text, target, text);
  113. 113              }
  114. 114              else
  115. 115              {
  116. 116                  AddMethod(text, target, ns + "_" + text);
  117. 117              }
  118. 118          }
  119. 119      }
  120. 120
  121. 121      /// <summary>
  122. 122      /// 向目标对象添加实例方法。
  123. 123      /// </summary>
  124. 124      /// <param name="target">目标对象,其实例方法将被添加。</param>
  125. 125      /// <param name="ns">可选的命名空间前缀,用于区分方法名。</param>
  126. 126      public void AddInstanceMethods(object target, string ns = "")
  127. 127      {
  128. 128          MethodInfo[] methods = target.GetType().GetTypeInfo().GetMethods(BindingFlags.Instance | BindingFlags.Public);
  129. 129          foreach (MethodInfo methodInfo in methods)
  130. 130          {
  131. 131              if (Array.IndexOf(instanceMethodsOnObject, methodInfo.Name) == -1)
  132. 132              {
  133. 133                  string text = methodInfo.Name;
  134. 134                  if (!string.IsNullOrEmpty(ns))
  135. 135                  {
  136. 136                      text = ns + "_" + text;
  137. 137                  }
  138. 138
  139. 139                  Add(methodInfo, text, target);
  140. 140              }
  141. 141          }
  142. 142      }
  143. 143  }
复制代码
View Code

  • 通过反射执行方法
9.gif
10.gif
  1.   1 using Communication.Nmq.Dto;
  2.   2 using NetMQ;
  3.   3 using NetMQ.Sockets;
  4.   4 using System;
  5.   5 using System.Collections.Generic;
  6.   6 using System.Linq;
  7.   7 using Communication.Utils;
  8.   8 using Newtonsoft.Json;
  9.   9
  10. 10 namespace Communication.Nmq
  11. 11 {
  12. 12     internal class Transceiver : IDisposable
  13. 13     {
  14. 14         private List<MqType> SubscribeTypes;
  15. 15         private List<MqType> UnSubscribleTypes;
  16. 16         private MethodManager FunListeners;
  17. 17         private string Owner;
  18. 18         private PublisherSocket ClientPub;
  19. 19         private SubscriberSocket ClientSub;
  20. 20         private NetMQPoller Poller;
  21. 21         private static readonly object SendLock = new();
  22. 22
  23. 23   
  24. 24         internal Transceiver(MqType owner, List<MqType> subscribeType, List<MqType> unSubscribleType, MethodManager funListener)
  25. 25         {
  26. 26             SubscribeTypes = subscribeType;
  27. 27             UnSubscribleTypes = unSubscribleType;
  28. 28             FunListeners = funListener;
  29. 29             Owner = owner.ToString();
  30. 30             ClientPub = new PublisherSocket(">tcp://127.0.0.1:61225");
  31. 31             ClientSub = new SubscriberSocket(">tcp://127.0.0.1:52216");
  32. 32             Poller = new NetMQPoller { ClientSub };
  33. 33             SubTopic();
  34. 34             ClientSub.ReceiveReady += ClientSub_ReceiveReady;
  35. 35             Poller.RunAsync();
  36. 36         }
  37. 37
  38. 38         private void ClientSub_ReceiveReady(object sender, NetMQSocketEventArgs e)
  39. 39         {
  40. 40             try
  41. 41             {
  42. 42                 List<string> frames = new();
  43. 43                 if (!e.Socket.TryReceiveMultipartStrings(TimeSpan.FromSeconds(3), ref frames) || frames == null)
  44. 44                 {
  45. 45                     Log.Error($"NetMQ接收异常!frames {frames}", LoggerNames.MqStr);
  46. 46                     return;
  47. 47                 }
  48. 48                 if (frames.Count == 2)
  49. 49                 {
  50. 50                     string topic = frames[0];
  51. 51                     string msg = frames[1];
  52. 52                     if (Enum.TryParse(topic, out MqType topicType))
  53. 53                     {
  54. 54                         if (TryDeserializeProxyMessage(msg, out var controlRequest) && !string.IsNullOrWhiteSpace(controlRequest.Method))
  55. 55                         {
  56. 56                             if (FunListeners.Methods.TryGetValue(controlRequest.Method, out var methods))
  57. 57                             {
  58. 58                                 foreach (var methodInfo in methods.Select(m => m.Value))
  59. 59                                 {
  60. 60                                     try
  61. 61                                     {
  62. 62                                         var parameters = controlRequest.Parameters.Select((p, i) => SafeChangeType(p, methodInfo.Parameters[i].ParameterType)).ToArray();
  63. 63                                         methodInfo.MethodInfo?.Invoke(methodInfo.Target, parameters);
  64. 64                                     }
  65. 65                                     catch (Exception ex)
  66. 66                                     {
  67. 67                                         Log.Error($"Failed to convert parameter for method {controlRequest.Method}: {ex.Message}", LoggerNames.MqStr);
  68. 68                                         return;
  69. 69                                     }
  70. 70                                 }
  71. 71                             }
  72. 72                             else
  73. 73                             {
  74. 74                                 throw new InvalidOperationException("找不到对应的函数");
  75. 75                             }
  76. 76                         }
  77. 77                         else
  78. 78                         {
  79. 79                             throw new InvalidOperationException("无法转换格式");
  80. 80                         }
  81. 81                     }
  82. 82                     else
  83. 83                     {
  84. 84                         Log.Error($"NetMQ收到不正常数据,请检测!MqType:{topic}", LoggerNames.MqStr);
  85. 85                     }
  86. 86                 }
  87. 87                 else
  88. 88                 {
  89. 89                     Log.Error($"NetMQ收到不正常数据,请检测!frames 长度为{frames.Count}", LoggerNames.MqStr);
  90. 90                 }
  91. 91             }
  92. 92             catch (Exception ex)
  93. 93             {
  94. 94                 Log.Error($"NetMQ收到消息报错:{ex.ToString()}", LoggerNames.MqStr);
  95. 95             }
  96. 96         }
  97. 97
  98. 98         public object SafeChangeType(object value, Type targetType)
  99. 99         {
  100. 100             if (targetType.IsEnum && value is string strValue)
  101. 101             {
  102. 102                 return Enum.Parse(targetType, strValue);
  103. 103             }
  104. 104             return Convert.ChangeType(value, targetType);
  105. 105         }
  106. 106
  107. 107         private bool TryDeserializeProxyMessage(string json, out ProxyMessage message)
  108. 108         {
  109. 109             message = null;
  110. 110             try
  111. 111             {
  112. 112                 message = JsonConvert.DeserializeObject<ProxyMessage>(json);
  113. 113                 return message != null;
  114. 114             }
  115. 115             catch
  116. 116             {
  117. 117                 return false;
  118. 118             }
  119. 119         }
  120. 120
  121. 121         private void SubTopic()
  122. 122         {
  123. 123             if (SubscribeTypes?.Any() == true)
  124. 124             {
  125. 125                 foreach (var item in SubscribeTypes)
  126. 126                 {
  127. 127                     ClientSub.Subscribe(item.ToString());
  128. 128                 }
  129. 129             }
  130. 130             if (UnSubscribleTypes?.Any() == true)
  131. 131             {
  132. 132                 foreach (var item in UnSubscribleTypes)
  133. 133                 {
  134. 134                     ClientSub.Unsubscribe(item.ToString());
  135. 135                 }
  136. 136             }
  137. 137         }
  138. 138
  139. 139         internal void SendMessage(string msg)
  140. 140         {
  141. 141             try
  142. 142             {
  143. 143                 lock (SendLock)
  144. 144                 {
  145. 145                     var success = ClientPub.SendMoreFrame(Owner).TrySendFrame(TimeSpan.FromSeconds(3), msg);
  146. 146                 }
  147. 147             }
  148. 148             catch (Exception ex)
  149. 149             {
  150. 150                 Log.Error($"发送_消息 失败 {msg},:{ex}", LoggerNames.MqStr);
  151. 151             }
  152. 152         }
  153. 153
  154. 154         public void Dispose()
  155. 155         {
  156. 156             Poller.Stop();
  157. 157             ClientPub?.Dispose();
  158. 158             ClientSub?.Dispose();
  159. 159             Poller.Dispose();
  160. 160         }
  161. 161     }
  162. 162 }
复制代码
View Code 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册