I. Starting the nacos-example project from the Nacos source

1. Add JVM startup arguments, or hard-code the nacos-server IP and port directly in the main method:

Properties properties = new Properties();
properties.setProperty("serverAddr", "127.0.0.1:8848");
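properties.setProperty("namespace", "public");

For reference, the convenience registerInstance overload in NacosNamingService wraps its arguments into an Instance and delegates to the three-argument version: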

 

    @Override
    public void registerInstance(String serviceName, String groupName, String ip, int port, String clusterName)
            throws NacosException {

        Instance instance = new Instance();
        instance.setIp(ip);
        instance.setPort(port);
        instance.setWeight(1.0);
        instance.setClusterName(clusterName);
        // arguments: service name, group name (DEFAULT_GROUP by default), instance object
        registerInstance(serviceName, groupName, instance);
    }

 

2. Start the nacos-example project from the Nacos source.

[Nacos] Nacos Source Code Analysis (2): Server-side and Client-side Instance Registration

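II. Client-side instance registration: source code analysis

0. Use the following line as the entry point and trace the source from there: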

naming.registerInstance("nacos.test.3", "11.11.11.11", 8888);

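1. First, the client information (IP, port, weight, cluster name, and so on) is wrapped into an Instance object, and registerInstance is called with it.

2. Next, the client checks whether the instance is ephemeral. If it is, heartbeat-related information such as the heartbeat interval is initialized.

Note:

Since version 1.0.0, Nacos has an ephemeral field at the instance level. It indicates whether a registered instance is ephemeral or persistent.

Ephemeral instances are not persisted on the Nacos server. They are kept alive by reporting heartbeats. If no heartbeat arrives for a period of time, the server removes the instance. If the instance starts sending heartbeats again after removal, it is registered again.

Persistent instances are persisted by the Nacos server. Even if the client process that registered the instance is gone, the instance is not deleted from the server. Its health status is only set to unhealthy.

A single service can hold both ephemeral and persistent instances at the same time. So when all of the service's instance processes are gone, the ephemeral instances are removed from the service and the persistent ones remain.

The ephemeral flag also selects the health-check mode. A value of true corresponds to the client mode, where the client reports heartbeats. A value of false corresponds to the server mode, where the server probes the instance.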
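The sketch below is a minimal, hypothetical model of the difference described above. It is not Nacos code, and the names InstanceRecord, ServiceRegistryModel and onHealthLost are invented for illustration. When an instance is considered lost, an ephemeral instance is removed from its service. A persistent instance stays in the service and is only marked unhealthy.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a registered instance (not the Nacos Instance class).
final class InstanceRecord {
    final String ip;
    final int port;
    final boolean ephemeral;
    boolean healthy = true;

    InstanceRecord(String ip, int port, boolean ephemeral) {
        this.ip = ip;
        this.port = port;
        this.ephemeral = ephemeral;
    }
}

// Hypothetical model of one service holding both ephemeral and persistent instances.
final class ServiceRegistryModel {
    final List<InstanceRecord> instances = new ArrayList<>();

    void register(InstanceRecord instance) {
        instances.add(instance);
    }

    // Called when an instance is considered lost (heartbeats stopped or a server probe failed).
    void onHealthLost(InstanceRecord instance) {
        if (instance.ephemeral) {
            // Ephemeral: never persisted, so it is simply removed from the service.
            instances.remove(instance);
        } else {
            // Persistent: stays registered, only its health flag changes.
            instance.healthy = false;
        }
    }
}
```

Suppose both kinds are registered and onHealthLost is called on each. Only the persistent instance is left in the list, and it is flagged unhealthy. This matches the "some instances are removed, the rest remain" behaviour described above.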

    @Override
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        // is the instance ephemeral? (true by default)
        if (instance.isEphemeral()) {
            // build the heartbeat info and schedule the heartbeat task
            BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
            beatReactor.addBeatInfo(groupedServiceName, beatInfo);
        }
        serverProxy.registerService(groupedServiceName, groupName, instance);
    }

Building the heartbeat info:

    /**
     * Build new beat information.
     *
     * @param groupedServiceName service name with group name, format: ${groupName}@@${serviceName}
     * @param instance instance
     * @return new beat information
     */
    public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) {
        BeatInfo beatInfo = new BeatInfo();
        // service name
        beatInfo.setServiceName(groupedServiceName);
        // IP
        beatInfo.setIp(instance.getIp());
        // port
        beatInfo.setPort(instance.getPort());
        // cluster name
        beatInfo.setCluster(instance.getClusterName());
        // weight
        beatInfo.setWeight(instance.getWeight());
        // metadata
        beatInfo.setMetadata(instance.getMetadata());
        // scheduled flag (false initially)
        beatInfo.setScheduled(false);
        // heartbeat interval, 5 s by default
        beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
        return beatInfo;
    }

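Adding the heartbeat info: addBeatInfo submits the instance's heartbeat task to a scheduled thread pool. Here is the source of that task, BeatTask: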

    class BeatTask implements Runnable {

        BeatInfo beatInfo;

        public BeatTask(BeatInfo beatInfo) {
            this.beatInfo = beatInfo;
        }

        @Override
        public void run() {
            if (beatInfo.isStopped()) {
                return;
            }
            long nextTime = beatInfo.getPeriod();
            try {
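                // HTTP request to the Nacos server's /instance/beat endpoint
                // arg 1: the heartbeat info; arg 2: whether to send a lightweight beat (false for the first beat)
                // note: the client sends two kinds of beats: a full beat that carries the instance info (reported on
                // first registration) and a lightweight beat whose only purpose is to keep the instance alive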
                JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
                long interval = result.get("clientBeatInterval").asInt();
                boolean lightBeatEnabled = false;
                if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
                    lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
                }
                // after the first (full) beat the server returns lightBeatEnabled=true, so later beats are lightweight
                BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
                if (interval > 0) {
                    nextTime = interval;
                }
                int code = NamingResponseCode.OK;
                if (result.has(CommonParams.CODE)) {
                    code = result.get(CommonParams.CODE).asInt();
                }
                // the server no longer knows this instance: rebuild it from the beat info and register it again
                if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                    Instance instance = new Instance();
                    instance.setPort(beatInfo.getPort());
                    instance.setIp(beatInfo.getIp());
                    instance.setWeight(beatInfo.getWeight());
                    instance.setMetadata(beatInfo.getMetadata());
                    instance.setClusterName(beatInfo.getCluster());
                    instance.setServiceName(beatInfo.getServiceName());
                    instance.setInstanceId(instance.getInstanceId());
                    instance.setEphemeral(true);
                    try {
                        serverProxy.registerService(beatInfo.getServiceName(),
                                NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                    } catch (Exception ignore) {
                    }
                }
            } catch (NacosException ex) {
                NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                        JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());

            }
            // schedule the next heartbeat task
            executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
        }
    }
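BeatTask does not use a fixed-rate schedule. At the end of each run it calls executorService.schedule again, and that lets the server change the interval through clientBeatInterval on every response. The sketch below shows this self-rescheduling pattern with a plain ScheduledExecutorService. It is a minimal sketch under stated assumptions. The HeartbeatSender interface and its return value are hypothetical stand-ins for serverProxy.sendBeat and clientBeatInterval, not Nacos APIs. The sketch also reuses the same Runnable instead of creating a new BeatTask each time.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sender: performs one heartbeat and returns the next interval (ms)
// requested by the server, or a value <= 0 to keep the default period.
interface HeartbeatSender {
    long send() throws Exception;
}

// Self-rescheduling heartbeat following the BeatTask pattern: each run schedules the next one.
final class SelfSchedulingBeat implements Runnable {
    private final ScheduledExecutorService executor;
    private final HeartbeatSender sender;
    private final long defaultPeriodMillis;
    final AtomicBoolean stopped = new AtomicBoolean(false);

    SelfSchedulingBeat(ScheduledExecutorService executor, HeartbeatSender sender, long defaultPeriodMillis) {
        this.executor = executor;
        this.sender = sender;
        this.defaultPeriodMillis = defaultPeriodMillis;
    }

    void start() {
        executor.schedule(this, 0, TimeUnit.MILLISECONDS);
    }

    @Override
    public void run() {
        if (stopped.get()) {
            return; // like beatInfo.isStopped(): do not reschedule
        }
        long next = defaultPeriodMillis;
        try {
            long interval = sender.send();
            if (interval > 0) {
                next = interval; // a server-provided interval overrides the default
            }
        } catch (Exception e) {
            // like BeatTask: log the failure and keep beating with the current period
            System.err.println("heartbeat failed: " + e.getMessage());
        }
        try {
            executor.schedule(this, next, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            // the executor has been shut down: stop beating
        }
    }
}
```

A caller would create one executor, for example Executors.newSingleThreadScheduledExecutor(), and call start(). Setting stopped to true ends the chain after the current run.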

 

Now back to the main line and the last step of client-side registration: serverProxy.registerService(groupedServiceName, groupName, instance);

    /**
     * register a instance to service with specified instance properties.
     *
     * @param serviceName name of service
     * @param groupName   group of service
     * @param instance    instance to register
     * @throws NacosException nacos exception
     */
    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

        NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
                instance);

        final Map<String, String> params = new HashMap<String, String>(16);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put(CommonParams.GROUP_NAME, groupName);
        params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
        params.put("ip", instance.getIp());
        params.put("port", String.valueOf(instance.getPort()));
        params.put("weight", String.valueOf(instance.getWeight()));
        params.put("enable", String.valueOf(instance.isEnabled()));
        params.put("healthy", String.valueOf(instance.isHealthy()));
        params.put("ephemeral", String.valueOf(instance.isEphemeral()));
        params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
        // POST to the server's v1/ns/instance endpoint to register the instance
        reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);

    }
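On the wire, registration is just an HTTP POST carrying the parameters above. The sketch below builds an equivalent request with java.net.http (Java 11+), without sending it, to show the shape of the request. It is not a reproduction of NamingProxy.reqApi. It assumes a server at 127.0.0.1:8848 with the default /nacos context path, and the parameter values mirror the nacos.test.3 example.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

final class RegisterRequestSketch {

    // Encode parameters as application/x-www-form-urlencoded, keeping insertion order.
    static String formEncode(Map<String, String> params) {
        return params.entrySet().stream()
                .map(e -> URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8) + "="
                        + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));
    }

    // Build (but do not send) a POST equivalent to registerService(...) above.
    // Assumption: server at 127.0.0.1:8848 with the default /nacos context path.
    static HttpRequest buildRegisterRequest(Map<String, String> params) {
        return HttpRequest.newBuilder(URI.create("http://127.0.0.1:8848/nacos/v1/ns/instance"))
                .header("Content-Type", "application/x-www-form-urlencoded")
                .POST(HttpRequest.BodyPublishers.ofString(formEncode(params)))
                .build();
    }

    // Example values; serviceName is the grouped name, as passed by the client.
    static Map<String, String> exampleParams() {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("namespaceId", "public");
        params.put("serviceName", "DEFAULT_GROUP@@nacos.test.3");
        params.put("groupName", "DEFAULT_GROUP");
        params.put("clusterName", "DEFAULT");
        params.put("ip", "11.11.11.11");
        params.put("port", "8888");
        params.put("weight", "1.0");
        params.put("ephemeral", "true");
        return params;
    }
}
```

Note that the "@@" separator in the grouped service name is percent-encoded as %40%40 in the form body.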

 

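III. Server-side instance registration: source code analysis

1. The entry point for server-side registration is the v1/ns/instance endpoint.

Its source is in the nacos-naming module (InstanceController):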


/**
 * Register new instance.
 *
 * @param request http request
 * @return 'ok' if success
 * @throws Exception any error during register
 */
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
    
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    final String namespaceId = WebUtils
            .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    
    final Instance instance = parseInstance(request);
    
    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
}

2. Registering the instance on the server: serviceManager.registerInstance(namespaceId, serviceName, instance)

    /**
     * Register an instance to a service in AP mode.
     *
     * <p>This method creates service or cluster silently if they don't exist.
     *
     * @param namespaceId id of namespace
     * @param serviceName service name
     * @param instance    instance to register
     * @throws Exception any error occurred in the process
     */
    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
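        // create an empty service if none exists: on an instance's first registration the service,
        // including its heartbeat-check task and other parameters, has to be initialized
        // note: the server manages instances per service; service : instance = 1 : N
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
        // look up the service in the cached map
        Service service = getService(namespaceId, serviceName);
        // the service does not exist: throw
        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM,
                    "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }
        // add the instance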
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

3. Next, the method that creates the empty service: createEmptyService()

Tracing into it leads to createServiceIfAbsent:

    /**
     * Create service if not exist.
     *
     * @param namespaceId namespace
     * @param serviceName service name
     * @param local       whether create service by local
     * @param cluster     cluster
     * @throws NacosException nacos exception
     */
    public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
            throws NacosException {
        // look up the service in the cached map
        Service service = getService(namespaceId, serviceName);
        // not found (usually on first registration): initialize the service
        if (service == null) {

            Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
            service = new Service();
            // service name
            service.setName(serviceName);
            // namespace: namespaceId
            service.setNamespaceId(namespaceId);
            // group name, e.g. DEFAULT_GROUP
            service.setGroupName(NamingUtils.getGroupName(serviceName));
            // now validate the service. if failed, exception will be thrown
            // time the service was last modified
            service.setLastModifiedMillis(System.currentTimeMillis());
            // recompute the service's MD5 checksum
            service.recalculateChecksum();
            if (cluster != null) {
                cluster.setService(service);
                service.getClusterMap().put(cluster.getName(), cluster);
            }
            service.validate();
            // add the service and initialize it, in two methods: putService and init
            putServiceAndInit(service);
            if (!local) {
                addOrReplaceService(service);
            }
        }
    }

4. Next, the putServiceAndInit method:

    private void putServiceAndInit(Service service) throws NacosException {
        // add the service to the cache map
        putService(service);
        // initialize the service
        service.init();
        consistencyService
                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
        consistencyService
                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
        Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
    }

Adding the service: putService(service);

    /**
     * Put service into manager.
     *
     * @param service service
     */
    public void putService(Service service) {
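        // serviceMap is a two-level map that caches service information:
        //private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
        //Map(namespace, Map(group::serviceName, Service)).
        // outer map: key = namespace ID, value = inner map
        // inner map: key = group name + service name, value = the corresponding Service
        
        // first check whether the cache map has an entry for the namespace ID; if not, create it (double-checked locking)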
        if (!serviceMap.containsKey(service.getNamespaceId())) {
            synchronized (putServiceLock) {
                if (!serviceMap.containsKey(service.getNamespaceId())) {
                    serviceMap.put(service.getNamespaceId(), new ConcurrentHashMap<>(16));
                }
            }
        }
        // then fetch the namespace's inner map and put the service into it
        serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
    }
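The namespace → (grouped service name → Service) layout can be sketched with plain JDK types. This version replaces the double-checked locking in putService with ConcurrentHashMap.computeIfAbsent, which creates each namespace's inner map atomically. ServiceStub is a hypothetical stand-in for Nacos' Service class.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for Nacos' Service class.
final class ServiceStub {
    final String namespaceId;
    final String name; // grouped name, e.g. "DEFAULT_GROUP@@nacos.test.3"

    ServiceStub(String namespaceId, String name) {
        this.namespaceId = namespaceId;
        this.name = name;
    }
}

final class ServiceMapSketch {
    // namespaceId -> (grouped service name -> service)
    private final Map<String, Map<String, ServiceStub>> serviceMap = new ConcurrentHashMap<>();

    void putService(ServiceStub service) {
        // computeIfAbsent creates the inner map at most once per namespace, atomically
        serviceMap.computeIfAbsent(service.namespaceId, ns -> new ConcurrentHashMap<>(16))
                .put(service.name, service);
    }

    ServiceStub getService(String namespaceId, String serviceName) {
        Map<String, ServiceStub> services = serviceMap.get(namespaceId);
        return services == null ? null : services.get(serviceName);
    }
}
```

Both variants give the same result. computeIfAbsent simply moves the "create once" guarantee into the map, so no explicit lock object is needed.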

Initializing the service: service.init();

    /**
     * Init service.
     */
    public void init() {
        // schedule the client heartbeat-check task
        HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
        for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
            entry.getValue().setService(this);
            entry.getValue().init();
        }
    }

Stepping into HealthCheckReactor.scheduleCheck(clientBeatCheckTask):

The health-check task also runs on a scheduled thread pool. It starts after an initial delay of 5 s and then runs again every 5 s.

    /**
     * Schedule client beat check task with a delay.
     *
     * @param task client beat check task
     */
    public static void scheduleCheck(ClientBeatCheckTask task) {
        futureMap.putIfAbsent(task.taskKey(), GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
    }

Next comes the task itself, ClientBeatCheckTask. Two parts of it matter here.

----- Marking instances unhealthy

            // get all ephemeral instances of the service
            List<Instance> instances = service.allIPs(true);

            // first set health status of instances:
            for (Instance instance : instances) {
                // now - time of the last reported beat > heartbeat timeout (15 s by default): the instance is unhealthy
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                    // skip marked instances (marked is false by default)
                    if (!instance.isMarked()) {
                        // only act on instances that are currently healthy (healthy is true by default)
                        if (instance.isHealthy()) {
                            // mark the instance unhealthy
                            instance.setHealthy(false);
                            Loggers.EVT_LOG
                                    .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                            instance.getIp(), instance.getPort(), instance.getClusterName(),
                                            service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                            instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                            // publish a service-change notification to subscribers (pushed over UDP underneath)
                            getPushService().serviceChanged(service);
                            ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                        }
                    }
                }
            }

----- Deleting instances that have timed out

            // delete instances that have gone offline
            for (Instance instance : instances) {

                if (instance.isMarked()) {
                    continue;
                }
                // now - time of the last beat > IP delete timeout (30 s by default)
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                    // delete instance
                    Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                            JacksonUtils.toJson(instance));
                    deleteIp(instance);
                }
            }
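Together, the two loops apply two thresholds to the time since an instance's last heartbeat. After the heartbeat timeout the instance is marked unhealthy. After the longer IP-delete timeout it is removed. With the Nacos 1.x defaults (beat every 5 s, timeout 15 s, delete after 30 s), the decision for one unmarked ephemeral instance can be sketched as a pure function. HealthDecision and BeatTimeouts.decide are hypothetical names, and real instances can override these defaults.

```java
// Hypothetical outcome of one health-check pass for a single unmarked ephemeral instance.
enum HealthDecision { HEALTHY, MARK_UNHEALTHY, DELETE }

final class BeatTimeouts {
    static final long HEART_BEAT_TIMEOUT_MS = 15_000; // Nacos 1.x default heartbeat timeout
    static final long IP_DELETE_TIMEOUT_MS = 30_000;  // Nacos 1.x default IP delete timeout

    // Returns the strongest action the two loops in ClientBeatCheckTask would take.
    static HealthDecision decide(long nowMs, long lastBeatMs) {
        long silence = nowMs - lastBeatMs;
        if (silence > IP_DELETE_TIMEOUT_MS) {
            return HealthDecision.DELETE;
        }
        if (silence > HEART_BEAT_TIMEOUT_MS) {
            return HealthDecision.MARK_UNHEALTHY;
        }
        return HealthDecision.HEALTHY;
    }
}
```

ClientBeatCheckTask itself runs only every 5 s, so these thresholds are checked with up to about 5 s of lag. With the defaults, a silent client is typically flagged within roughly 15 to 20 s of its last beat and removed within roughly 30 to 35 s.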

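5. Back to the main line: putServiceAndInit

        // data consistency is handled here; it is not covered in this article
        // Distro: AP model, eventual consistency (used for ephemeral instances)
        // Raft: CP model, strong consistency (used for persistent instances)
        // listen on the ephemeral (true) and persistent (false) instance-list keys of this service
        consistencyService
                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
        consistencyService
                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);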

6. Back to the main line, adding the instance: registerInstance() --> addInstance()

/**
     * Add instance to service.
     *
     * @param namespaceId namespace
     * @param serviceName service name
     * @param ephemeral   whether instance is ephemeral
     * @param ips         instances
     * @throws NacosException nacos exception
     */
    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
            throws NacosException {

        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

        Service service = getService(namespaceId, serviceName);

        synchronized (service) {
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

            Instances instances = new Instances();
            instances.setInstanceList(instanceList);
            // hand the list to the consistency service; the key decides ephemeral vs persistent, and ephemeral instances are kept consistent via the Distro protocol
            consistencyService.put(key, instances);
        }
    }

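7. Now consistencyService.put(key, instances). For ephemeral instances this goes to the Distro protocol, which gives AP guarantees.

    // ephemeral instances: data consistency via Distro
    @Override
    public void put(String key, Record value) throws NacosException {
        onPut(key, value);
        // sync the data under the ephemeral consistency protocol; the sync is an asynchronous task added to a queue,
        // i.e. put returns success to the client first and replicates afterwards: weak consistency, AP model
        taskDispatcher.addTask(key);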
    }

First, the onPut() method:

    public void onPut(String key, Record value) {
        // is this an ephemeral instance-list key? If so, store the datum locally
        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
            Datum<Instances> datum = new Datum<>();
            datum.value = (Instances) value;
            datum.key = key;
            datum.timestamp.incrementAndGet();
            dataStore.put(key, datum);
        }
        // is anyone listening on this key? The listeners were registered in putServiceAndInit():
        //consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
        //consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
        if (!listeners.containsKey(key)) {
            return;
        }

        // there are listeners: put a CHANGE task on the blocking queue; a while loop then publishes it to the listeners
        notifier.addTask(key, ApplyAction.CHANGE);
    }

Processing the events in the blocking queue in a loop:

In DistroConsistencyServiceImpl, init() submits the notifier to an executor:

    @PostConstruct
    public void init() {
        GlobalExecutor.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    load();
                } catch (Exception e) {
                    Loggers.DISTRO.error("load data failed.", e);
                }
            }
        });

        executor.submit(notifier);
        GlobalExecutor.submit(loadDataTask);
    }

The notifier's run() method then takes tasks off the queue in an endless loop:

        @Override
        public void run() {
            Loggers.DISTRO.info("distro notifier started");

            while (true) {
                try {

                    Pair pair = tasks.take();

                    if (pair == null) {
                        continue;
                    }

                    String datumKey = (String) pair.getValue0();
                    ApplyAction action = (ApplyAction) pair.getValue1();

                    services.remove(datumKey);

                    int count = 0;

                    if (!listeners.containsKey(datumKey)) {
                        continue;
                    }

                    for (RecordListener listener : listeners.get(datumKey)) {

                        count++;

                        try {
                            if (action == ApplyAction.CHANGE) {
                                listener.onChange(datumKey, dataStore.get(datumKey).value);
                                continue;
                            }

                            if (action == ApplyAction.DELETE) {
                                listener.onDelete(datumKey);
                                continue;
                            }
                        } catch (Throwable e) {
                            Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                        }
                    }

                    if (Loggers.DISTRO.isDebugEnabled()) {
                        Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                            datumKey, count, action.name());
                    }
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                }
            }
        }
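Without the Nacos types, onPut plus the notifier form a producer/consumer pair over a blocking queue. The write path stores the datum and enqueues the key. One background thread takes keys off the queue and calls every listener registered for that key. The sketch below shows this with JDK types only. DataListener, NotifierSketch and the string-valued store are hypothetical simplifications. The real Notifier also keeps a services map (see services.remove(datumKey) above), apparently so that duplicate pending tasks for the same key are not queued. That de-duplication and the DELETE action are left out here.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical listener interface (a stand-in for Nacos' RecordListener).
interface DataListener {
    void onChange(String key, String value);
}

final class NotifierSketch implements Runnable {
    private final Map<String, String> dataStore = new ConcurrentHashMap<>();
    private final Map<String, List<DataListener>> listeners = new ConcurrentHashMap<>();
    private final BlockingQueue<String> tasks = new ArrayBlockingQueue<>(1024);

    void listen(String key, DataListener listener) {
        listeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    // Write path (like onPut): store locally, enqueue a notification only if someone listens.
    void put(String key, String value) {
        dataStore.put(key, value);
        if (listeners.containsKey(key)) {
            tasks.offer(key); // returns false and drops the task if the queue is full
        }
    }

    // Consumer loop (like Notifier.run): take a key, then call every listener registered for it.
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            String key;
            try {
                key = tasks.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and leave the loop
                return;
            }
            for (DataListener listener : listeners.getOrDefault(key, List.of())) {
                try {
                    listener.onChange(key, dataStore.get(key));
                } catch (RuntimeException e) {
                    // one failing listener must not stop the loop
                    System.err.println("listener failed for " + key + ": " + e);
                }
            }
        }
    }
}
```

Because put returns as soon as the key is queued, the caller is unblocked before any listener runs. The same "answer first, propagate later" trade-off appears in the Distro put method above.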

 

Finally, the controller returns "ok":

return "ok";

 
