欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

12.nacos服务注册源码分析之nacos-server服务注册

时间:2023-07-10
nacos服务注册源码分析之nacos-server服务注册 主要内容

接上篇博文,本篇博文主要介绍nacos-server服务端的处理,包括一下内容:

nacos-server 如何存储服务信息nacos-server 接收 nacos-client 注册请求的处理,服务实例如何保存nacos-server 接收 nacos-client 心跳请求的处理 服务信息存储结构


存储结构核心代码

// ## ServiceManager public class ServiceManager implements RecordListener { private final Map> serviceMap = new ConcurrentHashMap<>();}// ## Service : 服务里面存储 clusterMap public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener {// clusterName -> Cluster private Map clusterMap = new HashMap<>();}// ## Cluster public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {// 存储持久节点 private Set persistentInstances = new HashSet<>(); // 存储临时节点 private Set ephemeralInstances = new HashSet<>();}

服务端处理服务注册 InstanceController.register 接收到的服务注册信息 服务注册

// serviceManager.registerInstance(namespaceId, serviceName, instance);// 注册一个AP模式的服务// 如果服务或者集群不存在,则创建public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException { // 创建Service: namespaceId=public; serviceName=DEFAULT_GROUP@@system createEmptyService(namespaceId, serviceName, instance.isEphemeral()); Service service = getService(namespaceId, serviceName); if (service == null) { throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName); } // 添加服务实例 addInstance(namespaceId, serviceName, instance.isEphemeral(), instance); }

创建 Service

// createEmptyService(namespaceId, serviceName, instance.isEphemeral());// namespaceId=public; serviceName=DEFAULT_GROUP@@system; cluster=nullpublic void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException { Service service = getService(namespaceId, serviceName); if (service == null) { // 第一次注册为空,则创建Service对象 Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName); service = new Service(); service.setName(serviceName); service.setNamespaceId(namespaceId); service.setGroupName(NamingUtils.getGroupName(serviceName)); // now validate the service、if failed, exception will be thrown service.setLastModifiedMillis(System.currentTimeMillis()); service.recalculateChecksum(); if (cluster != null) { cluster.setService(service); service.getClusterMap().put(cluster.getName(), cluster); } service.validate(); putServiceAndInit(service); if (!local) { addOrReplaceService(service); } }}// 保存并初始化Service// putServiceAndInit(service);private void putServiceAndInit(Service service) throws NacosException { // 将服务放到serviceMap中,见下 putService(service); // 服务初始化:启动客户端心跳检测任务 service.init(); // consistencyService 是 DelegateConsistencyServiceImpl // 分布式一致性服务,注册监听,一个持久的,一个临时的 // 持久监听key: com.alibaba.nacos.naming.iplist.ephemeral.public##DEFAULT_GROUP@@system consistencyService .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service); // 临时监听key: com.alibaba.nacos.naming.iplist.public##DEFAULT_GROUP@@system consistencyService .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service); Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());}public void putService(Service service) {// doubleCheck 方式,将Service放入到 serviceMap 中 if (!serviceMap.containsKey(service.getNamespaceId())) { synchronized (putServiceLock) { if (!serviceMap.containsKey(service.getNamespaceId())) { serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>()); } } } // serviceMap: namespaceId -> Map(serviceName -> service ) serviceMap.get(service.getNamespaceId()).put(service.getName(), service);}

添加服务实例

// 添加服务实例到Service中 public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance..、ips) throws NacosException { // key: com.alibaba.nacos.naming.iplist.ephemeral.public##DEFAULT_GROUP@@system String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral); Service service = getService(namespaceId, serviceName); synchronized (service) { // 构造实例列表 List instanceList = addIpAddresses(service, ephemeral, ips); Instances instances = new Instances(); instances.setInstanceList(instanceList); // 保存到 consistencyService 中 consistencyService.put(key, instances); } }

服务端接收客户端心跳上报 InstanceController.beat Controller 接收心跳请求

// 这里比较简单了@CanDistro@PutMapping("/beat")@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)public ObjectNode beat(HttpServletRequest request) throws Exception { ObjectNode result = JacksonUtils.createEmptyJsonNode(); // clientBeatInterval: 5000 result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval()); // beat = ""; String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY); RsInfo clientBeat = null; if (StringUtils.isNotBlank(beat)) { clientBeat = JacksonUtils.toObj(beat, RsInfo.class); } // DEFAULT String clusterName = WebUtils .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME); // 192.168.31.30 String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY); // 8081 int port = Integer.parseInt(WebUtils.optional(request, "port", "0")); // false if (clientBeat != null) { if (StringUtils.isNotBlank(clientBeat.getCluster())) { clusterName = clientBeat.getCluster(); } else { // fix #2533 clientBeat.setCluster(clusterName); } ip = clientBeat.getIp(); port = clientBeat.getPort(); } // public String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); // DEFAULT_GROUP@@system String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); NamingUtils.checkServiceNameFormat(serviceName); Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName); // 获取实例信息 Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port); // 如果实例信息为空,则重新注册到注册中心 if (instance == null) { if (clientBeat == null) { result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND); return result; } Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, " + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName); instance = new Instance(); instance.setPort(clientBeat.getPort()); instance.setIp(clientBeat.getIp()); instance.setWeight(clientBeat.getWeight()); instance.setmetadata(clientBeat.getmetadata()); instance.setClusterName(clusterName); instance.setServiceName(serviceName); instance.setInstanceId(instance.getInstanceId()); instance.setEphemeral(clientBeat.isEphemeral()); serviceManager.registerInstance(namespaceId, serviceName, instance); } Service service = serviceManager.getService(namespaceId, serviceName); if (service == null) { throw new NacosException(NacosException.SERVER_ERROR, "service not found: " + serviceName + "@" + namespaceId); } if (clientBeat == null) { clientBeat = new RsInfo(); clientBeat.setIp(ip); clientBeat.setPort(port); clientBeat.setCluster(clusterName); } // ### 正式处理心跳,见下 service.processClientBeat(clientBeat); // 处理返回信息 result.put(CommonParams.CODE, NamingResponseCode.OK); if (instance.containsmetadata(PreservedmetadataKeys.HEART_BEAT_INTERVAL)) { result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval()); } result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled()); return result;}

心跳处理 service.processClientBeat(clientBeat)

// processClientBeat 构造一个ClientBeatProcessor 来处理心跳请求public void processClientBeat(final RsInfo rsInfo) { // 客户端心跳处理器 ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor(); clientBeatProcessor.setService(this); clientBeatProcessor.setRsInfo(rsInfo); // 执行处理器 HealthCheckReactor.scheduleNow(clientBeatProcessor); }// ClientBeatProcessor.run@Overridepublic void run() { Service service = this.service; String ip = rsInfo.getIp(); // 192.168.31.30 String clusterName = rsInfo.getCluster(); // DEFAULT int port = rsInfo.getPort(); // 8081 // 获取服务下的集群信息 Cluster cluster = service.getClusterMap().get(clusterName); // 获取集群下的所有实例 List instances = cluster.allIPs(true); for (Instance instance : instances) { // 遍历实例 // 如果是上报心跳的实例 if (instance.getIp().equals(ip) && instance.getPort() == port) { // 更新最后心跳上送时间 instance.setLastBeat(System.currentTimeMillis()); if (!instance.isMarked()) { if (!instance.isHealthy()) { // 如果实例不健康了,则更新健康状况 instance.setHealthy(true); // 暴露服务信息变更事件 getPushService().serviceChanged(service); } } } }}

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。