1. 概述

  1. 客户端会发起一个Http 请求到 Config Service 的 notifications/v2 接口,也就是NotificationControllerV2  。
  2. NotificationControllerV2 不会立即返回结果,而是通过 Spring DeferredResult 把请求挂起。
  3. 如果在 60 秒内没有该客户端关心的配置发布,那么会返回 Http 状态码 304 给客户端。
  4. 如果有该客户端关心的配置发布,NotificationControllerV2 会调用 DeferredResult 的 setResult 方法,传入有配置变化的 namespace 信息,同时该请求会立即返回。客户端从返回的结果中获取到配置变化的 namespace 后,会立即请求 Config Service 获取该 namespace 的最新配置。

 

2. NotificationControllerV2

1.实现 ReleaseMessageListener 接口,通知 Controller ,仅提供 notifications/v2 接口。

2.1 类的继承结构

 

apollo源码(3)-ConfigService通知配置变化

2.2 客户端请求流程

 

apollo源码(3)-ConfigService通知配置变化

2.3 pollNotification方法

1.客户端请求配置变化入口

@RequestMapping(method = RequestMethod.GET)
 public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> pollNotification(
 @RequestParam(value = "appId") String appId,
 @RequestParam(value = "cluster") String cluster,
 @RequestParam(value = "notifications") String notificationsAsString,
 @RequestParam(value = "dataCenter", required = false) String dataCenter,
 @RequestParam(value = "ip", required = false) String clientIp) {
 // 解析 notificationsAsString 参数,创建 ApolloConfigNotification 数组。
 List<ApolloConfigNotification> notifications = null;
 try {
 notifications = gson.fromJson(notificationsAsString, notificationsTypeReference);
 } catch (Throwable ex) {
 Tracer.logError(ex);
 }
 if (CollectionUtils.isEmpty(notifications)) {
 throw new BadRequestException("Invalid format of notifications: " + notificationsAsString);
 }

 // 创建 DeferredResultWrapper 对象
 DeferredResultWrapper deferredResultWrapper = new DeferredResultWrapper();
 // Namespace 集合
 Set<String> namespaces = Sets.newHashSet();
 // 客户端的通知 Map 。key 为 Namespace 名,value 为通知编号。
 Map<String, Long> clientSideNotifications = Maps.newHashMap();
 // 过滤并创建 ApolloConfigNotification Map
 Map<String, ApolloConfigNotification> filteredNotifications = filterNotifications(appId, notifications);
 // 循环 ApolloConfigNotification Map ,初始化上述变量。
 for (Map.Entry<String, ApolloConfigNotification> notificationEntry : filteredNotifications.entrySet()) {
 String normalizedNamespace = notificationEntry.getKey();
 ApolloConfigNotification notification = notificationEntry.getValue();
 // 添加到 `namespaces` 中。
 namespaces.add(normalizedNamespace);
 // 添加到 `clientSideNotifications` 中。
 clientSideNotifications.put(normalizedNamespace, notification.getNotificationId());
 // 记录名字被归一化的 Namespace 。因为,最终返回给客户端,使用原始的 Namespace 名字,否则客户端无法识别。
 if (!Objects.equals(notification.getNamespaceName(), normalizedNamespace)) {
 deferredResultWrapper.recordNamespaceNameNormalizedResult(notification.getNamespaceName(), normalizedNamespace);
 }
 }
 if (CollectionUtils.isEmpty(namespaces)) {
 throw new BadRequestException("Invalid format of notifications: " + notificationsAsString);
 }

 // 组装 Watch Key Multimap
 Multimap<String, String> watchedKeysMap = watchKeysUtil.assembleAllWatchKeys(appId, cluster, namespaces, dataCenter);
 // 生成 Watch Key 集合
 Set<String> watchedKeys = Sets.newHashSet(watchedKeysMap.values());
 // 获得 Watch Key 集合中,每个 Watch Key 对应的 ReleaseMessage 记录。
 List<ReleaseMessage> latestReleaseMessages = releaseMessageService.findLatestReleaseMessagesGroupByMessages(watchedKeys);
......
 // 获得新的 ApolloConfigNotification 通知数组
 List<ApolloConfigNotification> newNotifications = getApolloConfigNotifications(namespaces, clientSideNotifications, watchedKeysMap, latestReleaseMessages);
 // 若有新的通知,直接设置结果。
 if (!CollectionUtils.isEmpty(newNotifications)) {
 deferredResultWrapper.setResult(newNotifications);
 // 若无新的通知,
 } else {
 // 注册超时事件
 deferredResultWrapper.onTimeout(() -> logWatchedKeys(watchedKeys, "Apollo.LongPoll.TimeOutKeys")); // 【TODO 6001】Tracer 日志
 // 注册结束事件
 deferredResultWrapper.onCompletion(() -> {
 // 移除 Watch Key + DeferredResultWrapper 出 `deferredResults`
 // unregister all keys
 for (String key : watchedKeys) {
 deferredResults.remove(key, deferredResultWrapper);
 }
 // 【TODO 6001】Tracer 日志
 logWatchedKeys(watchedKeys, "Apollo.LongPoll.CompletedKeys");
 });

 // 注册 Watch Key + DeferredResultWrapper 到 `deferredResults` 中,等待配置发生变化后通知。详见 `#handleMessage(...)` 方法。
 // register all keys
 for (String key : watchedKeys) {
 this.deferredResults.put(key, deferredResultWrapper);
 }
.....
 return deferredResultWrapper.getResult();
 }


1.解析 notificationsAsString 参数,创建 ApolloConfigNotification 数组

2.创建 DeferredResultWrapper 对象

3.创建 Namespace 的名字的集合

4.创建客户端的通知信息 Map 。其中,KEY 为 Namespace 的名字,VALUE 为通知编号

5.调用 #filterNotifications(appId, notifications) 方法,过滤并创建 ApolloConfigNotification Map

6.循环 ApolloConfigNotification Map ,初始化上述变量。1.添加到 namespaces 中。2.添加到 clientSideNotifications 中。.

7.调用 WatchKeysUtil#assembleAllWatchKeys(appId, cluster, namespaces, dataCenter)方法,组装 Watch Key Multimap 

8.生成 Watch Key 集合

9.调用 ReleaseMessageServiceWithCache#findLatestReleaseMessagesGroupByMessages(watchedKeys)方法,获得 Watch Key 集合中,每个 Watch Key 对应的最新的 ReleaseMessage 记录

10.调用 EntityManagerUtil#closeEntityManager() 方法,手动关闭 EntityManager

11.调用 getApolloConfigNotifications(namespaces, clientSideNotifications, watchedKeysMap, latestReleaseMessages)方法,获得新的 ApolloConfigNotification 通知数组

12.若有新的通知,调用 DeferredResultWrapper#setResult(List<ApolloConfigNotification>) 方法,直接设置 DeferredResult 的结果,从而结束长轮询

13.若无新的通知,注册到 deferredResults 中,等到有配置变更或超时。

 

2.4 getApolloConfigNotifications方法

1.getApolloConfigNotifications(namespaces, clientSideNotifications, watchedKeysMap, latestReleaseMessages)方法,获得新的 ApolloConfigNotification 通知数组。代码如下:

 private List<ApolloConfigNotification> getApolloConfigNotifications(Set<String> namespaces,
  Map<String, Long> clientSideNotifications,
  Multimap<String, String> watchedKeysMap,
 List<ReleaseMessage> latestReleaseMessages) {
 // 创建 ApolloConfigNotification 数组
  List<ApolloConfigNotification> newNotifications = Lists.newArrayList();
  if (!CollectionUtils.isEmpty(latestReleaseMessages)) {
  // 创建最新通知的 Map 。其中 Key 为 Watch Key 。
 Map<String, Long> latestNotifications = Maps.newHashMap();
 for (ReleaseMessage releaseMessage : latestReleaseMessages) {
 latestNotifications.put(releaseMessage.getMessage(), releaseMessage.getId());
 }
 // 循环 Namespace 的名字的集合,判断是否有配置更新
 for (String namespace : namespaces) {
 long clientSideId = clientSideNotifications.get(namespace);
 long latestId = ConfigConsts.NOTIFICATION_ID_PLACEHOLDER;
 // 获得 Namespace 对应的 Watch Key 集合
 Collection<String> namespaceWatchedKeys = watchedKeysMap.get(namespace);
 // 获得最大的通知编号
 for (String namespaceWatchedKey : namespaceWatchedKeys) {
 long namespaceNotificationId = latestNotifications.getOrDefault(namespaceWatchedKey, ConfigConsts.NOTIFICATION_ID_PLACEHOLDER);
 if (namespaceNotificationId > latestId) {
 latestId = namespaceNotificationId;
 }
 }
 // 若服务器的通知编号大于客户端的通知编号,意味着有配置更新
 if (latestId > clientSideId) {
 // 创建 ApolloConfigNotification 对象
 ApolloConfigNotification notification = new ApolloConfigNotification(namespace, latestId);
 // 循环添加通知编号到 ApolloConfigNotification 中。
 namespaceWatchedKeys.stream().filter(latestNotifications::containsKey).forEach(namespaceWatchedKey ->
 notification.addMessage(namespaceWatchedKey, latestNotifications.get(namespaceWatchedKey)));
 // 添加 ApolloConfigNotification 对象到结果
 newNotifications.add(notification);
 }
}
 }
 return newNotifications;
 }

1.创建新的 ApolloConfigNotification 数组

2.创建最新通知的 Map 。其中,KEY 为 Watch Key 

3.循环 Namespace 的名字的集合,根据 latestNotifications 判断是否有配置更新

4.获得 Namespace 对应的 Watch Key 集合。获得最大的通知编号。若服务器的通知编号大于客户端的通知编号,意味着有配置更新。

5.返回 newNotifications

2.5 handleMessage

1.handleMessage(ReleaseMessage, channel) 方法,当有新的 ReleaseMessage 时,通知其对应的 Namespace 的,并且正在等待的请求

 1: @Override
 2: public void handleMessage(ReleaseMessage message, String channel) {
 3:     logger.info("message received - channel: {}, message: {}", channel, message);
 4:     // 【TODO 6001】Tracer 日志
 5:     String content = message.getMessage();
 6:     Tracer.logEvent("Apollo.LongPoll.Messages", content);
 7:
 8:     // 仅处理 APOLLO_RELEASE_TOPIC
 9:     if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(content)) {
10:         return;
11:     }
12:
13:     // 获得对应的 Namespace 的名字
14:     String changedNamespace = retrieveNamespaceFromReleaseMessage.apply(content);
15:     if (Strings.isNullOrEmpty(changedNamespace)) {
16:         logger.error("message format invalid - {}", content);
17:         return;
18:     }
19:
20:     // `deferredResults` 存在对应的 Watch Key
21:     if (!deferredResults.containsKey(content)) {
22:         return;
23:     }
24:
25:     // create a new list to avoid ConcurrentModificationException
26:     // 创建 DeferredResultWrapper 数组,避免并发问题。
27:     List<DeferredResultWrapper> results = Lists.newArrayList(deferredResults.get(content));
28:
29:     // 创建 ApolloConfigNotification 对象
30:     ApolloConfigNotification configNotification = new ApolloConfigNotification(changedNamespace, message.getId());
31:     configNotification.addMessage(content, message.getId());
32:
33:     // do async notification if too many clients
34:     // 若需要通知的客户端过多,使用 ExecutorService 异步通知,避免“惊群效应”
35:     if (results.size() > bizConfig.releaseMessageNotificationBatch()) {
36:         largeNotificationBatchExecutorService.submit(() -> {
37:             logger.debug("Async notify {} clients for key {} with batch {}", results.size(), content,
38:                     bizConfig.releaseMessageNotificationBatch());
39:             for (int i = 0; i < results.size(); i++) {
40:                 // 每 N 个客户端,sleep 一段时间。
41:                 if (i > 0 && i % bizConfig.releaseMessageNotificationBatch() == 0) {
42:                     try {
43:                         TimeUnit.MILLISECONDS.sleep(bizConfig.releaseMessageNotificationBatchIntervalInMilli());
44:                     } catch (InterruptedException e) {
45:                         //ignore
46:                     }
47:                 }
48:                 logger.debug("Async notify {}", results.get(i));
49:                 // 设置结果
50:                 results.get(i).setResult(configNotification);
51:             }
52:         });
53:         return;
54:     }
55:
56:     logger.debug("Notify {} clients for key {}", results.size(), content);
57:     // 设置结果
58:     for (DeferredResultWrapper result : results) {
59:         result.setResult(configNotification);
60:     }
61:     logger.debug("Notification completed");
62: }

 

1.第 8 至 11 行:仅处理 APOLLO_RELEASE_TOPIC

2. 第 13 至 18 行:获得对应的 Namespace 的名字。

3. 第 20 至 23 行:deferredResults 存在对应的 Watch Key

4. 从 deferredResults 中读取并创建 DeferredResultWrapper 数组,避免并发问题

5.创建 ApolloConfigNotification 对象,并调用 ApolloConfigNotification#addMessage(String key, long notificationId)方法,添加通知消息明细

6.第 57 至 60 行:循环调用 DeferredResultWrapper#setResult(List<ApolloConfigNotification>) 方法,设置 DeferredResult 的结果,从而结束长轮询

3. ApolloConfigNotification类

1.Apollo 配置通知 DTO 。代码如下:

public class ApolloConfigNotification {
 
 /**
 * Namespace 名字
 */
 private String namespaceName;
 /**
 * 最新通知编号
 *
 * 目前使用 `ReleaseMessage.id` 。
 */
 private long notificationId;
 /**
 * 通知消息集合
 */
 private volatile ApolloNotificationMessages messages;
 
 public ApolloConfigNotification(String namespaceName, long notificationId) {
 this.namespaceName = namespaceName;
 this.notificationId = notificationId;
 }
}

1.namespaceName 字段,Namespace 名,指向对应的 Namespace 。因此,一个 Namespace 对应一个 ApolloConfigNotification 对象

2.notificationId 字段,最新通知编号,目前使用 ReleaseMessage.id 字段

3.messages字段,通知消息集合

3.1 ApolloNotificationMessages类

1.Apollo 配置通知消息集合 DTO 

public class ApolloNotificationMessages {
    /**
     * 明细 Map
     *
     * KEY :{appId} "+" {clusterName} "+" {namespace} ,例如:100004458+default+application
     * VALUE :通知编号
     */
    private Map<String, Long> details;

    public ApolloNotificationMessages() {
        this(Maps.<String, Long>newHashMap());
    }
}

 

1.当namespaceName 对应的 Namespace 是关联类型时,会同时查询当前 Namespace + 关联的 Namespace 这两个 Namespace,所以会是多个,使用 Map 数据结构

4. DeferredResultWrapper类

1.DeferredResult 包装器,封装 DeferredResult 的公用方法

4.1构造方法

/**
 * 默认超时时间
 */
private static final long TIMEOUT = 60 * 1000; //60 seconds
/**
 * 未修改时的 ResponseEntity 响应,使用 302 状态码。
 */
private static final ResponseEntity<List<ApolloConfigNotification>> NOT_MODIFIED_RESPONSE_LIST = new ResponseEntity<>(HttpStatus.NOT_MODIFIED);

/**
 * 归一化和原始的 Namespace 的名字的 Map
 */
private Map<String, String> normalizedNamespaceNameToOriginalNamespaceName;
/**
 * 响应的 DeferredResult 对象
 */
private DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> result;

public DeferredResultWrapper() {
    result = new DeferredResult<>(TIMEOUT, NOT_MODIFIED_RESPONSE_LIST);
}

1.TIMEOUT 静态属性,默认超时时间

2.NOT_MODIFIED_RESPONSE_LIST 静态属性,未修改时的 ResponseEntity 响应,使用 302 状态码

3.normalizedNamespaceNameToOriginalNamespaceName 属性,归一化( normalized )和原始( original )的 Namespace 的名字的 Map 。因为客户端在填写 Namespace 时,写错了名字的大小写。在 Config Service 中,会进行归一化“修复”,方便逻辑的统一编写。但是,最终返回给客户端需要“还原”回原始( original )的 Namespace 的名字,避免客户端无法识别

 

4.2 setResult方法

public void setResult(ApolloConfigNotification notification) {
    setResult(Lists.newArrayList(notification));
}

public void setResult(List<ApolloConfigNotification> notifications) {
    // 恢复被归一化的 Namespace 的名字为原始的 Namespace 的名字
    if (normalizedNamespaceNameToOriginalNamespaceName != null) {
        notifications.stream().filter(notification -> normalizedNamespaceNameToOriginalNamespaceName.containsKey
                (notification.getNamespaceName())).forEach(notification -> notification.setNamespaceName(
                normalizedNamespaceNameToOriginalNamespaceName.get(notification.getNamespaceName())));
    }
    // 设置结果,并使用 200 状态码。
    result.setResult(new ResponseEntity<>(notifications, HttpStatus.OK));
}

1.将配置变化的情况通知给客户端

 

至此客户端配置通知变化的分析结束

 

相关文章:

  • 2021-08-28
  • 2021-07-10
  • 2021-05-22
  • 2022-12-23
  • 2022-12-23
  • 2021-12-02
  • 2022-12-23
  • 2022-12-23
猜你喜欢
  • 2021-07-05
  • 2022-02-23
  • 2021-04-03
  • 2021-09-25
  • 2022-02-01
  • 2021-05-05
  • 2022-12-23
相关资源
相似解决方案