零、本文纲要
一、源码准备
- 二、了解服务注册-客户端
1、Nacos的服务注册表结构
2、查看Nacos的服务注册源码
3、跟踪Nacos的服务注册流程
4、客户端注册的流程图
- 三、了解服务注册-服务端
1、确定模块
2、跟踪Nacos接收处理服务注册源码
3、服务端注册的流程图四、Nacos服务注册部分总结
tips:Ctrl + F快速定位所需内容阅读吧。
一、源码准备
1、下载Nacos源码
官方下载连接:Release 1.4.2 (Apr 29th, 2021) · alibaba/nacos · GitHub。
下载Nacos源码.png
2、解压导入源码
导入IDEA,此处省略步骤。
3、proto编译
Nacos底层的数据通信会基于protobuf对数据做序列化和反序列化,需要先将proto文件编译为对应的Java代码。
proto编译.png
下载protoc:Releases · protocolbuffers/protobuf · GitHub。
image.png
复制目录.png
配置系统变量.png
配置环境变量.png
4、编译proto
① 进入目标目录nacos-1.4.2\consistency\src\main
进入目标目录.png
编译consistency.proto到java目录,如下:
protoc --java_out=./java ./proto/consistency.proto
编译Data.proto到java目录,如下:
protoc --java_out=./java ./proto/Data.proto
5、启动Nacos配置
Nacos控制台启动类.png
启动Nacos配置.png
6、测试
访问控制台.png
二、了解服务注册-客户端
1、Nacos的服务注册表结构
① 环境隔离:namespace② 服务分组:group③ 服务集群:service cluster④ 服务实例:service instance
image.png
2、查看Nacos的服务注册源码
服务注册与Nacos的依赖有关,所以查看spring-cloud-starter-alibaba-nacos-discovery依赖,如下:
<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency>
② 在依赖中查看自动装配文件spring.factories
查看自动装配文件.png
定位Nacos服务注册自动配置类.png
④ 查看NacosServiceRegistryAutoConfiguration类源码
image.png
3、跟踪Nacos的服务注册流程
由上述内容我们可以知道,Nacos服务自动注册是从NacosServiceRegistryAutoConfiguration类开始的,自动注册涉及到NacosAutoServiceRegistration类。
① NacosAutoServiceRegistration类
NacosServiceRegistryAutoConfiguration类最后返回了new出来的NacosAutoServiceRegistration类对象,所以我们继续跟踪该类的构造方法,如下:
NacosAutoServiceRegistration类.png
该类构造方法中初始化其父类,所以我们继续跟踪父类。
② AbstractAutoServiceRegistration类
该类实现了ApplicationListener接口,监听Spring容器启动过程中的WebServerInitializedEvent事件,如下:
AbstractAutoServiceRegistration类.png
在监听到WebServerInitializedEvent(web服务初始化完成)的事件后,执行了bind 方法,如下:
监听到WebServerInitializedEvent事件.png
AbstractAutoServiceRegistration#bind(row91-102)方法,如下:
@Deprecatedpublic void bind(WebServerInitializedEvent event) { // 获取 ApplicationContext 对象 ApplicationContext context = event.getApplicationContext(); // 判断服务的 Namespace,一般为 null if (context instanceof ConfigurableWebServerApplicationContext) { if ("management".equals(((ConfigurableWebServerApplicationContext) context) .getServerNamespace())) { return; } } // 记录当前 web 服务的端口 this.port.compareAndSet(0, event.getWebServer().getPort()); // 启动当前服务注册流程 this.start();}
AbstractAutoServiceRegistration#start(row125-147)方法,如下:
public void start() { if (!isEnabled()) { if (logger.isDebugEnabled()) { logger.debug("Discovery Lifecycle disabled. Not starting"); } return; } // only initialize if nonSecurePort is greater than 0 and it isn't already running // because of containerPortInitializer below // 当前服务处于未运行状态时,才进行初始化 if (!this.running.get()) { // 发布服务开始注册的事件 this.context.publishEvent( new InstancePreRegisteredEvent(this, getRegistration())); // 【关键】开始服务注册 register(); if (shouldRegisterManagement()) { registerManagement(); } // 发布注册完成事件 this.context.publishEvent( new InstanceRegisteredEvent<>(this, getConfiguration())); // 服务状态设置为运行状态,基于AtomicBoolean#compareAndSet(row98-102) this.running.compareAndSet(false, true); }}
AbstractAutoServiceRegistration#register(row238-240)方法,如下:
ServiceRegistry的register方法.png
NacosServiceRegistry实现类.png
NacosServiceRegistry#register(row59-89)方法,如下:
@Overridepublic void register(Registration registration) { // 判断 ServiceId 是否为空,spring.applicaion.name 不能为空 if (StringUtils.isEmpty(registration.getServiceId())) { log.warn("No service to register for nacos client..."); return; } // 获取 Nacos 的命名服务,就是注册中心服务 NamingService namingService = namingService(); // 获取 serviceId、group String serviceId = registration.getServiceId(); String group = nacosDiscoveryProperties.getGroup(); // 封装服务实例,包含:Ip、Port、Weight、ClusterName、Ephemeral等 Instance instance = getNacosInstanceFromRegistration(registration); try { // 开始注册服务 namingService.registerInstance(serviceId, group, instance); log.info("nacos registry, {} {} {}:{} register finished", group, serviceId, instance.getIp(), instance.getPort()); } catch (Exception e) { if (nacosDiscoveryProperties.isFailFast()) { log.error("nacos registry, {} register failed...{},", serviceId, registration.toString(), e); rethrowRuntimeException(e); } else { log.warn("Failfast is false. {} register failed...{},", serviceId, registration.toString(), e); } }}
可以看到方法中最终是调用NamingService的registerInstance方法实现注册的,如下:
NamingService的registerInstance方法.png
NamingService的实现类NacosNamingService,如下:
image.png
NacosNamingService#registerInstance(row204-213)方法,如下:
@Overridepublic void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { // 检查超时参数是否异常,心跳超时时间(默认15秒)必须大于心跳周期(默认5秒) NamingUtils.checkInstanceIsLegal(instance); // 拼接得到新的服务名,格式:groupName@@serviceName String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName); // 判断是否为临时实例,默认为 true if (instance.isEphemeral()) { // 是临时实例,需要定时向 Nacos 服务发送心跳 BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance); beatReactor.addBeatInfo(groupedServiceName, beatInfo); } // 【关键】发送注册服务实例的请求 serverProxy.registerService(groupedServiceName, groupName, instance);}
可以看到registerService最后是由NamingProxy的实现的,如下:
registerService.png
补充:com.alibaba.nacos.api.common.Constants(row167-171),如下:
// 心跳超时时间,15spublic static final long DEFAULT_HEART_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);// IP删除超时时间,30spublic static final long DEFAULT_IP_DELETE_TIMEOUT = TimeUnit.SECONDS.toMillis(30);// 心跳周期,5spublic static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);
NamingProxy#registerService(row220-248)方法,如下:
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException { NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName, instance); // 组织请求参数 final Map<String, String> params = new HashMap<String, String>(16); params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, serviceName); params.put(CommonParams.GROUP_NAME, groupName); params.put(CommonParams.CLUSTER_NAME, instance.getClusterName()); params.put("ip", instance.getIp()); params.put("port", String.valueOf(instance.getPort())); params.put("weight", String.valueOf(instance.getWeight())); params.put("enable", String.valueOf(instance.isEnabled())); params.put("healthy", String.valueOf(instance.isHealthy())); params.put("ephemeral", String.valueOf(instance.isEphemeral())); params.put("metadata", JacksonUtils.toJson(instance.getMetadata())); // 通过 POST 请求,将上述参数发送到:/nacos/v1/ns/instance reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST); }
这里提交的信息就是Nacos服务注册接口需要的完整参数,核心参数有:
Ⅰ NAMESPACE_ID:环境;
Ⅱ SERVICE_NAME:服务名称;
Ⅲ GROUP_NAME:分组名称;
Ⅳ CLUSTER_NAME:集群名称;
Ⅴ ip:当前实例的IP地址;
Ⅵ port:当前实例的端口。
补充:com.alibaba.nacos.client.naming.utils.UtilAndComs(row30-34),如下:
public static String webContext = "/nacos";public static String nacosUrlBase = webContext + "/v1/ns";public static String nacosUrlInstance = nacosUrlBase + "/instance";
4、客户端注册的流程图
客户端注册的流程图.png
三、了解服务注册-服务端
经过以上了解,我们知道最后客户端注册服务实例是通过 POST 请求,将注册参数发送到:/nacos/v1/ns/instance。因此,我们从对应接收此请求的Controller开始。
1、确定模块
我们启动Nacos服务会使用Nacos-concle模块的启动类,该模块中引用的nacos-naming模块就是我们服务注册相关的模块。
nacos-naming模块.png
可以看到InstanceController类的请求路由即是我们POST请求的路由的部分,如下:
image.png
因此,我们从InstanceController开始研究接收请求处理服务注册的源码。
2、跟踪Nacos接收处理服务注册源码
InstanceController#register方法,如下:
@CanDistro@PostMapping@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)public String register(HttpServletRequest request) throws Exception { // 从 request 获取 namespaceId,没有则为默认 public final String namespaceId = WebUtils .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); // 获取服务名称 serviceName = "group@@serviceName" final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); NamingUtils.checkServiceNameFormat(serviceName); // 把 request 中的参数封装为 Instance 对象 final Instance instance = parseInstance(request); // 【关键】注册实例 serviceManager.registerInstance(namespaceId, serviceName, instance); return "ok";}
此处会进入ServiceManager的注册方法,如下:
注册实例.png
Ⅰ ServiceManager#serviceMap属性:Map(namespace, Map(group::serviceName, Service)),里面注册着各个服务实例,如下:
Nacos服务注册表.png
Ⅱ ServiceManager#registerInstance方法,如下:
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException { // 如果是第一次,则创建空的服务,放入注册表 createEmptyService(namespaceId, serviceName, instance.isEphemeral()); // 从注册表中拿到 service Service service = getService(namespaceId, serviceName); if (service == null) { throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName); } // 【关键】添加实例到 service 当中 addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);}
Ⅲ ServiceManager#addInstance方法,如下:
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException { // 给当前服务生成一个唯一标识,可以理解为 serviceId // 临时:com.alibaba.nacos.naming.iplist.ephemeral. + namespaceId + ## + serviceName // 永久:com.alibaba.nacos.naming.iplist. + namespaceId + ## + serviceName String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral); // 从注册表中拿到 service Service service = getService(namespaceId, serviceName); // 以 service 为锁对象,同一个服务的多个实例,只能串行来完成注册(不能并发修改) synchronized (service) { // 【重点】拷贝注册表中 旧的实例列表,在此结合新注册的实例,得到最终的实例列表 COPY ON WRITE List<Instance> instanceList = addIpAddresses(service, ephemeral, ips); // 封装实例列表到 Instances 对象中 Instances instances = new Instances(); instances.setInstanceList(instanceList); // 更新注册表(更新本地注册表、数据同步给 Nacos 集群中的其他节点) consistencyService.put(key, instances); }}
该方法中对修改服务列表的动作加锁处理,确保线程安全。而在同步代码块中,包含下面几步:
1)先获取要更新的实例列表,addIpAddresses(service, ephemeral, ips);2)然后将更新后的数据封装到Instances对象中,后面更新注册表时使用3)最后,调用consistencyService.put()方法完成Nacos集群的数据同步,保证集群一致性。
注意:在第1步的addIPAddress中,会拷贝旧的实例列表,添加新实例到列表中。在第3步中,完成对实例状态更新后,则会用新列表直接覆盖旧实例列表。而在更新过程中,旧实例列表不受影响,用户依然可以读取。
COPY ON WRITE:在更新列表状态过程中,无需阻塞用户的读操作,也不会导致用户读取到脏数据,性能比较好。
【A、更新服务列表List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);】
整个过程:
a、获取旧的实例列表,对比新的与旧的;
b、添加新的实例,旧的实例同步id;
c、返回最新的实例列表。
具体源码如下:
ServiceManager#addIpAddresses方法,如下:
private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException { return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);}
ServiceManager#updateIpAddresses方法,如下:
public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException { // 从 DataStore 中获取实例列表,可以理解为 Nacos 集群同步来的实例列表 Datum datum = consistencyService .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral)); // 从本地注册表中,获取实例列表 List<Instance> currentIPs = service.allIPs(ephemeral); Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size()); Set<String> currentInstanceIds = Sets.newHashSet(); // 封装本地注册表中实例列表 for (Instance instance : currentIPs) { currentInstances.put(instance.toIpAddr(), instance); currentInstanceIds.add(instance.getInstanceId()); } // 合并与拷贝,旧实例列表 Map<String, Instance> instanceMap; if (datum != null && null != datum.value) { // 如果集群同步列表中有数据,则将本地注册列表和 datum 中的列表做合并 instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances); } else { instanceMap = new HashMap<>(ips.length); } // 遍历新实例列表 for (Instance instance : ips) { if (!service.getClusterMap().containsKey(instance.getClusterName())) { Cluster cluster = new Cluster(instance.getClusterName(), service); cluster.init(); service.getClusterMap().put(instance.getClusterName(), cluster); Loggers.SRV_LOG .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson()); } if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) { instanceMap.remove(instance.getDatumKey()); } else { // 尝试获取与当前实例ip、端口一致的旧实例 Instance oldInstance = instanceMap.get(instance.getDatumKey()); if (oldInstance != null) { // 如果存在,则把旧的 instanceId 赋值作为新的 instanceId instance.setInstanceId(oldInstance.getInstanceId()); } else { // 如果不存在,证明是一个全新实例,则生成id instance.setInstanceId(instance.generateInstanceId(currentInstanceIds)); } instanceMap.put(instance.getDatumKey(), instance); } } if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) { throw new IllegalArgumentException( "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils .toJson(instanceMap.values())); } // 返回实例列表 return new ArrayList<>(instanceMap.values());}
【B、Nacos集群一致性consistencyService.put(key, instances);】
Nacos集群一致性.png
ServiceManager#consistencyService属性,如下:
consistencyService属性.png
可以看到,此处的put方法正是采用DelegateConsistencyServiceImpl的put方法。
DelegateConsistencyServiceImpl#put方法,如下:
@Overridepublic void put(String key, Record value) throws NacosException { // 根据实例是否是临时实例,判断委托对象 mapConsistencyService(key).put(key, value);}
DelegateConsistencyServiceImpl#mapConsistencyService方法,如下:
private ConsistencyService mapConsistencyService(String key) { // 判断是否是临时实例: // 是,选择 ephemeralConsistencyService,也就是 DistroConsistencyServiceImpl类 // 否,选择 persistentConsistencyService,也就是 PersistentConsistencyServiceDelegateImpl return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;}
默认情况下,所有实例都是临时实例,下面则关注DistroConsistencyServiceImpl类。
③ DistroConsistencyServiceImpl类
DistroConsistencyServiceImpl#put方法,如下:
@Overridepublic void put(String key, Record value) throws NacosException { // 异步,更新本地注册表 onPut(key, value); // 异步,将数据同步给 Nacos 集群中的其他节点 distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2);}
Ⅰ onPut(key, value);
key:ServiceManager#addInstance方法中的String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);,临时:com.alibaba.nacos.naming.iplist.ephemeral. + namespaceId + ## + serviceName;
value:ServiceManager#addInstance方法中的instances.setInstanceList(instanceList);封装了实例列表的Instances对象。
Ⅱ distroProtocol.sync(...)
是通过Distro协议将数据同步给集群中的其它Nacos节点。
【A、更新本地实例列表】
DistroConsistencyServiceImpl#onPut方法,如下:
public void onPut(String key, Record value) { // 判断是否是临时实例 if (KeyBuilder.matchEphemeralInstanceListKey(key)) { // 把实例列表封装到 Datum Datum<Instances> datum = new Datum<>(); // value 是服务中的实例列表 Instances datum.value = (Instances) value; // key 是 serviceId datum.key = key; datum.timestamp.incrementAndGet(); // 以 serviceId 为 key,Datum 为 value 缓存起来 dataStore.put(key, datum); } // if (!listeners.containsKey(key)) { return; } // 【重点】把 serviceId 和当前操作类型存入 notifier notifier.addTask(key, DataOperation.CHANGE);}
此处我们可以看到更新本地列表的操作最后交由notifier对象完成,notifier对象是DistroConsistencyServiceImpl的内部类实例,如下:
Notifier内部类.png
a、将变更事件放入阻塞队列
该对象内部维护了一个阻塞队列,存放服务列表变更的事件,DistroConsistencyServiceImpl#Notifier#tasks属性,如下:
阻塞队列属性.png
DistroConsistencyServiceImpl#Notifier#addTask方法,如下:
public void addTask(String datumKey, DataOperation action) { if (services.containsKey(datumKey) && action == DataOperation.CHANGE) { return; } if (action == DataOperation.CHANGE) { services.put(datumKey, StringUtils.EMPTY); } // 把 serviceId 和事件放入阻塞队列 tasks.offer(Pair.with(datumKey, action));}
b、异步更新
DistroConsistencyServiceImpl#init方法,如下:
// 一个bean的初始化过程中,方法执行先后顺序为 Constructor > @Autowired > @PostConstruct@PostConstruct // 在依赖加载后,对象使用前执行,而且只执行一次public void init() { // 利用线程池执行 notifier // public class Notifier implements Runnable{...} GlobalExecutor.submitDistroNotifyTask(notifier);}
单线程线程池.png
可以看到Notifier是通过一个单线程的线程池,来不断从阻塞队列中获取任务,执行服务列表的更新。
DistroConsistencyServiceImpl#Notifier#run方法,如下:
@Overridepublic void run() { Loggers.DISTRO.info("distro notifier started"); // 死循环 for (; ; ) { try { // 从阻塞队列中获取任务 Pair<String, DataOperation> pair = tasks.take(); // 执行任务,更新服务列表 handle(pair); } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e); } }}
DistroConsistencyServiceImpl#Notifier#handle方法,如下:
private void handle(Pair<String, DataOperation> pair) { try { // 获取 serviceId String datumKey = pair.getValue0(); // 事件类型,是 CHANGE 类型 DataOperation action = pair.getValue1(); services.remove(datumKey); int count = 0; if (!listeners.containsKey(datumKey)) { return; } for (RecordListener listener : listeners.get(datumKey)) { count++; try { if (action == DataOperation.CHANGE) { // 【重点】这里的 listener 就是 service,当服务变更时,自然就触发了 onChange 事件,处理变更 listener.onChange(datumKey, dataStore.get(datumKey).value); continue; } if (action == DataOperation.DELETE) { listener.onDelete(datumKey); continue; } } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e); } } if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}", datumKey, count, action.name()); } } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e); }}
c、覆盖实例列表listener.onChange(datumKey, dataStore.get(datumKey).value);
Service#onChange方法,如下:
@Overridepublic void onChange(String key, Instances value) throws Exception { Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value); // 对权重做初始化 for (Instance instance : value.getInstanceList()) { if (instance == null) { // Reject this abnormal instance list: throw new RuntimeException("got null instance " + key); } if (instance.getWeight() > 10000.0D) { instance.setWeight(10000.0D); } if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) { instance.setWeight(0.01D); } } // 更新实例列表 updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key)); recalculateChecksum();}
Service#updateIPs方法,如下:
public void updateIPs(Collection<Instance> instances, boolean ephemeral) { // 创建新的 map,相当于一个新的 clusterMap Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size()); for (String clusterName : clusterMap.keySet()) { ipMap.put(clusterName, new ArrayList<>()); } // 把所有实例放入新的 clusterMap for (Instance instance : instances) { try { if (instance == null) { Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null"); continue; } if (StringUtils.isEmpty(instance.getClusterName())) { instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME); } if (!clusterMap.containsKey(instance.getClusterName())) { Loggers.SRV_LOG .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson()); Cluster cluster = new Cluster(instance.getClusterName(), this); cluster.init(); getClusterMap().put(instance.getClusterName(), cluster); } List<Instance> clusterIPs = ipMap.get(instance.getClusterName()); if (clusterIPs == null) { clusterIPs = new LinkedList<>(); ipMap.put(instance.getClusterName(), clusterIPs); } clusterIPs.add(instance); } catch (Exception e) { Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e); } } for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) { //make every ip mine // 遍历新的 clusterMap,得到 cluster 中的实例列表 List<Instance> entryIPs = entry.getValue(); // 【重点】把新实例列表,更新到注册表中的 cluster 中 clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral); } setLastModifiedMillis(System.currentTimeMillis()); getPushService().serviceChanged(this); StringBuilder stringBuilder = new StringBuilder(); for (Instance instance : allIPs()) { stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(","); } Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(), stringBuilder.toString());}
Cluster#updateIps方法,如下:
public void updateIps(List<Instance> ips, boolean ephemeral) { // 先得到旧的实例列表 Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances; HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size()); for (Instance ip : toUpdateInstances) { oldIpMap.put(ip.getDatumKey(), ip); } // ips 中包含两部分:新增的实例,要更新的实例 // 新旧实例列表交集,得到要更新的部分 List<Instance> updatedIPs = updatedIps(ips, oldIpMap.values()); if (updatedIPs.size() > 0) { for (Instance ip : updatedIPs) { Instance oldIP = oldIpMap.get(ip.getDatumKey()); // do not update the ip validation status of updated ips // because the checker has the most precise result // Only when ip is not marked, don't we update the health status of IP: if (!ip.isMarked()) { // 将实例的 health 保持为 oldInstance 的 health ip.setHealthy(oldIP.isHealthy()); } if (ip.isHealthy() != oldIP.isHealthy()) { // ip validation status updated Loggers.EVT_LOG.info("{} {SYNC} IP-{} {}:{}@{}", getService().getName(), (ip.isHealthy() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName()); } if (ip.getWeight() != oldIP.getWeight()) { // ip validation status updated Loggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}->{}", getService().getName(), oldIP.toString(), ip.toString()); } } } // 新旧实例列表相减,得到待新增的实例列表 List<Instance> newIPs = subtract(ips, oldIpMap.values()); if (newIPs.size() > 0) { Loggers.EVT_LOG .info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(), getName(), newIPs.size(), newIPs.toString()); for (Instance ip : newIPs) { HealthCheckStatus.reset(ip); } } // 旧新实例列表相减,得到待删除的实例列表(即旧实例列表有,而新实例列表没有,需删除) List<Instance> deadIPs = subtract(oldIpMap.values(), ips); if (deadIPs.size() > 0) { Loggers.EVT_LOG .info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(), getName(), deadIPs.size(), deadIPs.toString()); for (Instance ip : deadIPs) { HealthCheckStatus.remv(ip); } } toUpdateInstances = new HashSet<>(ips); // 用新实例列表直接覆盖了 cluster 中的旧实例列表 if (ephemeral) { ephemeralInstances = toUpdateInstances; } else { persistentInstances = toUpdateInstances; }}
【B、集群数据同步】
DistroConsistencyServiceImpl#sync方法,如下:
public void sync(DistroKey distroKey, DataOperation action, long delay) { // 遍历,获取 Nacos 集群中的所有成员,除了自己 for (Member each : memberManager.allMembersWithoutSelf()) { DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(), each.getAddress()); // 定义一个Distro的同步任务 DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay); // 交给线程池去执行 distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask); if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress()); } }}
上述代码中同步的任务封装为一个DistroDelayTask对象,交给了distroTaskEngineHolder.getDelayTaskExecuteEngine()执行,其返回值为NacosDelayTaskExecuteEngine,这个类维护了一个线程池,并且接收任务,执行任务。
getDelayTaskExecuteEngine.png
DistroDelayTaskExecuteEngine.png
NacosDelayTaskExecuteEngine.png
NacosDelayTaskExecuteEngine#processTasks方法,如下:
protected void processTasks() { Collection<Object> keys = getAllTaskKeys(); for (Object taskKey : keys) { AbstractDelayTask task = removeTask(taskKey); if (null == task) { continue; } NacosTaskProcessor processor = getProcessor(taskKey); if (null == processor) { getEngineLog().error("processor not found for task, so discarded. " + task); continue; } try { // ReAdd task if process failed // 尝试执行同步任务,如果失败会重试 if (!processor.process(task)) { retryFailedTask(taskKey, task); } } catch (Throwable e) { getEngineLog().error("Nacos task execute error : " + e.toString(), e); retryFailedTask(taskKey, task); } }}
可以看出来基于Distro模式的同步是异步进行的,并且失败时会将任务重新入队并充实,因此不保证同步结果的强一致性,属于AP模式的一致性策略。
3、服务端注册的流程图
服务端注册的流程图.png
四、Nacos服务注册部分总结
1、Nacos的注册表结构
Nacos是多级存储模型,最外层通过namespace来实现环境隔离,然后是group分组,分组下就是服务,一个服务有可以分为不同的集群,集群中包含多个实例。
因此其注册表结构为一个Map,类型是:
① Map<String, Map<String, Service>>:外层key是namespace_id,内层key是group+serviceName;② Service内部维护一个Map,结构是:Map<String,Cluster>,key是clusterName,值是集群信息;③ Cluster内部维护一个Set集合Set<Instance> ephemeralInstances和Set<Instance> persistentInstances,元素是Instance类型,代表集群中的多个实例。
2、Nacos保证并发写的安全性
① 在注册实例时,会对service加锁,不同service之间本身就不存在并发写问题,互不影响;相同service时通过锁来互斥。② 在更新实例列表时,是基于异步的线程池来完成,而线程池的线程数量为1。
3、Nacos避免并发读写的冲突
Nacos在更新实例列表时,会采用CopyOnWrite技术,首先将Old实例列表拷贝一份,然后更新拷贝的实例列表,再用更新后的实例列表来覆盖旧的实例列表。
4、Nacos应对内部数十万服务的并发写请求
Nacos内部会将服务注册的任务放入阻塞队列,采用线程池异步来完成实例更新,从而提高并发写能力。
五、结尾
以上即为Nacos源码分析-服务注册的全部内容,感谢阅读。 |