想用ZeroMq的发布订阅者模式,又不想写一大串switch case?
想用RPC函数代理机制,又想多对多进行通讯?
下面就结合二者的优点重新封装一套通讯模块
一、先写ZeroMq的发布订阅这模式
- 1 // 1. 初始化代理(Proxy)
- 2 var xSubSocket = new XSubscriberSocket("@tcp://127.0.0.1:61225");
- 3 var xPubSocket = new XPublisherSocket("@tcp://127.0.0.1:52216");
- 4 {
- 5 _proxy = new Proxy(xSubSocket, xPubSocket);
- 6 // 2. 启动代理(异步运行)
- 7 var proxyTask = Task.Run(() => _proxy.Start());
- 8 }
复制代码 View Code
View Code二、封装一下RPC的函数代理
- 1 internal class InterfaceProxy<TInterface> : DispatchProxy where TInterface : class
- 2 {
- 3 private static Transceiver _client;
- 4
- 5 private static JsonSerializerOptions _options = new JsonSerializerOptions
- 6 {
- 7 WriteIndented = true, // 让 JSON 格式更加可读
- 8 Converters = { new JsonStringEnumConverter() } // 使用字符串枚举转换器
- 9 };
- 10 internal static TInterface Create(Transceiver client)
- 11 {
- 12 object proxy = Create<TInterface, InterfaceProxy<TInterface>>();
- 13 _client = client;
- 14 return (TInterface)proxy;
- 15 }
- 16 protected override object Invoke(MethodInfo targetMethod, object[] args)
- 17 {
- 18 var message = new ProxyMessage
- 19 {
- 20 InterfaceType = typeof(TInterface).FullName,
- 21 Method = targetMethod.Name,
- 22 Parameters = args,
- 23 };
- 24 _client.SendMessage(System.Text.Json.JsonSerializer.Serialize(message, _options));
- 25 return targetMethod.ReturnType;
- 26 }
- 27 }
复制代码 View Code
- 1 public class MethodManager
- 2 {
- 3 private readonly string[] instanceMethodsOnObject = new string[4] { "Equals", "GetHashCode", "GetType", "ToString" };
- 4
- 5 /// <summary>
- 6 /// 获取一个线程安全的字典,其中键为字符串(不区分大小写),值为另一个线程安全的字典。
- 7 /// 内部字典的键为整数,值为 Method 对象。
- 8 /// </summary>
- 9 public ConcurrentDictionary<string, ConcurrentDictionary<int, Method>> Methods { get; } = new ConcurrentDictionary<string, ConcurrentDictionary<int, Method>>(StringComparer.OrdinalIgnoreCase);
- 10
- 11 /// <summary>
- 12 /// 根据方法名和参数数量获取方法
- 13 /// </summary>
- 14 /// <param name="name">方法名</param>
- 15 /// <param name="paramCount">参数数量</param>
- 16 /// <returns>找到的方法对象,若未找到则返回null</returns>
- 17 public Method Get(string name, int paramCount)
- 18 {
- 19 if (Methods.TryGetValue(name, out var value) && value.TryGetValue(paramCount, out var value2))
- 20 {
- 21 return value2;
- 22 }
- 23
- 24 if (name != "*")
- 25 {
- 26 return Get("*", 2);
- 27 }
- 28
- 29 return null;
- 30 }
- 31
- 32 /// <summary>
- 33 /// 向方法集合中添加一个方法。
- 34 /// 如果指定方法名称不存在于集合中,则创建一个新的ConcurrentDictionary来存储该方法。
- 35 /// 根据方法的参数信息,特别是参数类型是否为Context以及是否为可选参数,默认值等信息,
- 36 /// 将方法添加到对应的ConcurrentDictionary中,键为参数的索引(不包括Context类型的参数)。
- 37 /// </summary>
- 38 public void Add(Method method)
- 39 {
- 40 if (!Methods.ContainsKey(method.Name))
- 41 {
- 42 Methods.TryAdd(method.Name, new ConcurrentDictionary<int, Method>());
- 43 }
- 44
- 45 ConcurrentDictionary<int, Method> concurrentDictionary = Methods[method.Name];
- 46 ParameterInfo[] parameters = method.Parameters;
- 47 int num = parameters.Length;
- 48 int num2 = 0;
- 49 for (int i = 0; i < num; i++)
- 50 {
- 51 ParameterInfo parameterInfo = parameters[i];
- 52 if (typeof(Context).IsAssignableFrom(parameterInfo.ParameterType))
- 53 {
- 54 num2 = 1;
- 55 }
- 56 else if (parameterInfo.IsOptional && parameterInfo.HasDefaultValue)
- 57 {
- 58 concurrentDictionary.AddOrUpdate(i - num2, method, (int key, Method value) => method);
- 59 }
- 60 }
- 61
- 62 concurrentDictionary.AddOrUpdate(num - num2, method, (int key, Method value) => method);
- 63 }
- 64
- 65 /// <summary>
- 66 /// 添加一个方法到集合中,使用指定的方法信息、名称和目标对象。
- 67 /// </summary>
- 68 /// <param name="methodInfo">方法的信息。</param>
- 69 /// <param name="name">方法的名称。</param>
- 70 /// <param name="target">方法的目标对象,默认为null。</param>
- 71 public void Add(MethodInfo methodInfo, string name, object target = null)
- 72 {
- 73 Add(new Method(methodInfo, name, target));
- 74 }
- 75
- 76 /// <summary>
- 77 /// 添加一个方法到集合中,使用给定的名称、目标对象和别名。
- 78 /// </summary>
- 79 /// <param name="name">方法的名称。</param>
- 80 /// <param name="target">包含方法的对象。</param>
- 81 /// <param name="alias">方法的别名,如果为空则使用方法名称。</param>
- 82 public void AddMethod(string name, object target, string alias = "")
- 83 {
- 84 MethodInfo[] methods = target.GetType().GetTypeInfo().GetMethods(BindingFlags.Instance | BindingFlags.Public);
- 85 if (string.IsNullOrEmpty(alias))
- 86 {
- 87 alias = name;
- 88 }
- 89
- 90 MethodInfo[] array = methods;
- 91 foreach (MethodInfo methodInfo in array)
- 92 {
- 93 if (methodInfo.Name.Equals(name, StringComparison.OrdinalIgnoreCase))
- 94 {
- 95 Add(methodInfo, alias, target);
- 96 }
- 97 }
- 98 }
- 99
- 100 /// <summary>
- 101 /// 向目标对象添加方法。
- 102 /// </summary>
- 103 /// <param name="names">方法名称数组。</param>
- 104 /// <param name="target">目标对象,方法将添加到该对象上。</param>
- 105 /// <param name="ns">可选的命名空间前缀,用于区分不同来源的方法。</param>
- 106 public void AddMethods(string[] names, object target, string ns = "")
- 107 {
- 108 foreach (string text in names)
- 109 {
- 110 if (string.IsNullOrEmpty(ns))
- 111 {
- 112 AddMethod(text, target, text);
- 113 }
- 114 else
- 115 {
- 116 AddMethod(text, target, ns + "_" + text);
- 117 }
- 118 }
- 119 }
- 120
- 121 /// <summary>
- 122 /// 向目标对象添加实例方法。
- 123 /// </summary>
- 124 /// <param name="target">目标对象,其实例方法将被添加。</param>
- 125 /// <param name="ns">可选的命名空间前缀,用于区分方法名。</param>
- 126 public void AddInstanceMethods(object target, string ns = "")
- 127 {
- 128 MethodInfo[] methods = target.GetType().GetTypeInfo().GetMethods(BindingFlags.Instance | BindingFlags.Public);
- 129 foreach (MethodInfo methodInfo in methods)
- 130 {
- 131 if (Array.IndexOf(instanceMethodsOnObject, methodInfo.Name) == -1)
- 132 {
- 133 string text = methodInfo.Name;
- 134 if (!string.IsNullOrEmpty(ns))
- 135 {
- 136 text = ns + "_" + text;
- 137 }
- 138
- 139 Add(methodInfo, text, target);
- 140 }
- 141 }
- 142 }
- 143 }
复制代码 View Code
- 1 using Communication.Nmq.Dto;
- 2 using NetMQ;
- 3 using NetMQ.Sockets;
- 4 using System;
- 5 using System.Collections.Generic;
- 6 using System.Linq;
- 7 using Communication.Utils;
- 8 using Newtonsoft.Json;
- 9
- 10 namespace Communication.Nmq
- 11 {
- 12 internal class Transceiver : IDisposable
- 13 {
- 14 private List<MqType> SubscribeTypes;
- 15 private List<MqType> UnSubscribleTypes;
- 16 private MethodManager FunListeners;
- 17 private string Owner;
- 18 private PublisherSocket ClientPub;
- 19 private SubscriberSocket ClientSub;
- 20 private NetMQPoller Poller;
- 21 private static readonly object SendLock = new();
- 22
- 23
- 24 internal Transceiver(MqType owner, List<MqType> subscribeType, List<MqType> unSubscribleType, MethodManager funListener)
- 25 {
- 26 SubscribeTypes = subscribeType;
- 27 UnSubscribleTypes = unSubscribleType;
- 28 FunListeners = funListener;
- 29 Owner = owner.ToString();
- 30 ClientPub = new PublisherSocket(">tcp://127.0.0.1:61225");
- 31 ClientSub = new SubscriberSocket(">tcp://127.0.0.1:52216");
- 32 Poller = new NetMQPoller { ClientSub };
- 33 SubTopic();
- 34 ClientSub.ReceiveReady += ClientSub_ReceiveReady;
- 35 Poller.RunAsync();
- 36 }
- 37
- 38 private void ClientSub_ReceiveReady(object sender, NetMQSocketEventArgs e)
- 39 {
- 40 try
- 41 {
- 42 List<string> frames = new();
- 43 if (!e.Socket.TryReceiveMultipartStrings(TimeSpan.FromSeconds(3), ref frames) || frames == null)
- 44 {
- 45 Log.Error($"NetMQ接收异常!frames {frames}", LoggerNames.MqStr);
- 46 return;
- 47 }
- 48 if (frames.Count == 2)
- 49 {
- 50 string topic = frames[0];
- 51 string msg = frames[1];
- 52 if (Enum.TryParse(topic, out MqType topicType))
- 53 {
- 54 if (TryDeserializeProxyMessage(msg, out var controlRequest) && !string.IsNullOrWhiteSpace(controlRequest.Method))
- 55 {
- 56 if (FunListeners.Methods.TryGetValue(controlRequest.Method, out var methods))
- 57 {
- 58 foreach (var methodInfo in methods.Select(m => m.Value))
- 59 {
- 60 try
- 61 {
- 62 var parameters = controlRequest.Parameters.Select((p, i) => SafeChangeType(p, methodInfo.Parameters[i].ParameterType)).ToArray();
- 63 methodInfo.MethodInfo?.Invoke(methodInfo.Target, parameters);
- 64 }
- 65 catch (Exception ex)
- 66 {
- 67 Log.Error($"Failed to convert parameter for method {controlRequest.Method}: {ex.Message}", LoggerNames.MqStr);
- 68 return;
- 69 }
- 70 }
- 71 }
- 72 else
- 73 {
- 74 throw new InvalidOperationException("找不到对应的函数");
- 75 }
- 76 }
- 77 else
- 78 {
- 79 throw new InvalidOperationException("无法转换格式");
- 80 }
- 81 }
- 82 else
- 83 {
- 84 Log.Error($"NetMQ收到不正常数据,请检测!MqType:{topic}", LoggerNames.MqStr);
- 85 }
- 86 }
- 87 else
- 88 {
- 89 Log.Error($"NetMQ收到不正常数据,请检测!frames 长度为{frames.Count}", LoggerNames.MqStr);
- 90 }
- 91 }
- 92 catch (Exception ex)
- 93 {
- 94 Log.Error($"NetMQ收到消息报错:{ex.ToString()}", LoggerNames.MqStr);
- 95 }
- 96 }
- 97
- 98 public object SafeChangeType(object value, Type targetType)
- 99 {
- 100 if (targetType.IsEnum && value is string strValue)
- 101 {
- 102 return Enum.Parse(targetType, strValue);
- 103 }
- 104 return Convert.ChangeType(value, targetType);
- 105 }
- 106
- 107 private bool TryDeserializeProxyMessage(string json, out ProxyMessage message)
- 108 {
- 109 message = null;
- 110 try
- 111 {
- 112 message = JsonConvert.DeserializeObject<ProxyMessage>(json);
- 113 return message != null;
- 114 }
- 115 catch
- 116 {
- 117 return false;
- 118 }
- 119 }
- 120
- 121 private void SubTopic()
- 122 {
- 123 if (SubscribeTypes?.Any() == true)
- 124 {
- 125 foreach (var item in SubscribeTypes)
- 126 {
- 127 ClientSub.Subscribe(item.ToString());
- 128 }
- 129 }
- 130 if (UnSubscribleTypes?.Any() == true)
- 131 {
- 132 foreach (var item in UnSubscribleTypes)
- 133 {
- 134 ClientSub.Unsubscribe(item.ToString());
- 135 }
- 136 }
- 137 }
- 138
- 139 internal void SendMessage(string msg)
- 140 {
- 141 try
- 142 {
- 143 lock (SendLock)
- 144 {
- 145 var success = ClientPub.SendMoreFrame(Owner).TrySendFrame(TimeSpan.FromSeconds(3), msg);
- 146 }
- 147 }
- 148 catch (Exception ex)
- 149 {
- 150 Log.Error($"发送_消息 失败 {msg},:{ex}", LoggerNames.MqStr);
- 151 }
- 152 }
- 153
- 154 public void Dispose()
- 155 {
- 156 Poller.Stop();
- 157 ClientPub?.Dispose();
- 158 ClientSub?.Dispose();
- 159 Poller.Dispose();
- 160 }
- 161 }
- 162 }
复制代码 View Code
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |