找回密码
 立即注册
首页 业界区 业界 Nacos源码—3.Nacos集群高可用分析一

Nacos源码—3.Nacos集群高可用分析一

亢安芙 2025-6-2 23:08:44
大纲
1.Nacos集群的几个问题
2.单节点对服务进行心跳健康检查和同步检查结果
3.集群新增服务实例时如何同步给其他节点
4.集群节点的健康状态变动时的数据同步
5.集群新增节点时如何同步已有服务实例数据
 
1.Nacos集群的几个问题
问题一:在单机模式下,Nacos服务端会开启心跳健康检查的定时任务。那么在集群模式下,是否有必要让全部集群节点都执行这个定时任务?
 
问题二:Nacos服务端通过心跳健康检查的定时任务感知服务实例健康状态改变时,如何把服务实例的健康状态同步给其他Nacos集群节点?
 
问题三:一个新服务实例发起注册请求,只会有一个Nacos集群节点处理对应请求,那么处理完注册请求后,集群节点间应该如何同步服务实例数据?
 
问题四:假设Nacos集群有三个节点,现在需要新增了一个节点,那么新增的节点应该如何从集群中同步已存在的服务实例数据?
 
问题五:Nacos集群节点相互之间,是否有心跳机制来检测集群节点是否可用?
 
2.单节点对服务进行心跳健康检查和同步检查结果
(1)集群对服务进行心跳健康检查的设计
(2)选择一个节点对服务进行心跳健康检查的源码
(3)集群之间同步服务的健康状态的源码
(4)总结
 
(1)集群对服务进行心跳健康检查的架构设计
假设Nacos集群有三个节点:现已知单机模式下的Nacos服务端是会开启心跳健康检查的定时任务的。既然集群节点有三个,是否每个节点都要执行心跳健康检查的定时任务?
 
方案一:三个节点全都去执行心跳健康检查任务。如果每个节点执行的结果都不同,那么以哪个为准?
 
方案二:只有一个节点去执行心跳健康检查任务,然后把检查结果同步给其他节点。
 
明显方案二逻辑简洁清晰,而Nacos集群也选择了方案二。在Nacos集群模式下,三个节点都会开启一个心跳健康检查的定时任务,但只有一个节点会真正地执行心跳健康检查的逻辑。然后在检查完成后,会开启一个定时任务将检查结果同步给其他节点。
 
(2)选择一个节点对服务进行心跳健康检查的源码
对服务进行心跳健康检查的任务,其实就是ClientBeatCheckTask任务。Nacos服务端在处理服务实例注册接口请求时,就会开启这个任务。如下所示:
1.png
ClientBeatCheckTask这个类是一个线程任务。在ClientBeatCheckTask的run()方法中,一开始就有两个if判断。第一个if判断:判断当前节点在集群模式下是否需要对该Service执行心跳健康检查任务。第二个if判断:是否开启了健康检查任务,默认是开启的。注意:ClientBeatProcessor用于处理服务实例的心跳,服务实例和服务都需要心跳健康检查。
 
在集群模式下,为了保证只有一个节点对该Service执行心跳健康检查,就需要第一个if判断中的DistroMapper的responsible()方法来实现了。通过DistroMapper的responsible()方法可知:只会有一个集群节点能够对该Service执行心跳健康检查。而其他的集群节点,并不会去执行对该Service的心跳健康检查。
  1. //Check and update statues of ephemeral instances, remove them if they have been expired.
  2. public class ClientBeatCheckTask implements Runnable {
  3.     private Service service;//每个ClientBeatCheckTask都会对应一个Service
  4.     ...
  5.    
  6.     @JsonIgnore
  7.     public DistroMapper getDistroMapper() {
  8.         return ApplicationUtils.getBean(DistroMapper.class);
  9.     }
  10.    
  11.     @Override
  12.     public void run() {
  13.         try {
  14.             //第一个if判断:DistroMapper.responsible()方法
  15.             //判断当前节点在集群模式下是否需要对该Service执行心跳健康检查任务
  16.             if (!getDistroMapper().responsible(service.getName())) {
  17.                 return;
  18.             }
  19.             //第二个if判断:
  20.             //是否开启了健康检查任务,默认是开启的
  21.             if (!getSwitchDomain().isHealthCheckEnabled()) {
  22.                 return;
  23.             }
  24.             List<Instance> instances = service.allIPs(true);
  25.         
  26.             //first set health status of instances:
  27.             for (Instance instance : instances) {
  28.                 if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
  29.                     if (!instance.isMarked()) {
  30.                         if (instance.isHealthy()) {
  31.                             instance.setHealthy(false);
  32.                             getPushService().serviceChanged(service);
  33.                             ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
  34.                         }
  35.                     }
  36.                 }
  37.             }
  38.         
  39.             if (!getGlobalConfig().isExpireInstance()) {
  40.                 return;
  41.             }
  42.         
  43.             //then remove obsolete instances:
  44.             for (Instance instance : instances) {
  45.                 if (instance.isMarked()) {
  46.                     continue;
  47.                 }
  48.                 if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
  49.                     //delete instance
  50.                     deleteIp(instance);
  51.                 }
  52.             }
  53.         } catch (Exception e) {
  54.             Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
  55.         }
  56.     }
  57.     ...
  58. }
  59. //Distro mapper, judge which server response input service.
  60. @Component("distroMapper")
  61. public class DistroMapper extends MemberChangeListener {
  62.     //List of service nodes, you must ensure that the order of healthyList is the same for all nodes.
  63.     private volatile List<String> healthyList = new ArrayList<>();
  64.    
  65.     //init server list.
  66.     @PostConstruct
  67.     public void init() {
  68.         NotifyCenter.registerSubscriber(this);//注册订阅者
  69.         this.healthyList = MemberUtil.simpleMembers(memberManager.allMembers());
  70.     }
  71.     ...
  72.     //Judge whether current server is responsible for input service.
  73.     public boolean responsible(String serviceName) {
  74.         //获取集群节点数量,这里假设的是三个集群节点
  75.         final List<String> servers = healthyList;
  76.         //如果采用单机模式启动,直接返回true
  77.         if (!switchDomain.isDistroEnabled() || EnvUtil.getStandaloneMode()) {
  78.             return true;
  79.         }
  80.         //如果没有可用的健康集群节点,直接返回false
  81.         if (CollectionUtils.isEmpty(servers)) {
  82.             //means distro config is not ready yet
  83.             return false;
  84.         }
  85.         int index = servers.indexOf(EnvUtil.getLocalAddress());
  86.         int lastIndex = servers.lastIndexOf(EnvUtil.getLocalAddress());
  87.         if (lastIndex < 0 || index < 0) {
  88.             return true;
  89.         }
  90.         //对serviceName进行Hash操作,然后对servers.size()取模,得到负责执行心跳健康检查任务的那个节点索引
  91.         int target = distroHash(serviceName) % servers.size();
  92.         return target >= index && target <= lastIndex;
  93.     }
  94.    
  95.     private int distroHash(String serviceName) {
  96.         return Math.abs(serviceName.hashCode() % Integer.MAX_VALUE);
  97.     }
  98.     ...
  99. }
复制代码
三.第一个异步任务ServiceReporter
首先从内存注册表中,获取全部的服务名称。ServiceManager的getAllServiceNames()方法返回的是一个Map对象。其中的key是对应的命名空间ID,value是对应命名空间下的全部服务名称。然后遍历allServiceNames中的内容,此时会有两个for循环来处理。最后这个任务执行完,会继续提交一个延时执行的任务进行健康检查。
 
第一个for循环:遍历某命名空间ID下的全部服务名称,封装请求参数。
首先采用同样的Hash算法,判断遍历到的Service是否需要同步健康结果。如果需要执行,则把参数放到ServiceChecksum对象中。然后通过JacksonUtils转成JSON数据后,再放到Message请求参数对象。
 
第二个for循环:遍历集群节点,发送请求给其他节点进行数据同步。
首先判断是否是自身节点,如果是则跳过。否则调用ServiceStatusSynchronizer的send()方法。通过向其他集群节点的接口发起请求,来实现心跳健康检查结果的同步。集群节点同步的核心方法就在ServiceStatusSynchronizer的send()方法中。
 
通过ServiceStatusSynchronizer的send()方法中的代码可知,最终会通过HTTP方式进行数据同步,请求地址是"v1/ns/service/status"。该请求地址对应的请求处理入口是ServiceController的serviceStatus()方法。
 
在ServiceController的serviceStatus()方法中,如果通过对比入参和注册表的ServiceChecksum后,发现服务状态发生了改变,那么就会调用ServiceManager.addUpdatedServiceToQueue()方法。
 
在addUpdatedServiceToQueue()方法中,首先会把传入的参数包装成ServiceKey对象,然后放入到toBeUpdatedServicesQueue阻塞队列中。
 
既然最后会将ServiceKey对象放入到阻塞队列中,那必然有一个异步任务,从阻塞队列中获取ServiceKey对象进行处理。这个处理逻辑和处理服务实例注册时,将Pair对象放入阻塞队列一样,而这个异步任务便是ServiceManager的init()方法的第二个异步任务。
  1. //Core manager storing all services in Nacos.
  2. @Component
  3. public class ServiceManager implements RecordListener<Service> {
  4.     ...
  5.     //Init service maneger.
  6.     @PostConstruct
  7.     public void init() {
  8.         //用来发起 同步心跳健康检查结果请求 的异步任务
  9.         GlobalExecutor.scheduleServiceReporter(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);
  10.         //用来处理 同步心跳健康检查结果请求 的异步任务:内存队列削峰 + 异步任务提速
  11.         GlobalExecutor.submitServiceUpdateManager(new UpdatedServiceProcessor());
  12.    
  13.         if (emptyServiceAutoClean) {
  14.             Loggers.SRV_LOG.info("open empty service auto clean job, initialDelay : {} ms, period : {} ms", cleanEmptyServiceDelay, cleanEmptyServicePeriod);
  15.         
  16.             //delay 60s, period 20s;
  17.             //This task is not recommended to be performed frequently in order to avoid
  18.             //the possibility that the service cache information may just be deleted
  19.             //and then created due to the heartbeat mechanism
  20.             GlobalExecutor.scheduleServiceAutoClean(new EmptyServiceAutoClean(), cleanEmptyServiceDelay, cleanEmptyServicePeriod);
  21.         }
  22.         try {
  23.             Loggers.SRV_LOG.info("listen for service meta change");
  24.             consistencyService.listen(KeyBuilder.SERVICE_META_KEY_PREFIX, this);
  25.         } catch (NacosException e) {
  26.             Loggers.SRV_LOG.error("listen for service meta change failed!");
  27.         }
  28.     }
  29.     ...
  30. }
  31. public class GlobalExecutor {
  32.     private static final ScheduledExecutorService SERVICE_SYNCHRONIZATION_EXECUTOR =
  33.         ExecutorFactory.Managed.newSingleScheduledExecutorService(
  34.             ClassUtils.getCanonicalName(NamingApp.class),
  35.             new NameThreadFactory("com.alibaba.nacos.naming.service.worker")
  36.         );
  37.    
  38.     public static final ScheduledExecutorService SERVICE_UPDATE_MANAGER_EXECUTOR =
  39.         ExecutorFactory.Managed.newSingleScheduledExecutorService(
  40.             ClassUtils.getCanonicalName(NamingApp.class),
  41.             new NameThreadFactory("com.alibaba.nacos.naming.service.update.processor")
  42.         );
  43.     ...
  44.     public static void scheduleServiceReporter(Runnable command, long delay, TimeUnit unit) {
  45.         //在指定的延迟后执行某项任务
  46.         SERVICE_SYNCHRONIZATION_EXECUTOR.schedule(command, delay, unit);
  47.     }
  48.    
  49.     public static void submitServiceUpdateManager(Runnable runnable) {
  50.         //向线程池提交任务,让线程池执行任务
  51.         SERVICE_UPDATE_MANAGER_EXECUTOR.submit(runnable);
  52.     }
  53.     ...
  54. }
  55. public final class ExecutorFactory {
  56.     ...
  57.     public static final class Managed {
  58.         private static final String DEFAULT_NAMESPACE = "nacos";
  59.         private static final ThreadPoolManager THREAD_POOL_MANAGER = ThreadPoolManager.getInstance();
  60.         ...
  61.         //Create a new single scheduled executor service with input thread factory and register to manager.
  62.         public static ScheduledExecutorService newSingleScheduledExecutorService(final String group, final ThreadFactory threadFactory) {
  63.             ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, threadFactory);
  64.             THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService);
  65.             return executorService;
  66.         }
  67.         ...
  68.     }
  69. }
  70. //线程池管理器
  71. public final class ThreadPoolManager {
  72.     private Map<String, Map<String, Set<ExecutorService>>> resourcesManager;
  73.     private Map<String, Object> lockers = new ConcurrentHashMap<String, Object>(8);
  74.     private static final ThreadPoolManager INSTANCE = new ThreadPoolManager();
  75.     private static final AtomicBoolean CLOSED = new AtomicBoolean(false);
  76.    
  77.     static {
  78.         INSTANCE.init();
  79.         //JVM关闭时添加勾子,释放线程资源
  80.         ThreadUtils.addShutdownHook(new Thread(new Runnable() {
  81.             @Override
  82.             public void run() {
  83.                 LOGGER.warn("[ThreadPoolManager] Start destroying ThreadPool");
  84.                 //关闭线程池管理器
  85.                 shutdown();
  86.                 LOGGER.warn("[ThreadPoolManager] Destruction of the end");
  87.             }
  88.         }));
  89.     }
  90.    
  91.     public static ThreadPoolManager getInstance() {
  92.         return INSTANCE;
  93.     }
  94.    
  95.     private ThreadPoolManager() {
  96.     }
  97.    
  98.     private void init() {
  99.         resourcesManager = new ConcurrentHashMap<String, Map<String, Set<ExecutorService>>>(8);
  100.     }
  101.    
  102.     //Register the thread pool resources with the resource manager.
  103.     public void register(String namespace, String group, ExecutorService executor) {
  104.         if (!resourcesManager.containsKey(namespace)) {
  105.             synchronized (this) {
  106.                 lockers.put(namespace, new Object());
  107.             }
  108.         }
  109.         final Object monitor = lockers.get(namespace);
  110.         synchronized (monitor) {
  111.             Map<String, Set<ExecutorService>> map = resourcesManager.get(namespace);
  112.             if (map == null) {
  113.                 map = new HashMap<String, Set<ExecutorService>>(8);
  114.                 map.put(group, new HashSet<ExecutorService>());
  115.                 map.get(group).add(executor);
  116.                 resourcesManager.put(namespace, map);
  117.                 return;
  118.             }
  119.             if (!map.containsKey(group)) {
  120.                 map.put(group, new HashSet<ExecutorService>());
  121.             }
  122.             map.get(group).add(executor);
  123.         }
  124.     }
  125.    
  126.     //Shutdown thread pool manager. 关闭线程池管理器
  127.     public static void shutdown() {
  128.         if (!CLOSED.compareAndSet(false, true)) {
  129.             return;
  130.         }
  131.         Set<String> namespaces = INSTANCE.resourcesManager.keySet();
  132.         for (String namespace : namespaces) {
  133.             //销毁所有线程池资源
  134.             INSTANCE.destroy(namespace);
  135.         }
  136.     }
  137.    
  138.     //Destroys all thread pool resources under this namespace.
  139.     public void destroy(final String namespace) {
  140.         final Object monitor = lockers.get(namespace);
  141.         if (monitor == null) {
  142.             return;
  143.         }
  144.         synchronized (monitor) {
  145.             Map<String, Set<ExecutorService>> subResource = resourcesManager.get(namespace);
  146.             if (subResource == null) {
  147.                 return;
  148.             }
  149.             for (Map.Entry<String, Set<ExecutorService>> entry : subResource.entrySet()) {
  150.                 for (ExecutorService executor : entry.getValue()) {
  151.                     //关闭线程池
  152.                     ThreadUtils.shutdownThreadPool(executor);
  153.                 }
  154.             }
  155.             resourcesManager.get(namespace).clear();
  156.             resourcesManager.remove(namespace);
  157.         }
  158.     }
  159.     ...
  160. }
  161. public final class ThreadUtils {
  162.     ...
  163.     public static void addShutdownHook(Runnable runnable) {
  164.         Runtime.getRuntime().addShutdownHook(new Thread(runnable));
  165.     }
  166.    
  167.     public static void shutdownThreadPool(ExecutorService executor) {
  168.         shutdownThreadPool(executor, null);
  169.     }
  170.    
  171.     //Shutdown thread pool.
  172.     public static void shutdownThreadPool(ExecutorService executor, Logger logger) {
  173.         executor.shutdown();
  174.         int retry = 3;
  175.         while (retry > 0) {
  176.             retry--;
  177.             try {
  178.                 if (executor.awaitTermination(1, TimeUnit.SECONDS)) {
  179.                     return;
  180.                 }
  181.             } catch (InterruptedException e) {
  182.                 executor.shutdownNow();
  183.                 Thread.interrupted();
  184.             } catch (Throwable ex) {
  185.                 if (logger != null) {
  186.                     logger.error("ThreadPoolManager shutdown executor has error : {}", ex);
  187.                 }
  188.             }
  189.         }
  190.         executor.shutdownNow();
  191.     }
  192.     ...
  193. }
复制代码
(3)集群节点收到健康检查请求后的数据同步源码
集群节点收到某集群节点发来的"/v1/core/cluster/report"请求后,会调用NacosClusterController的report()方法来处理请求。在report()方法中,会把发起请求的来源节点状态直接设置成UP状态,然后调用ServerMemberManager的update()方法来更新来源节点属性。在update()方法中,会把存放在serverList中对应的节点Member进行更新,也就是通过MemberUtil的copy()方法覆盖老对象的属性来实现更新。
 
注意:因为serverList属性在集群中的每个节点都存在一份,所以节点收到健康检查请求后,要对其serverList属性中的节点进行更新。
  1. //Core manager storing all services in Nacos.
  2. @Component
  3. public class ServiceManager implements RecordListener<Service> {
  4.     //Map(namespace, Map(group::serviceName, Service)).
  5.     private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
  6.     private final DistroMapper distroMapper;
  7.     private final Synchronizer synchronizer = new ServiceStatusSynchronizer();
  8.     ...
  9.     public Map<String, Set<String>> getAllServiceNames() {
  10.         Map<String, Set<String>> namesMap = new HashMap<>(16);
  11.         for (String namespaceId : serviceMap.keySet()) {
  12.             namesMap.put(namespaceId, serviceMap.get(namespaceId).keySet());
  13.         }
  14.         return namesMap;
  15.     }
  16.    
  17.     private class ServiceReporter implements Runnable {
  18.         @Override
  19.         public void run() {
  20.             try {
  21.                 //获取内存注册表下的所有服务名称,按命名空间分类
  22.                 Map<String, Set<String>> allServiceNames = getAllServiceNames();
  23.                 if (allServiceNames.size() <= 0) {
  24.                     //ignore
  25.                     return;
  26.                 }
  27.                 //遍历allServiceNames中的内容
  28.                 //也就是遍历每一个命名空间,然后封装请求参数,接着发送请求来同步心跳健康检查结果
  29.                 for (String namespaceId : allServiceNames.keySet()) {
  30.                     ServiceChecksum checksum = new ServiceChecksum(namespaceId);
  31.                     //第一个循环:封装请求参数
  32.                     for (String serviceName : allServiceNames.get(namespaceId)) {
  33.                         //采用同样的算法,确保当前的集群节点,只对自己负责的那些Service,同步心跳健康检查结果
  34.                         if (!distroMapper.responsible(serviceName)) {
  35.                             continue;
  36.                         }
  37.                         Service service = getService(namespaceId, serviceName);
  38.                         if (service == null || service.isEmpty()) {
  39.                             continue;
  40.                         }
  41.                         service.recalculateChecksum();
  42.                         //添加请求参数
  43.                         checksum.addItem(serviceName, service.getChecksum());
  44.                     }
  45.                     //创建请求参数对象Message,准备进行同步
  46.                     Message msg = new Message();
  47.                     //对请求对象进行JSON序列化
  48.                     msg.setData(JacksonUtils.toJson(checksum));
  49.                     Collection<Member> sameSiteServers = memberManager.allMembers();
  50.                     if (sameSiteServers == null || sameSiteServers.size() <= 0) {
  51.                         return;
  52.                     }
  53.                   
  54.                     //第二个循环:遍历所有集群节点,发送请求给其他节点进行数据同步
  55.                     for (Member server : sameSiteServers) {
  56.                         //判断地址是否是本节点,如果是则直接跳过
  57.                         if (server.getAddress().equals(NetUtils.localServer())) {
  58.                             continue;
  59.                         }
  60.                         //同步其他集群节点
  61.                         synchronizer.send(server.getAddress(), msg);
  62.                     }
  63.                 }
  64.             } catch (Exception e) {
  65.                 Loggers.SRV_LOG.error("[DOMAIN-STATUS] Exception while sending service status", e);
  66.             } finally {
  67.                 //继续提交一个延时执行的任务
  68.                 GlobalExecutor.scheduleServiceReporter(this, switchDomain.getServiceStatusSynchronizationPeriodMillis(), TimeUnit.MILLISECONDS);
  69.             }
  70.         }
  71.     }
  72.     ...
  73. }
  74. public class ServiceStatusSynchronizer implements Synchronizer {
  75.     @Override
  76.     public void send(final String serverIP, Message msg) {
  77.         if (serverIP == null) {
  78.             return;
  79.         }
  80.         //构建请求参数
  81.         Map<String, String> params = new HashMap<String, String>(10);
  82.         params.put("statuses", msg.getData());
  83.         params.put("clientIP", NetUtils.localServer());
  84.         //拼接url地址
  85.         String url = "http://" + serverIP + ":" + EnvUtil.getPort() + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/service/status";
  86.         if (IPUtil.containsPort(serverIP)) {
  87.             url = "http://" + serverIP + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/service/status";
  88.         }
  89.       
  90.         try {
  91.             //异步发送HTTP请求,url地址就是:http://ip/v1/ns/service/status, 用来同步心跳健康检查结果
  92.             HttpClient.asyncHttpPostLarge(url, null, JacksonUtils.toJson(params), new Callback<String>() {
  93.                 @Override
  94.                 public void onReceive(RestResult<String> result) {
  95.                     if (!result.ok()) {
  96.                         Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: {}", serverIP);
  97.                     }
  98.                 }
  99.                
  100.                 @Override
  101.                 public void onError(Throwable throwable) {
  102.                     Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: " + serverIP, throwable);
  103.                 }
  104.                
  105.                 @Override
  106.                 public void onCancel() {
  107.                 }
  108.             });
  109.         } catch (Exception e) {
  110.             Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: " + serverIP, e);
  111.         }
  112.     }
  113.     ...
  114. }
  115. //Service operation controller.
  116. @RestController
  117. @RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/service")
  118. public class ServiceController {
  119.     @Autowired
  120.     protected ServiceManager serviceManager;
  121.     ...
  122.     //Check service status whether latest.
  123.     @PostMapping("/status")
  124.     public String serviceStatus(HttpServletRequest request) throws Exception {
  125.         String entity = IoUtils.toString(request.getInputStream(), "UTF-8");
  126.         String value = URLDecoder.decode(entity, "UTF-8");
  127.         JsonNode json = JacksonUtils.toObj(value);
  128.         String statuses = json.get("statuses").asText();
  129.         String serverIp = json.get("clientIP").asText();
  130.         if (!memberManager.hasMember(serverIp)) {
  131.             throw new NacosException(NacosException.INVALID_PARAM, "ip: " + serverIp + " is not in serverlist");
  132.         }
  133.    
  134.         try {
  135.             ServiceManager.ServiceChecksum checksums = JacksonUtils.toObj(statuses, ServiceManager.ServiceChecksum.class);
  136.             if (checksums == null) {
  137.                 Loggers.SRV_LOG.warn("[DOMAIN-STATUS] receive malformed data: null");
  138.                 return "fail";
  139.             }
  140.         
  141.             for (Map.Entry<String, String> entry : checksums.serviceName2Checksum.entrySet()) {
  142.                 if (entry == null || StringUtils.isEmpty(entry.getKey()) || StringUtils.isEmpty(entry.getValue())) {
  143.                     continue;
  144.                 }
  145.                 String serviceName = entry.getKey();
  146.                 String checksum = entry.getValue();
  147.                 Service service = serviceManager.getService(checksums.namespaceId, serviceName);
  148.                 if (service == null) {
  149.                     continue;
  150.                 }
  151.                 service.recalculateChecksum();
  152.                 //通过对比入参和注册表的checksum,如果发现服务状态有变动
  153.                 if (!checksum.equals(service.getChecksum())) {
  154.                     if (Loggers.SRV_LOG.isDebugEnabled()) {
  155.                         Loggers.SRV_LOG.debug("checksum of {} is not consistent, remote: {}, checksum: {}, local: {}", serviceName, serverIp, checksum, service.getChecksum());
  156.                     }
  157.                     //添加到阻塞队列
  158.                     serviceManager.addUpdatedServiceToQueue(checksums.namespaceId, serviceName, serverIp, checksum);
  159.                 }
  160.             }
  161.         } catch (Exception e) {
  162.             Loggers.SRV_LOG.warn("[DOMAIN-STATUS] receive malformed data: " + statuses, e);
  163.         }
  164.         return "ok";
  165.     }
  166.     ...
  167. }
  168. //Core manager storing all services in Nacos.
  169. @Component
  170. public class ServiceManager implements RecordListener<Service> {
  171.     private final Lock lock = new ReentrantLock();
  172.     //阻塞队列
  173.     private final LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);
  174.     ...
  175.     //Add a service into queue to update.
  176.     public void addUpdatedServiceToQueue(String namespaceId, String serviceName, String serverIP, String checksum) {
  177.         lock.lock();
  178.         try {
  179.             //包装成ServiceKey对象,放入到toBeUpdatedServicesQueue阻塞队列中
  180.             toBeUpdatedServicesQueue.offer(new ServiceKey(namespaceId, serviceName, serverIP, checksum), 5, TimeUnit.MILLISECONDS);
  181.         } catch (Exception e) {
  182.             toBeUpdatedServicesQueue.poll();
  183.             toBeUpdatedServicesQueue.add(new ServiceKey(namespaceId, serviceName, serverIP, checksum));
  184.             Loggers.SRV_LOG.error("[DOMAIN-STATUS] Failed to add service to be updated to queue.", e);
  185.         } finally {
  186.             lock.unlock();
  187.         }
  188.     }
  189.     ...
  190. }
复制代码
(4)总结
在Nacos集群架构下,集群节点间的健康状态如何进行同步。简单来说,集群节点间是会相互进行通信的。如果通信失败,那么就会把通信节点的状态属性修改为DOWN。
2.png
 
5.集群新增节点时如何同步已有服务实例数据
(1)节点启动时加载全部服务实例数据的异步任务
(2)节点处理获取全部服务实例数据请求的源码
(3)总结
 
(1)节点启动时加载服务实例数据的异步任务
Nacos服务端会有一个DistroProtocol类,它是一个Bean对象,在Spring项目启动时会创建这个DistroProtocol类型的Bean。
 
创建DistroProtocol类型的Bean时,会执行DistroProtocol的构造方法,从而调用DistroProtocol的startLoadTask()方法开启一个加载数据的异步任务。
 
在DistroProtocol的startLoadTask()方法中,会提交一个异步任务,并且会通过传入一个回调方法来标志是否已初始化成功。其中提交的任务类型是DistroLoadDataTask,所以会执行DistroLoadDataTask的run()方法,接着会执行DistroLoadDataTask的load()方法,然后执行该任务类的loadAllDataSnapshotFromRemote()方法,从而获取其他集群节点上的全部服务实例数据并更新本地注册表。
 
在loadAllDataSnapshotFromRemote()方法中,首先会遍历除自身节点外的其他集群节点。然后调用DistroHttpAgent的getDatumSnapshot()方法,通过HTTP请求"/v1/ns/distro/datums"获取目标节点的全部服务实例数据。接着再调用DistroConsistencyServiceImpl的processSnapshot()方法,将获取到的全部服务实例数据写入到本地注册表中。其中只要有一个集群节点数据同步成功,那么这个方法就结束。否则就继续遍历下一个集群节点,获取全部服务实例数据然后同步本地。
 
Nacos服务端在处理服务实例注册时,采用的是内存队列 + 异步任务。异步任务会调用listener的onChange()方法利用写时复制来更新本地注册表。而processSnapshot()方法也会调用listener的onChange()方法来更新注册表,其中listener的onChange()方法对应的实现其实就是Service的onChange()方法。
  1. //Core manager storing all services in Nacos.
  2. @Component
  3. public class ServiceManager implements RecordListener<Service> {
  4.     //阻塞队列
  5.     private final LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);
  6.     ...
  7.     private class UpdatedServiceProcessor implements Runnable {
  8.         //get changed service from other server asynchronously
  9.         @Override
  10.         public void run() {
  11.             ServiceKey serviceKey = null;
  12.             try {
  13.                 //无限循环
  14.                 while (true) {
  15.                     try {
  16.                         //从阻塞队列中获取任务
  17.                         serviceKey = toBeUpdatedServicesQueue.take();
  18.                     } catch (Exception e) {
  19.                         Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while taking item from LinkedBlockingDeque.");
  20.                     }
  21.                     if (serviceKey == null) {
  22.                         continue;
  23.                     }
  24.                     GlobalExecutor.submitServiceUpdate(new ServiceUpdater(serviceKey));
  25.                 }
  26.             } catch (Exception e) {
  27.                 Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while update service: {}", serviceKey, e);
  28.             }
  29.         }
  30.     }
  31.     private class ServiceUpdater implements Runnable {
  32.         String namespaceId;
  33.         String serviceName;
  34.         String serverIP;
  35.         
  36.         public ServiceUpdater(ServiceKey serviceKey) {
  37.             this.namespaceId = serviceKey.getNamespaceId();
  38.             this.serviceName = serviceKey.getServiceName();
  39.             this.serverIP = serviceKey.getServerIP();
  40.         }
  41.         
  42.         @Override
  43.         public void run() {
  44.             try {
  45.                 //修改服务实例的健康状态
  46.                 updatedHealthStatus(namespaceId, serviceName, serverIP);
  47.             } catch (Exception e) {
  48.                 Loggers.SRV_LOG.warn("[DOMAIN-UPDATER] Exception while update service: {} from {}, error: {}", serviceName, serverIP, e);
  49.             }
  50.         }
  51.     }
  52.    
  53.     //Update health status of instance in service. 修改服务实例的健康状态
  54.     public void updatedHealthStatus(String namespaceId, String serviceName, String serverIP) {
  55.         Message msg = synchronizer.get(serverIP, UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
  56.         //解析参数
  57.         JsonNode serviceJson = JacksonUtils.toObj(msg.getData());
  58.    
  59.         ArrayNode ipList = (ArrayNode) serviceJson.get("ips");
  60.         Map<String, String> ipsMap = new HashMap<>(ipList.size());
  61.         for (int i = 0; i < ipList.size(); i++) {
  62.             String ip = ipList.get(i).asText();
  63.             String[] strings = ip.split("_");
  64.             ipsMap.put(strings[0], strings[1]);
  65.         }
  66.    
  67.         Service service = getService(namespaceId, serviceName);
  68.         if (service == null) {
  69.             return;
  70.         }
  71.       
  72.         //是否改变标识
  73.         boolean changed = false;
  74.         //获取全部的实例数据,进行遍历
  75.         List<Instance> instances = service.allIPs();
  76.         for (Instance instance : instances) {
  77.             //同步健康状态结果
  78.             boolean valid = Boolean.parseBoolean(ipsMap.get(instance.toIpAddr()));
  79.             if (valid != instance.isHealthy()) {
  80.                 changed = true;
  81.                 //更新服务实例的健康状态
  82.                 instance.setHealthy(valid);
  83.                 Loggers.EVT_LOG.info("{} {SYNC} IP-{} : {}:{}@{}", serviceName, (instance.isHealthy() ? "ENABLED" : "DISABLED"), instance.getIp(), instance.getPort(), instance.getClusterName());
  84.             }
  85.         }
  86.         //如果服务实例健康状态改变了,那么就发布"服务改变事件",使用UDP方式通知客户端
  87.         if (changed) {
  88.             pushService.serviceChanged(service);
  89.             if (Loggers.EVT_LOG.isDebugEnabled()) {
  90.                 StringBuilder stringBuilder = new StringBuilder();
  91.                 List<Instance> allIps = service.allIPs();
  92.                 for (Instance instance : allIps) {
  93.                     stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
  94.                 }
  95.                 Loggers.EVT_LOG.debug("[HEALTH-STATUS-UPDATED] namespace: {}, service: {}, ips: {}", service.getNamespaceId(), service.getName(), stringBuilder.toString());
  96.             }
  97.         }
  98.     }
  99.     ...
  100. }
复制代码
总结:Nacos服务端集群节点启动时,会创建一个DistroProtocol类型的Bean对象,在这个DistroProtocol类型的Bean对象的构造方法会开启一个异步任务。该异步任务的主要逻辑是通过HTTP方式从其他集群节点获取服务数据,然后把获取到的服务实例数据更新到本地的内存注册表,完成数据同步。而且只要成功从某一个集群节点完成数据同步,那整个任务逻辑就结束。
 
此外,向某个集群节点获取全部服务实例数据时,是向"/v1/ns/distro/datums"接口发起HTTP请求来进行获取的。
 
(2)节点处理获取全部服务实例数据请求的源码
Nacos集群节点收到"/v1/ns/distro/datums"的HTTP请求后,便会执行DistroController的getAllDatums()方法。也就是调用DistroProtocol的onSnapshot()方法获取数据,然后直接返回。接着会调用DistroDataStorageImpl的getDatumSnapshot()方法。
 
getDatumSnapshot()方法会从DataStore的getDataMap()方法获取结果。进行服务实例注册时,会把服务实例信息存一份放在DataStore的Map中。进行服务实例同步时,也会把服务实例信息存放到DataStore的Map中。所以在DataStore里,会包含整个服务实例信息的数据。这里获取全部服务实例数据的接口,也是利用DataStore来实现的,而不是从内存注册表中获取。
  1. @DependsOn("ProtocolManager")
  2. @org.springframework.stereotype.Service("distroConsistencyService")
  3. public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
  4.     private final DistroProtocol distroProtocol;
  5.     ...
  6.     @Override
  7.     public void put(String key, Record value) throws NacosException {
  8.         //把包含了当前注册的服务实例的、最新的服务实例列表,存储到DataStore对象中,
  9.         //并添加异步任务来实现将最新的服务实例列表更新到内存注册表
  10.         onPut(key, value);
  11.         //在集群架构下,DistroProtocol.sync()方法会进行集群节点的服务实例数据同步
  12.         distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2);
  13.     }
  14.     ...
  15. }
  16. @Component
  17. public class DistroProtocol {
  18.     private final ServerMemberManager memberManager;
  19.     private final DistroTaskEngineHolder distroTaskEngineHolder;
  20.     ...
  21.     //Start to sync data to all remote server.
  22.     public void sync(DistroKey distroKey, DataOperation action, long delay) {
  23.         //遍历除自身以外的其他集群节点
  24.         for (Member each : memberManager.allMembersWithoutSelf()) {
  25.             //包装第一层
  26.             DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(), each.getAddress());
  27.             //包装第二层
  28.             DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
  29.             //实际调用的是NacosDelayTaskExecuteEngine.addTask()方法添加任务
  30.             distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
  31.             if (Loggers.DISTRO.isDebugEnabled()) {
  32.                 Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
  33.             }
  34.         }
  35.     }
  36.     ...
  37. }
  38. public class DistroKey {
  39.     private String resourceKey;
  40.     private String resourceType;   
  41.     private String targetServer;   
  42.    
  43.     public DistroKey() {
  44.     }
  45.    
  46.     public DistroKey(String resourceKey, String resourceType, String targetServer) {
  47.         this.resourceKey = resourceKey;
  48.         this.resourceType = resourceType;
  49.         this.targetServer = targetServer;
  50.     }
  51.     ...
  52. }
  53. //Distro delay task.
  54. public class DistroDelayTask extends AbstractDelayTask {
  55.     private final DistroKey distroKey;
  56.     private DataOperation action;
  57.     private long createTime;
  58.    
  59.     public DistroDelayTask(DistroKey distroKey, DataOperation action, long delayTime) {
  60.         this.distroKey = distroKey;
  61.         this.action = action;
  62.         this.createTime = System.currentTimeMillis();
  63.         setLastProcessTime(createTime);
  64.         setTaskInterval(delayTime);
  65.     }
  66.     ...
  67. }
  68. //Abstract task which can delay and merge.
  69. public abstract class AbstractDelayTask implements NacosTask {
  70.     //Task time interval between twice processing, unit is millisecond.
  71.     private long taskInterval;
  72.     //The time which was processed at last time, unit is millisecond.
  73.     private long lastProcessTime;
  74.    
  75.     public void setTaskInterval(long interval) {
  76.         this.taskInterval = interval;
  77.     }
  78.    
  79.     public void setLastProcessTime(long lastProcessTime) {
  80.         this.lastProcessTime = lastProcessTime;
  81.     }
  82.     ...
  83. }
  84. //Distro task engine holder.
  85. @Component
  86. public class DistroTaskEngineHolder {
  87.     private final DistroDelayTaskExecuteEngine delayTaskExecuteEngine = new DistroDelayTaskExecuteEngine();
  88.    
  89.     public DistroDelayTaskExecuteEngine getDelayTaskExecuteEngine() {
  90.         return delayTaskExecuteEngine;
  91.     }
  92.     ...
  93. }
  94. public class DistroDelayTaskExecuteEngine extends NacosDelayTaskExecuteEngine {
  95.     public DistroDelayTaskExecuteEngine() {
  96.         super(DistroDelayTaskExecuteEngine.class.getName(), Loggers.DISTRO);
  97.     }
  98.     ...
  99. }
  100. //Nacos delay task execute engine.
  101. public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine {
  102.     private final ScheduledExecutorService processingExecutor;
  103.     protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;//任务池
  104.     protected final ReentrantLock lock = new ReentrantLock();
  105.     ...
  106.     public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
  107.         super(logger);
  108.         tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);
  109.         processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
  110.         //开启延时任务
  111.         processingExecutor.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
  112.     }
  113.    
  114.     @Override
  115.     public void addTask(Object key, AbstractDelayTask newTask) {
  116.         lock.lock();
  117.         try {
  118.             AbstractDelayTask existTask = tasks.get(key);
  119.             if (null != existTask) {
  120.                 newTask.merge(existTask);
  121.             }
  122.             //最后放入到ConcurrentHashMap中
  123.             tasks.put(key, newTask);
  124.         } finally {
  125.             lock.unlock();
  126.         }
  127.     }
  128.     ...
  129.     private class ProcessRunnable implements Runnable {
  130.         @Override
  131.         public void run() {
  132.             try {
  133.                 processTasks();
  134.             } catch (Throwable e) {
  135.                 getEngineLog().error(e.toString(), e);
  136.             }
  137.         }
  138.     }
  139.     ...
  140.     //process tasks in execute engine.
  141.     protected void processTasks() {
  142.         //获取tasks中所有的任务,然后进行遍历
  143.         Collection<Object> keys = getAllTaskKeys();
  144.         for (Object taskKey : keys) {
  145.             //通过任务key,获取具体的任务,并且从任务池中移除掉
  146.             AbstractDelayTask task = removeTask(taskKey);
  147.             if (null == task) {
  148.                 continue;
  149.             }
  150.             //根据taskKey获取NacosTaskProcessor延迟任务处理器:DistroDelayTaskProcessor
  151.             NacosTaskProcessor processor = getProcessor(taskKey);
  152.             if (null == processor) {
  153.                 getEngineLog().error("processor not found for task, so discarded. " + task);
  154.                 continue;
  155.             }
  156.             try {
  157.                 //ReAdd task if process failed
  158.                 //调用DistroDelayTaskProcessor.process()方法,把task同步任务放入到第二层内存队列中
  159.                 if (!processor.process(task)) {
  160.                     //如果失败了,会重试添加task回tasks这个map中
  161.                     retryFailedTask(taskKey, task);
  162.                 }
  163.             } catch (Throwable e) {
  164.                 getEngineLog().error("Nacos task execute error : " + e.toString(), e);
  165.                 retryFailedTask(taskKey, task);
  166.             }
  167.         }
  168.     }
  169.    
  170.     @Override
  171.     public AbstractDelayTask removeTask(Object key) {
  172.         lock.lock();
  173.         try {
  174.             AbstractDelayTask task = tasks.get(key);
  175.             if (null != task && task.shouldProcess()) {
  176.                 return tasks.remove(key);
  177.             } else {
  178.                 return null;
  179.             }
  180.         } finally {
  181.             lock.unlock();
  182.         }
  183.     }
  184. }
复制代码
注意:DataStore数据最后还是存到内存的。通过使用DataStore,可以实现以下功能和好处:
 
一.数据持久化
DataStore可将节点数据持久化到磁盘或其他介质,以确保数据的持久性。这样即使系统重启或发生故障,节点数据也能够得到恢复和保留。毕竟Datum的key是ServiceName、value是Instance实例列表,而Instance实例中又会包含所属的ClusterName、IP和Port,所以根据DataStore可以恢复完整的内存注册表。
  1. //Distro delay task processor.
  2. public class DistroDelayTaskProcessor implements NacosTaskProcessor {
  3.     private final DistroTaskEngineHolder distroTaskEngineHolder;
  4.     private final DistroComponentHolder distroComponentHolder;
  5.    
  6.     public DistroDelayTaskProcessor(DistroTaskEngineHolder distroTaskEngineHolder, DistroComponentHolder distroComponentHolder) {
  7.         this.distroTaskEngineHolder = distroTaskEngineHolder;
  8.         this.distroComponentHolder = distroComponentHolder;
  9.     }
  10.    
  11.     @Override
  12.     public boolean process(NacosTask task) {
  13.         if (!(task instanceof DistroDelayTask)) {
  14.             return true;
  15.         }
  16.         //将NacosTask任务对象转换为DistroDelayTask任务对象
  17.         DistroDelayTask distroDelayTask = (DistroDelayTask) task;
  18.         DistroKey distroKey = distroDelayTask.getDistroKey();
  19.         if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) {
  20.             //包装成一个DistroSyncChangeTask对象
  21.             DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
  22.             //调用NacosExecuteTaskExecuteEngine.addTask()方法添加到队列中去
  23.             distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
  24.             return true;
  25.         }
  26.         return false;
  27.     }
  28. }
  29. //Nacos execute task execute engine.
  30. public class NacosExecuteTaskExecuteEngine extends AbstractNacosTaskExecuteEngine {
  31.     private final TaskExecuteWorker[] executeWorkers;
  32.    
  33.     public NacosExecuteTaskExecuteEngine(String name, Logger logger, int dispatchWorkerCount) {
  34.         super(logger);
  35.         //TaskExecuteWorker在初始化时会启动一个线程处理其队列中的任务
  36.         executeWorkers = new TaskExecuteWorker[dispatchWorkerCount];
  37.         for (int mod = 0; mod < dispatchWorkerCount; ++mod) {
  38.             executeWorkers[mod] = new TaskExecuteWorker(name, mod, dispatchWorkerCount, getEngineLog());
  39.         }
  40.     }
  41.     ...
  42.     @Override
  43.     public void addTask(Object tag, AbstractExecuteTask task) {
  44.         //根据tag获取到TaskExecuteWorker
  45.         NacosTaskProcessor processor = getProcessor(tag);
  46.         if (null != processor) {
  47.             processor.process(task);
  48.             return;
  49.         }
  50.         TaskExecuteWorker worker = getWorker(tag);
  51.         //调用TaskExecuteWorker.process()方法把DistroSyncChangeTask任务放入到队列当中去
  52.         worker.process(task);
  53.     }
  54.    
  55.     private TaskExecuteWorker getWorker(Object tag) {
  56.         int idx = (tag.hashCode() & Integer.MAX_VALUE) % workersCount();
  57.         return executeWorkers[idx];
  58.     }
  59.     ...
  60. }
  61. //Nacos execute task execute worker.
  62. public final class TaskExecuteWorker implements NacosTaskProcessor, Closeable {
  63.     //任务存储容器
  64.     private final BlockingQueue<Runnable> queue;
  65.    
  66.     public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {
  67.         this.name = name + "_" + mod + "%" + total;
  68.         this.queue = new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY);
  69.         this.closed = new AtomicBoolean(false);
  70.         this.log = null == logger ? LoggerFactory.getLogger(TaskExecuteWorker.class) : logger;
  71.         new InnerWorker(name).start();
  72.     }
  73.     ...
  74.     @Override
  75.     public boolean process(NacosTask task) {
  76.         if (task instanceof AbstractExecuteTask) {
  77.             //把DistroSyncChangeTask任务放入到队列中
  78.             putTask((Runnable) task);
  79.         }
  80.         return true;
  81.     }
  82.    
  83.     private void putTask(Runnable task) {
  84.         try {
  85.             //把DistroSyncChangeTask任务放入到队列中
  86.             queue.put(task);
  87.         } catch (InterruptedException ire) {
  88.             log.error(ire.toString(), ire);
  89.         }
  90.     }
  91.     ...
  92.     //Inner execute worker.
  93.     private class InnerWorker extends Thread {
  94.         InnerWorker(String name) {
  95.             setDaemon(false);
  96.             setName(name);
  97.         }
  98.    
  99.         @Override
  100.         public void run() {
  101.             while (!closed.get()) {
  102.                 try {
  103.                     //一直取队列中的任务,这里的task任务类型是:DistroSyncChangeTask
  104.                     Runnable task = queue.take();
  105.                     long begin = System.currentTimeMillis();
  106.                     //调用DistroSyncChangeTask中的run方法
  107.                     task.run();
  108.                     long duration = System.currentTimeMillis() - begin;
  109.                     if (duration > 1000L) {
  110.                         log.warn("distro task {} takes {}ms", task, duration);
  111.                     }
  112.                 } catch (Throwable e) {
  113.                     log.error("[DISTRO-FAILED] " + e.toString(), e);
  114.                 }
  115.             }
  116.         }
  117.     }
  118. }
复制代码
二.数据同步
DataStore可以协调和同步节点数据的访问和更新。当多个节点同时注册或更新数据时,DataStore可确保数据的一致性和正确性,避免数据冲突和不一致的情况。
 
三.数据管理
DataStore提供了对节点数据的管理功能,包括增加、更新、删除等操作。通过使用适当的数据结构和算法,可以高效地管理大量的节点数据,并支持快速的数据访问和查询。
 
四.数据访问控制
DataStore可以实现对节点数据的访问控制和权限管理,只有具有相应权限的节点或用户才能访问和修改特定的节点数据,提高数据的安全性和保密性。
 
DataStore在Nacos中充当了节点数据的中央存储和管理器。通过提供持久化 + 同步 + 管理 + 访问控制等功能,确保节点数据的可靠性 + 一致性 + 安全性,是实现节点数据存储和操作的核心组件之一。
 
(3)总结
Nacos集群架构下新增一个集群节点时,新节点会如何进行服务数据同步:
 
首先利用了DistroProtocol类的Bean对象的构造方法开启异步任务,通过HTTP方式去请求其他集群节点的全部数据。
 
当新节点获取全部数据后,会调用Service的onChange()方法,然后利用写时复制机制更新本地内存注册表。
 
Nacos集群节点在处理获取全部服务实例数据的请求时,并不是从内存注册表中获取的,而是通过DataStore来获取。
3.png
 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册