陆菊 发表于 2025-7-22 13:53:40

c#运用ZeroMq发布订阅和RPC函数代理的优点结合成一个新的实用的通讯

想用ZeroMq的发布订阅者模式,又不想写一大串switch case?
想用RPC函数代理机制,又想多对多进行通讯?
下面就结合二者的优点重新封装一套通讯模块
一、先写ZeroMq的发布订阅这模式

[*] 先做个代理,负责分发事件,代码如下:
1// 1. 初始化代理(Proxy)
2var xSubSocket = new XSubscriberSocket("@tcp://127.0.0.1:61225");
3var xPubSocket = new XPublisherSocket("@tcp://127.0.0.1:52216");
4{
5       _proxy = new Proxy(xSubSocket, xPubSocket);
6      // 2. 启动代理(异步运行)
7       var proxyTask = Task.Run(() => _proxy.Start());
8   }View Code

[*]封装客户端代码
1 using Communication.Nmq.Dto;
2 using System;
3 using System.Collections.Generic;
4
5 namespace Communication.Nmq
6 {
7   public class MqClientMain
8   {
9         public readonly static MqClientMain Instance = new MqClientMain();
10         internal Transceiver _client;
11         private List<MqType> _subscriberList = new();
12         private List<MqType> _unSubscriberList = new();
13         private readonly MethodManager _methodManager = new MethodManager();
14         private bool _isListnerAll;
15         private MqType _owner;
16         protected MqClientMain() { }
17
18
19         /// <summary>
20         /// 函数反射监听(可监听多个)
21         /// </summary>
22         /// <param name="targetInstance"></param>
23         /// <param name="targgetMonitorList"></param>
24         /// <returns></returns>
25         public MqClientMain ProxyAddInstanceMethods<InterfaceType>(InterfaceType targetInstance, params MqType[] targgetMonitorList) where InterfaceType : class
26         {
27             foreach (MqType targgetMonitor in targgetMonitorList)
28             {
29               if (!_subscriberList.Contains(targgetMonitor))
30                     _subscriberList.Add(targgetMonitor);
31             }
32
33             _methodManager.AddInstanceMethods(targetInstance);
34             return this;
35         }
36
37         
38         /// <summary>
39         ///额外增加事件(可监听多个)
40         /// </summary>
41         /// <param name="targetInstance"></param>
42         /// <param name="mathName"></param>
43         /// <param name="targgetMonitor"></param>
44         /// <returns></returns>
45         public MqClientMain ProxyAddMethods(object targetInstance, string[] mathName, params MqType[] targgetMonitorList)
46         {
47             foreach (MqType targgetMonitor in targgetMonitorList)
48             {
49               if (!_subscriberList.Contains(targgetMonitor))
50                     _subscriberList.Add(targgetMonitor);
51             }
52             _methodManager.AddMethods(mathName, targetInstance);
53             return this;
54         }
55
56         /// <summary>
57         /// 开始通讯
58         /// </summary>
59         /// <param name="owner">注册者类型(你是谁)</param>
60         public virtual void Start(MqType owner)
61         {
62             if (_client == null)
63             {
64               if (_isListnerAll)
65               {
66                     //监听所有会监听到自己,所以不监听自己
67                     _subscriberList.Remove(owner);
68               }
69               _owner = owner;
70               _client = new Transceiver(owner, _subscriberList, _unSubscriberList, _methodManager);
71               
72
73             }
74         }
75
76         public void Stop()
77         {
78             _client.Dispose();
79         }
80
81         /// <summary>
82         /// 发布事件
83         /// </summary>
84         /// <param name="msg"></param>
85         public void MqSendMessage(string msg)
86         {
87             if (_client != null)
88             {
89               _client.SendMessage(msg);
90             }
91         }
92         
93         /// <summary>
94         /// 代理列表
95         /// </summary>
96         private Dictionary<Type, object> _proxyList = new();
97         /// <summary>
98         /// 获取代理
99         /// </summary>
100         /// <typeparam name="T"></typeparam>
101         /// <returns></returns>
102         public T GetInterfaceProxy<T>() where T : class
103         {
104             if (_client == null)
105               return null;
106             if (!_proxyList.ContainsKey(typeof(T)))
107               _proxyList.Add(typeof(T), InterfaceProxy<T>.Create(_client));
108             return (T)_proxyList;
109         }
110   }
111 }View Code二、封装一下RPC的函数代理

[*]封装一个接口代理类
1internal class InterfaceProxy<TInterface> : DispatchProxy where TInterface : class
2{
3      private static Transceiver _client;
4
5      private static JsonSerializerOptions _options = new JsonSerializerOptions
6      {
7          WriteIndented = true,// 让 JSON 格式更加可读
8          Converters = { new JsonStringEnumConverter() }// 使用字符串枚举转换器
9      };
10      internal static TInterface Create(Transceiver client)
11      {
12          object proxy = Create<TInterface, InterfaceProxy<TInterface>>();
13          _client = client;
14          return (TInterface)proxy;
15      }
16      protected override object Invoke(MethodInfo targetMethod, object[] args)
17      {
18          var message = new ProxyMessage
19          {
20            InterfaceType = typeof(TInterface).FullName,
21            Method = targetMethod.Name,
22            Parameters = args,
23          };
24          _client.SendMessage(System.Text.Json.JsonSerializer.Serialize(message, _options));
25          return targetMethod.ReturnType;
26      }
27}View Code

[*]复制一份RPC封装的获取类里面的所有方法
1public class MethodManager
2{
3      private readonly string[] instanceMethodsOnObject = new string { "Equals", "GetHashCode", "GetType", "ToString" };
4
5      /// <summary>
6      /// 获取一个线程安全的字典,其中键为字符串(不区分大小写),值为另一个线程安全的字典。
7      /// 内部字典的键为整数,值为 Method 对象。
8      /// </summary>
9      public ConcurrentDictionary<string, ConcurrentDictionary<int, Method>> Methods { get; } = new ConcurrentDictionary<string, ConcurrentDictionary<int, Method>>(StringComparer.OrdinalIgnoreCase);
10
11      /// <summary>
12      /// 根据方法名和参数数量获取方法
13      /// </summary>
14      /// <param name="name">方法名</param>
15      /// <param name="paramCount">参数数量</param>
16      /// <returns>找到的方法对象,若未找到则返回null</returns>
17      public Method Get(string name, int paramCount)
18      {
19          if (Methods.TryGetValue(name, out var value) && value.TryGetValue(paramCount, out var value2))
20          {
21            return value2;
22          }
23
24          if (name != "*")
25          {
26            return Get("*", 2);
27          }
28
29          return null;
30      }
31
32      /// <summary>
33      /// 向方法集合中添加一个方法。
34      /// 如果指定方法名称不存在于集合中,则创建一个新的ConcurrentDictionary来存储该方法。
35      /// 根据方法的参数信息,特别是参数类型是否为Context以及是否为可选参数,默认值等信息,
36      /// 将方法添加到对应的ConcurrentDictionary中,键为参数的索引(不包括Context类型的参数)。
37      /// </summary>
38      public void Add(Method method)
39      {
40          if (!Methods.ContainsKey(method.Name))
41          {
42            Methods.TryAdd(method.Name, new ConcurrentDictionary<int, Method>());
43          }
44
45          ConcurrentDictionary<int, Method> concurrentDictionary = Methods;
46          ParameterInfo[] parameters = method.Parameters;
47          int num = parameters.Length;
48          int num2 = 0;
49          for (int i = 0; i < num; i++)
50          {
51            ParameterInfo parameterInfo = parameters;
52            if (typeof(Context).IsAssignableFrom(parameterInfo.ParameterType))
53            {
54                  num2 = 1;
55            }
56            else if (parameterInfo.IsOptional && parameterInfo.HasDefaultValue)
57            {
58                  concurrentDictionary.AddOrUpdate(i - num2, method, (int key, Method value) => method);
59            }
60          }
61
62          concurrentDictionary.AddOrUpdate(num - num2, method, (int key, Method value) => method);
63      }
64
65      /// <summary>
66      /// 添加一个方法到集合中,使用指定的方法信息、名称和目标对象。
67      /// </summary>
68      /// <param name="methodInfo">方法的信息。</param>
69      /// <param name="name">方法的名称。</param>
70      /// <param name="target">方法的目标对象,默认为null。</param>
71      public void Add(MethodInfo methodInfo, string name, object target = null)
72      {
73          Add(new Method(methodInfo, name, target));
74      }
75      
76      /// <summary>
77      /// 添加一个方法到集合中,使用给定的名称、目标对象和别名。
78      /// </summary>
79      /// <param name="name">方法的名称。</param>
80      /// <param name="target">包含方法的对象。</param>
81      /// <param name="alias">方法的别名,如果为空则使用方法名称。</param>
82      public void AddMethod(string name, object target, string alias = "")
83      {
84          MethodInfo[] methods = target.GetType().GetTypeInfo().GetMethods(BindingFlags.Instance | BindingFlags.Public);
85          if (string.IsNullOrEmpty(alias))
86          {
87            alias = name;
88          }
89
90          MethodInfo[] array = methods;
91          foreach (MethodInfo methodInfo in array)
92          {
93            if (methodInfo.Name.Equals(name, StringComparison.OrdinalIgnoreCase))
94            {
95                  Add(methodInfo, alias, target);
96            }
97          }
98      }
99
100      /// <summary>
101      /// 向目标对象添加方法。
102      /// </summary>
103      /// <param name="names">方法名称数组。</param>
104      /// <param name="target">目标对象,方法将添加到该对象上。</param>
105      /// <param name="ns">可选的命名空间前缀,用于区分不同来源的方法。</param>
106      public void AddMethods(string[] names, object target, string ns = "")
107      {
108          foreach (string text in names)
109          {
110            if (string.IsNullOrEmpty(ns))
111            {
112                  AddMethod(text, target, text);
113            }
114            else
115            {
116                  AddMethod(text, target, ns + "_" + text);
117            }
118          }
119      }
120
121      /// <summary>
122      /// 向目标对象添加实例方法。
123      /// </summary>
124      /// <param name="target">目标对象,其实例方法将被添加。</param>
125      /// <param name="ns">可选的命名空间前缀,用于区分方法名。</param>
126      public void AddInstanceMethods(object target, string ns = "")
127      {
128          MethodInfo[] methods = target.GetType().GetTypeInfo().GetMethods(BindingFlags.Instance | BindingFlags.Public);
129          foreach (MethodInfo methodInfo in methods)
130          {
131            if (Array.IndexOf(instanceMethodsOnObject, methodInfo.Name) == -1)
132            {
133                  string text = methodInfo.Name;
134                  if (!string.IsNullOrEmpty(ns))
135                  {
136                      text = ns + "_" + text;
137                  }
138
139                  Add(methodInfo, text, target);
140            }
141          }
142      }
143}View Code

[*]通过反射执行方法
1 using Communication.Nmq.Dto;
2 using NetMQ;
3 using NetMQ.Sockets;
4 using System;
5 using System.Collections.Generic;
6 using System.Linq;
7 using Communication.Utils;
8 using Newtonsoft.Json;
9
10 namespace Communication.Nmq
11 {
12   internal class Transceiver : IDisposable
13   {
14         private List<MqType> SubscribeTypes;
15         private List<MqType> UnSubscribleTypes;
16         private MethodManager FunListeners;
17         private string Owner;
18         private PublisherSocket ClientPub;
19         private SubscriberSocket ClientSub;
20         private NetMQPoller Poller;
21         private static readonly object SendLock = new();
22
23   
24         internal Transceiver(MqType owner, List<MqType> subscribeType, List<MqType> unSubscribleType, MethodManager funListener)
25         {
26             SubscribeTypes = subscribeType;
27             UnSubscribleTypes = unSubscribleType;
28             FunListeners = funListener;
29             Owner = owner.ToString();
30             ClientPub = new PublisherSocket(">tcp://127.0.0.1:61225");
31             ClientSub = new SubscriberSocket(">tcp://127.0.0.1:52216");
32             Poller = new NetMQPoller { ClientSub };
33             SubTopic();
34             ClientSub.ReceiveReady += ClientSub_ReceiveReady;
35             Poller.RunAsync();
36         }
37
38         private void ClientSub_ReceiveReady(object sender, NetMQSocketEventArgs e)
39         {
40             try
41             {
42               List<string> frames = new();
43               if (!e.Socket.TryReceiveMultipartStrings(TimeSpan.FromSeconds(3), ref frames) || frames == null)
44               {
45                     Log.Error($"NetMQ接收异常!frames {frames}", LoggerNames.MqStr);
46                     return;
47               }
48               if (frames.Count == 2)
49               {
50                     string topic = frames;
51                     string msg = frames;
52                     if (Enum.TryParse(topic, out MqType topicType))
53                     {
54                         if (TryDeserializeProxyMessage(msg, out var controlRequest) && !string.IsNullOrWhiteSpace(controlRequest.Method))
55                         {
56                           if (FunListeners.Methods.TryGetValue(controlRequest.Method, out var methods))
57                           {
58                                 foreach (var methodInfo in methods.Select(m => m.Value))
59                                 {
60                                     try
61                                     {
62                                       var parameters = controlRequest.Parameters.Select((p, i) => SafeChangeType(p, methodInfo.Parameters.ParameterType)).ToArray();
63                                       methodInfo.MethodInfo?.Invoke(methodInfo.Target, parameters);
64                                     }
65                                     catch (Exception ex)
66                                     {
67                                       Log.Error($"Failed to convert parameter for method {controlRequest.Method}: {ex.Message}", LoggerNames.MqStr);
68                                       return;
69                                     }
70                                 }
71                           }
72                           else
73                           {
74                                 throw new InvalidOperationException("找不到对应的函数");
75                           }
76                         }
77                         else
78                         {
79                           throw new InvalidOperationException("无法转换格式");
80                         }
81                     }
82                     else
83                     {
84                         Log.Error($"NetMQ收到不正常数据,请检测!MqType:{topic}", LoggerNames.MqStr);
85                     }
86               }
87               else
88               {
89                     Log.Error($"NetMQ收到不正常数据,请检测!frames 长度为{frames.Count}", LoggerNames.MqStr);
90               }
91             }
92             catch (Exception ex)
93             {
94               Log.Error($"NetMQ收到消息报错:{ex.ToString()}", LoggerNames.MqStr);
95             }
96         }
97
98         public object SafeChangeType(object value, Type targetType)
99         {
100             if (targetType.IsEnum && value is string strValue)
101             {
102               return Enum.Parse(targetType, strValue);
103             }
104             return Convert.ChangeType(value, targetType);
105         }
106
107         private bool TryDeserializeProxyMessage(string json, out ProxyMessage message)
108         {
109             message = null;
110             try
111             {
112               message = JsonConvert.DeserializeObject<ProxyMessage>(json);
113               return message != null;
114             }
115             catch
116             {
117               return false;
118             }
119         }
120
121         private void SubTopic()
122         {
123             if (SubscribeTypes?.Any() == true)
124             {
125               foreach (var item in SubscribeTypes)
126               {
127                     ClientSub.Subscribe(item.ToString());
128               }
129             }
130             if (UnSubscribleTypes?.Any() == true)
131             {
132               foreach (var item in UnSubscribleTypes)
133               {
134                     ClientSub.Unsubscribe(item.ToString());
135               }
136             }
137         }
138
139         internal void SendMessage(string msg)
140         {
141             try
142             {
143               lock (SendLock)
144               {
145                     var success = ClientPub.SendMoreFrame(Owner).TrySendFrame(TimeSpan.FromSeconds(3), msg);
146               }
147             }
148             catch (Exception ex)
149             {
150               Log.Error($"发送_消息 失败 {msg},:{ex}", LoggerNames.MqStr);
151             }
152         }
153
154         public void Dispose()
155         {
156             Poller.Stop();
157             ClientPub?.Dispose();
158             ClientSub?.Dispose();
159             Poller.Dispose();
160         }
161   }
162 }View Code 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: c#运用ZeroMq发布订阅和RPC函数代理的优点结合成一个新的实用的通讯