博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Apollo 源码解析 —— Config Service 配置读取接口
阅读量:7118 次
发布时间:2019-06-28

本文共 25193 字,大约阅读时间需要 83 分钟。

hot3.png

1. 概述

本文接  一文,分享 Config Service 配置读取的接口的实现。在上文,我们看到通知变化接口返回通知相关的信息,而不包括配置相关的信息。所以 Config Service 需要提供配置读取的接口

? 为什么不在通知变化的同时,返回最新的配置信息呢?(哈哈哈)

OK,让我们开始看看具体的代码实现。

2. ConfigController

com.ctrip.framework.apollo.configservice.controller.ConfigController ,配置 Controller ,提供 configs/{appId}/{clusterName}/{namespace:.+} 接口,提供配置读取的功能。

2.1 构造方法

private static final Splitter X_FORWARDED_FOR_SPLITTER = Splitter.on(",").omitEmptyStrings().trimResults();private static final Type configurationTypeReference = new TypeToken
>() {}.getType();@Autowiredprivate ConfigService configService;@Autowiredprivate AppNamespaceServiceWithCache appNamespaceService;@Autowiredprivate NamespaceUtil namespaceUtil;@Autowiredprivate InstanceConfigAuditUtil instanceConfigAuditUtil;@Autowiredprivate Gson gson

2.2 queryConfig

1: @RequestMapping(value = "/{appId}/{clusterName}/{namespace:.+}", method = RequestMethod.GET) 2: public ApolloConfig queryConfig(@PathVariable String appId, @PathVariable String clusterName, 3:                                 @PathVariable String namespace, 4:                                 @RequestParam(value = "dataCenter", required = false) String dataCenter, 5:                                 @RequestParam(value = "releaseKey", defaultValue = "-1") String clientSideReleaseKey, 6:                                 @RequestParam(value = "ip", required = false) String clientIp, 7:                                 @RequestParam(value = "messages", required = false) String messagesAsString, 8:                                 HttpServletRequest request, HttpServletResponse response) throws IOException { 9:     String originalNamespace = namespace;10:     // 若 Namespace 名以 .properties 结尾,移除该结尾,并设置到 ApolloConfigNotification 中。例如 application.properties => application 。11:     // strip out .properties suffix12:     namespace = namespaceUtil.filterNamespaceName(namespace);13:     // 获得归一化的 Namespace 名字。因为,客户端 Namespace 会填写错大小写。14:     //fix the character case issue, such as FX.apollo <-> fx.apollo15:     namespace = namespaceUtil.normalizeNamespace(appId, namespace);16: 17:     // 若 clientIp 未提交,从 Request 中获取。18:     if (Strings.isNullOrEmpty(clientIp)) {19:         clientIp = tryToGetClientIp(request);20:     }21: 22:     // 解析 messagesAsString 参数,创建 ApolloNotificationMessages 对象。23:     ApolloNotificationMessages clientMessages = transformMessages(messagesAsString);24: 25:     // 创建 Release 数组26:     List
releases = Lists.newLinkedList();27: // 获得 Namespace 对应的 Release 对象28: String appClusterNameLoaded = clusterName;29: if (!ConfigConsts.NO_APPID_PLACEHOLDER.equalsIgnoreCase(appId)) {30: // 获得 Release 对象31: Release currentAppRelease = configService.loadConfig(appId, clientIp, appId, clusterName, namespace, dataCenter, clientMessages);32: if (currentAppRelease != null) {33: // 添加到 Release 数组中。34: releases.add(currentAppRelease);35: // 获得 Release 对应的 Cluster 名字36: // we have cluster search process, so the cluster name might be overridden37: appClusterNameLoaded = currentAppRelease.getClusterName();38: }39: }40: // 若 Namespace 为关联类型,则获取关联的 Namespace 的 Release 对象41: // if namespace does not belong to this appId, should check if there is a public configuration42: if (!namespaceBelongsToAppId(appId, namespace)) {43: // 获得 Release 对象44: Release publicRelease = this.findPublicConfig(appId, clientIp, clusterName, namespace, dataCenter, clientMessages);45: // 添加到 Release 数组中46: if (!Objects.isNull(publicRelease)) {47: releases.add(publicRelease);48: }49: }50: // 若获得不到 Release ,返回状态码为 404 的响应51: if (releases.isEmpty()) {52: response.sendError(HttpServletResponse.SC_NOT_FOUND, String.format("Could not load configurations with appId: %s, clusterName: %s, namespace: %s",53: appId, clusterName, originalNamespace));54: Tracer.logEvent("Apollo.Config.NotFound", assembleKey(appId, clusterName, originalNamespace, dataCenter));55: return null;56: }57: 58: // 记录 InstanceConfig59: auditReleases(appId, clusterName, dataCenter, clientIp, releases);60: 61: // 计算 Config Service 的合并 ReleaseKey62: String mergedReleaseKey = releases.stream().map(Release::getReleaseKey).collect(Collectors.joining(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR));63: // 对比 Client 的合并 Release Key 。若相等,说明没有改变,返回状态码为 302 的响应64: if (mergedReleaseKey.equals(clientSideReleaseKey)) {65: // Client side configuration is the same with server side, return 30466: response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);67: Tracer.logEvent("Apollo.Config.NotModified", assembleKey(appId, appClusterNameLoaded, originalNamespace, dataCenter));68: return null;69: }70: 71: // 创建 ApolloConfig 对象72: ApolloConfig apolloConfig = new ApolloConfig(appId, appClusterNameLoaded, originalNamespace, mergedReleaseKey);73: // 合并 Release 的配置,并将结果设置到 ApolloConfig 中74: apolloConfig.setConfigurations(mergeReleaseConfigurations(releases));75: 76: // 【TODO 6001】Tracer 日志77: Tracer.logEvent("Apollo.Config.Found", assembleKey(appId, appClusterNameLoaded, originalNamespace, dataCenter));78: return apolloConfig;79: }
  • GET /configs/{appId}/{clusterName}/{namespace:.+} 接口指定 Namespace 的配置读取。在  中,有该接口的接口定义说明。
  • clientSideReleaseKey 请求参数,客户端侧的 Release Key ,用于和获得的 Release 的 releaseKey 对比,判断是否有配置更新。
  • clientIp 请求参数,客户端 IP ,用于灰度发布的功能。? 本文会跳过和灰度发布相关的内容,后续文章单独分享。
  • messagesAsString 请求参数,客户端当前请求的 Namespace 的通知消息明细,在【第 23 行】中,调用 #transformMessages(messagesAsString) 方法,解析 messagesAsString 参数,创建 ApolloNotificationMessages 对象。在  中,我们已经看到通知变更接口返回的就包括 ApolloNotificationMessages 对象。#transformMessages(messagesAsString) 方法,代码如下:

    ApolloNotificationMessages transformMessages(String messagesAsString) {    ApolloNotificationMessages notificationMessages = null;    if (!Strings.isNullOrEmpty(messagesAsString)) {        try {            notificationMessages = gson.fromJson(messagesAsString, ApolloNotificationMessages.class);        } catch (Throwable ex) {            Tracer.logError(ex);        }    }    return notificationMessages;}
  • 第 12 行:调用 NamespaceUtil#filterNamespaceName(namespaceName) 方法,若 Namespace 名以 ".properties" 结尾,移除该结尾。

  • 第 15 行:调用 NamespaceUtil#normalizeNamespace(appId, originalNamespace) 方法,获得归一化的 Namespace 名字。因为,客户端 Namespace 会填写错大小写。
  • 第 17 至 20 行:若客户端未提交 clientIp ,调用 #tryToGetClientIp(HttpServletRequest) 方法,获取 IP 。详细解析,见  方法。
  • ========== 分割线 ==========
  • 第 26 行:创建 Release 数组。
  • 第 27 至 39 行:获得 Namespace 对应的最新的 Release 对象。
    • 第 31 行:调用 ConfigService#loadConfig(appId, clientIp, appId, clusterName, namespace, dataCenter, clientMessages) 方法,获得 Release 对象。详细解析,见  方法。
    • 第 34 行:添加到 Release 书中。
    • 第 37 行:获得 Release 对应的 Cluster 名字。因为,在 ConfigService#loadConfig(appId, clientIp, appId, clusterName, namespace, dataCenter, clientMessages) 方法中,会根据 clusterName 和 dataCenter 分别查询 Release 直到找到一个,所以需要根据结果的 Release 获取真正的 Cluster 名
  • 第 40 至 49 行:若 Namespace 为关联类型,则获取关联的 Namespace 的最新的 Release 对象。

    • 第 42 行:调用 #namespaceBelongsToAppId(appId, namespace) 方法,判断 Namespace 非当前 App 下的,这是关联类型的前提。代码如下:

      private boolean namespaceBelongsToAppId(String appId, String namespaceName) {    // Namespace 非 'application' ,因为每个 App 都有    // Every app has an 'application' namespace    if (Objects.equals(ConfigConsts.NAMESPACE_APPLICATION, namespaceName)) {        return true;    }    // App 编号非空    // if no appId is present, then no other namespace belongs to it    if (ConfigConsts.NO_APPID_PLACEHOLDER.equalsIgnoreCase(appId)) {        return false;    }    // 非当前 App 下的 Namespace    AppNamespace appNamespace = appNamespaceService.findByAppIdAndNamespace(appId, namespaceName);    return appNamespace != null;}
      • x
    • 第 44 行:调用 #findPublicConfig(...) 方法,获得公用类型的 Namespace 的 Release 对象。代码如下:

      private Release findPublicConfig(String clientAppId, String clientIp, String clusterName,                                 String namespace, String dataCenter, ApolloNotificationMessages clientMessages) {    // 获得公用类型的 AppNamespace 对象    AppNamespace appNamespace = appNamespaceService.findPublicNamespaceByName(namespace);    // 判断非当前 App 下的,那么就是关联类型。    // check whether the namespace's appId equals to current one    if (Objects.isNull(appNamespace) || Objects.equals(clientAppId, appNamespace.getAppId())) {        return null;    }    String publicConfigAppId = appNamespace.getAppId();    // 获得 Namespace 最新的 Release 对象    return configService.loadConfig(clientAppId, clientIp, publicConfigAppId, clusterName, namespace, dataCenter, clientMessages);}
      • 在其内部,也是调用 ConfigService#loadConfig(appId, clientIp, appId, clusterName, namespace, dataCenter, clientMessages)方法,获得 Namespace 最新的 Release 对象。
    • 第 45 至 48 行:添加到 Release 数组中。
  • 第 50 至 56 行:若获得不到 Release ,返回状态码为 404 的响应。
  • ========== 分割线 ==========
  • 第 59 行:调用 #auditReleases(...) 方法,记录 InstanceConfig 。详细解析,见  。
  • ========== 分割线 ==========
  • 第 62 行:计算 Config Service 的合并 ReleaseKey 。当有多个 Release 时,使用 "+" 作为字符串的分隔
  • 第 64 至 69 行:对比 Client 的合并 Release Key 。若相等,说明配置没有改变,返回状态码为 302 的响应。
  • ========== 分割线 ==========
  • 第 72 行:创建 ApolloConfig 对象。详细解析,见  方法。
  • 第 74 行:调用 #mergeReleaseConfigurations(List<Release) 方法,合并多个 Release 的配置集合,并将结果设置到 ApolloConfig 中。详细解析,见  方法。
  • 第 77 行:【TODO 6001】Tracer 日志
  • 第 78 行:返回 ApolloConfig 对象。

2.3 tryToGetClientIp

#tryToGetClientIp(HttpServletRequest) 方法,从请求中获取 IP 。代码如下:

private String tryToGetClientIp(HttpServletRequest request) {    String forwardedFor = request.getHeader("X-FORWARDED-FOR");    if (!Strings.isNullOrEmpty(forwardedFor)) {        return X_FORWARDED_FOR_SPLITTER.splitToList(forwardedFor).get(0);    }    return request.getRemoteAddr();}
  • 关于 "X-FORWARDED-FOR" Header ,详细解析见  。

2.4 mergeReleaseConfigurations

#mergeReleaseConfigurations(List<Release) 方法,合并多个 Release 的配置集合。代码如下:

Map
mergeReleaseConfigurations(List
releases) { Map
result = Maps.newHashMap(); // 反转 Release 数组,循环添加到 Map 中。 for (Release release : Lists.reverse(releases)) { result.putAll(gson.fromJson(release.getConfigurations(), configurationTypeReference)); } return result;}
  • 为什么要反转数组?因为关联类型的 Release 添加到 Release 数组中。但是,App 下 的 Release 的优先级更高,所以进行反转。

3. ConfigService

com.ctrip.framework.apollo.configservice.service.config.ConfigService ,实现 ReleaseMessageListener 接口,配置 Service 接口。代码如下:

public interface ConfigService extends ReleaseMessageListener {        Release loadConfig(String clientAppId, String clientIp, String configAppId, String            configClusterName, String configNamespace, String dataCenter, ApolloNotificationMessages clientMessages);}

子类如下图所示:类图

最终有两个子类,差异点在于是否使用缓存,通过 ServerConfig "config-service.cache.enabled" 配置,默认关闭。开启后能提高性能,但是会增大内存消耗!

在 ConfigServiceAutoConfiguration 中,初始化使用的 ConfigService 实现类,代码如下:

@Autowiredprivate BizConfig bizConfig;@Beanpublic ConfigService configService() {    // 开启缓存,使用 ConfigServiceWithCache    if (bizConfig.isConfigServiceCacheEnabled()) {        return new ConfigServiceWithCache();    }    // 不开启缓存,使用 DefaultConfigService    return new DefaultConfigService();}

3.1 AbstractConfigService

com.ctrip.framework.apollo.configservice.service.config.AbstractConfigService ,实现 ConfigService 接口,配置 Service 抽象类,实现公用的获取配置的逻辑,并暴露抽象方法,让子类实现。抽象方法如下:

protected abstract Release findActiveOne(long id, ApolloNotificationMessages clientMessages);protected abstract Release findLatestActiveRelease(String configAppId, String configClusterName,                                                   String configNamespaceName, ApolloNotificationMessages clientMessages);

3.1.1 loadConfig

#loadConfig(...) 实现方法,代码如下:

1: @Override 2: public Release loadConfig(String clientAppId, String clientIp, String configAppId, String configClusterName, 3:                           String configNamespace, String dataCenter, ApolloNotificationMessages clientMessages) { 4:     // 优先,获得指定 Cluster 的 Release 。若存在,直接返回。 5:     // load from specified cluster fist 6:     if (!Objects.equals(ConfigConsts.CLUSTER_NAME_DEFAULT, configClusterName)) { 7:         Release clusterRelease = findRelease(clientAppId, clientIp, configAppId, configClusterName, configNamespace, 8:                 clientMessages); 9:         if (!Objects.isNull(clusterRelease)) {10:             return clusterRelease;11:         }12:     }13: 14:     // 其次,获得所属 IDC 的 Cluster 的 Release 。若存在,直接返回15:     // try to load via data center16:     if (!Strings.isNullOrEmpty(dataCenter) && !Objects.equals(dataCenter, configClusterName)) {17:         Release dataCenterRelease = findRelease(clientAppId, clientIp, configAppId, dataCenter, configNamespace, clientMessages);18:         if (!Objects.isNull(dataCenterRelease)) {19:             return dataCenterRelease;20:         }21:     }22: 23:     // 最后,获得默认 Cluster 的 Release 。24:     // fallback to default release25:     return findRelease(clientAppId, clientIp, configAppId, ConfigConsts.CLUSTER_NAME_DEFAULT, configNamespace, clientMessages);26: }
  • 第 4 至 12 行:优先,获得指定 Cluster 的 Release 。若存在,直接返回。
  • 第 14 至 21 行:其次,获得所属 IDC 的 Cluster 的 Release 。若存在,直接返回。
  • 第 25 行:最后,获得默认的 Cluster 的 Release 。
  • 每一次获取,都调用了 #findRelease(...) 方法,获取对应的 Release 对象。详细解析,见  方法。
  • 关于多 Cluster 的读取顺序,可参见  。这块的代码,就是实现该顺序,如下图:读取顺序

3.1.2 findRelease

1: 14: private Release findRelease(String clientAppId, String clientIp, String configAppId, String configClusterName,15:                             String configNamespace, ApolloNotificationMessages clientMessages) {16:     // 读取灰度发布编号17:     Long grayReleaseId = grayReleaseRulesHolder.findReleaseIdFromGrayReleaseRule(clientAppId, clientIp, configAppId, configClusterName, configNamespace);18:     //  读取灰度 Release 对象19:     Release release = null;20:     if (grayReleaseId != null) {21:         release = findActiveOne(grayReleaseId, clientMessages);22:     }23:     // 非灰度,获得最新的,并且有效的 Release 对象24:     if (release == null) {25:         release = findLatestActiveRelease(configAppId, configClusterName, configNamespace, clientMessages);26:     }27:     return release;28: }
  • 第 17 行:调用 GrayReleaseRulesHolder#findReleaseIdFromGrayReleaseRule(...) 方法,读取灰度发布编号,即 GrayReleaseRule.releaseId 属性。详细解析,在  中。
  • 第 18 至 22 行:调用 #findActiveOne(grayReleaseId, clientMessages) 方法,读取灰度 Release 对象。
  • 第 23 至 26 行:若非灰度,调用 #findLatestActiveRelease(configAppId, configClusterName, configNamespace, clientMessages) 方法,获得最新的,并且有效的 Release 对象。

3.3 DefaultConfigService

com.ctrip.framework.apollo.configservice.service.config.DefaultConfigService ,实现 AbstractConfigService 抽象类,配置 Service 默认实现类,直接查询数据库,而不使用缓存。代码如下:

public class DefaultConfigService extends AbstractConfigService {    @Autowired    private ReleaseService releaseService;    @Override    protected Release findActiveOne(long id, ApolloNotificationMessages clientMessages) {        return releaseService.findActiveOne(id);    }    @Override    protected Release findLatestActiveRelease(String configAppId, String configClusterName, String configNamespace,                                              ApolloNotificationMessages clientMessages) {        return releaseService.findLatestActiveRelease(configAppId, configClusterName, configNamespace);    }    @Override    public void handleMessage(ReleaseMessage message, String channel) {        // since there is no cache, so do nothing    }}
  • ReleaseService ,在  中,有详细解析。

3.4 ConfigServiceWithCache

com.ctrip.framework.apollo.configservice.service.config.ConfigServiceWithCache ,实现 AbstractConfigService 抽象类,基于 Guava Cache 的配置 Service 实现类。

3.4.1 构造方法

private static final Logger logger = LoggerFactory.getLogger(ConfigServiceWithCache.class);private static final long DEFAULT_EXPIRED_AFTER_ACCESS_IN_MINUTES = 60; //1 hour// TRACER 日志内存的枚举private static final String TRACER_EVENT_CACHE_INVALIDATE = "ConfigCache.Invalidate";private static final String TRACER_EVENT_CACHE_LOAD = "ConfigCache.LoadFromDB";private static final String TRACER_EVENT_CACHE_LOAD_ID = "ConfigCache.LoadFromDBById";private static final String TRACER_EVENT_CACHE_GET = "ConfigCache.Get";private static final String TRACER_EVENT_CACHE_GET_ID = "ConfigCache.GetById";private static final Splitter STRING_SPLITTER = Splitter.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR).omitEmptyStrings();@Autowiredprivate ReleaseService releaseService;@Autowiredprivate ReleaseMessageService releaseMessageService;private LoadingCache
configCache;private LoadingCache
> configIdCache;private ConfigCacheEntry nullConfigCacheEntry;public ConfigServiceWithCache() { nullConfigCacheEntry = new ConfigCacheEntry(ConfigConsts.NOTIFICATION_ID_PLACEHOLDER, null);}

3.4.2 ConfigCacheEntry

ConfigCacheEntry ,ConfigServiceWithCache 的内部私有静态类,配置缓存 Entry 。代码如下:

private static class ConfigCacheEntry {        private final long notificationId;        private final Release release;    public ConfigCacheEntry(long notificationId, Release release) {        this.notificationId = notificationId;        this.release = release;    }    public long getNotificationId() {        return notificationId;    }    public Release getRelease() {        return release;    }}

3.4.3 初始化

#initialize() 方法,通过 Spring 调用,初始化缓存对象。代码如下:

1: @PostConstruct 2: void initialize() { 3:     // 初始化 configCache 4:     configCache = CacheBuilder.newBuilder() 5:             .expireAfterAccess(DEFAULT_EXPIRED_AFTER_ACCESS_IN_MINUTES, TimeUnit.MINUTES) // 访问过期 6:             .build(new CacheLoader
() { 7: @Override 8: public ConfigCacheEntry load(String key) { 9: // 格式不正确,返回 nullConfigCacheEntry10: List
namespaceInfo = STRING_SPLITTER.splitToList(key);11: if (namespaceInfo.size() != 3) {12: Tracer.logError(new IllegalArgumentException(String.format("Invalid cache load key %s", key)));13: return nullConfigCacheEntry;14: }15: // 【TODO 6001】Tracer 日志16: Transaction transaction = Tracer.newTransaction(TRACER_EVENT_CACHE_LOAD, key);17: try {18: // 获得最新的 ReleaseMessage 对象19: ReleaseMessage latestReleaseMessage = releaseMessageService.findLatestReleaseMessageForMessages(Lists.newArrayList(key));20: // 获得最新的,并且有效的 Release 对象21: Release latestRelease = releaseService.findLatestActiveRelease(namespaceInfo.get(0), namespaceInfo.get(1), namespaceInfo.get(2));22: // 【TODO 6001】Tracer 日志23: transaction.setStatus(Transaction.SUCCESS);24: // 获得通知编号25: long notificationId = latestReleaseMessage == null ? ConfigConsts.NOTIFICATION_ID_PLACEHOLDER : latestReleaseMessage.getId();26: // 若 latestReleaseMessage 和 latestRelease 都为空,返回 nullConfigCacheEntry27: if (notificationId == ConfigConsts.NOTIFICATION_ID_PLACEHOLDER && latestRelease == null) {28: return nullConfigCacheEntry;29: }30: // 创建 ConfigCacheEntry 对象31: return new ConfigCacheEntry(notificationId, latestRelease);32: } catch (Throwable ex) {33: // 【TODO 6001】Tracer 日志34: transaction.setStatus(ex);35: throw ex;36: } finally {37: // 【TODO 6001】Tracer 日志38: transaction.complete();39: }40: }41: });42: // 初始化 configIdCache43: configIdCache = CacheBuilder.newBuilder()44: .expireAfterAccess(DEFAULT_EXPIRED_AFTER_ACCESS_IN_MINUTES, TimeUnit.MINUTES) // 访问过期45: .build(new CacheLoader
>() {46: @Override47: public Optional
load(Long key) {48: // 【TODO 6001】Tracer 日志49: Transaction transaction = Tracer.newTransaction(TRACER_EVENT_CACHE_LOAD_ID, String.valueOf(key));50: try {51: // 获得 Release 对象52: Release release = releaseService.findActiveOne(key);53: // 【TODO 6001】Tracer 日志54: transaction.setStatus(Transaction.SUCCESS);55: // 使用 Optional 包装 Release 对象返回56: return Optional.ofNullable(release);57: } catch (Throwable ex) {58: // 【TODO 6001】Tracer 日志59: transaction.setStatus(ex);60: throw ex;61: } finally {62: // 【TODO 6001】Tracer 日志63: transaction.complete();64: }65: }66: });67: }
  • 第 4 至 41 行:初始化 configCache 。
    • 第 9 至 14 行: key 格式不正确,返回 nullConfigCacheEntry 。
    • 第 19 行:调用 releaseMessageService.findLatestReleaseMessageForMessages(List<String>) 方法,获得最新的ReleaseMessage 对象。这一步是 DefaultConfigService 没有的操作,用于读取缓存的时候,判断缓存是否过期,下文详细解析。
    • 第 21 行:调用 ReleaseService.findLatestActiveRelease(appId, clusterName, namespaceName) 方法,获得最新的,且有效的 Release 对象。
    • 第 25 行:获得通知编号。
    • 第 26 至 29 行:若 latestReleaseMessage 和 latestRelease 为空,返回 nullConfigCacheEntry 。
    • 第 31 行:创建 ConfigCacheEntry 对象,并返回。
  • 第 42 至 66 行:初始化 configIdCache 。
    • 第 52 行:调用 ReleaseService#findActiveOne(key) 方法,获得 Release 对象。
    • 第 56 行:调用 Optional.ofNullable(Object) 方法,使用 Optional 包装 Release 对象,并返回。

3.4.4 handleMessage

1: @Override 2: public void handleMessage(ReleaseMessage message, String channel) { 3:     logger.info("message received - channel: {}, message: {}", channel, message); 4:     // 仅处理 APOLLO_RELEASE_TOPIC 5:     if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(message.getMessage())) { 6:         return; 7:     } 8:     try { 9:         // 清空对应的缓存10:         invalidate(message.getMessage());11:         // 预热缓存,读取 ConfigCacheEntry 对象,重新从 DB 中加载。12:         // warm up the cache13:         configCache.getUnchecked(message.getMessage());14:     } catch (Throwable ex) {15:         //ignore16:     }17: }
  • 第 4 至 7 行:仅处理 APOLLO_RELEASE_TOPIC 。
  • 第 10 行:调用 #invalidate(message) 方法,清空对应的缓存。代码如下:

    private void invalidate(String key) {    // 清空对应的缓存    configCache.invalidate(key);    // 【TODO 6001】Tracer 日志    Tracer.logEvent(TRACER_EVENT_CACHE_INVALIDATE, key);}
  • 第 13 行:调用 LoadingCache#getUnchecked(key) 方法,预热缓存,读取 ConfigCacheEntry 对象,重新从 DB 中加载。

3.4.5 findLatestActiveRelease

1: @Override 2: protected Release findLatestActiveRelease(String appId, String clusterName, String namespaceName, ApolloNotificationMessages clientMessages) { 3:     // 根据 appId + clusterName + namespaceName ,获得 ReleaseMessage 的 `message` 。 4:     String key = ReleaseMessageKeyGenerator.generate(appId, clusterName, namespaceName); 5:     // 【TODO 6001】Tracer 日志 6:     Tracer.logEvent(TRACER_EVENT_CACHE_GET, key); 7:     // 从缓存 configCache 中,读取 ConfigCacheEntry 对象 8:     ConfigCacheEntry cacheEntry = configCache.getUnchecked(key); 9:     // 若客户端的通知编号更大,说明缓存已经过期。10:     // cache is out-dated11:     if (clientMessages != null && clientMessages.has(key) && clientMessages.get(key) > cacheEntry.getNotificationId()) {12:         // 清空对应的缓存13:         // invalidate the cache and try to load from db again14:         invalidate(key);15:         // 读取 ConfigCacheEntry 对象,重新从 DB 中加载。16:         cacheEntry = configCache.getUnchecked(key);17:     }18:     // 返回 Release 对象19:     return cacheEntry.getRelease();20: }
  • 第 4 行:调用 ReleaseMessageKeyGenerator#generate(appId, clusterName, namespaceName) 方法,根据 appId + clusterName + namespaceName ,获得 ReleaseMessage 的 message 。
  • 第 8 行:调用 LoadingCache#getUnchecked(key) 方法,从缓存 configCache 中,读取 ConfigCacheEntry 对象。
  • 第 9 至 17 行:若客户端的通知编号更大,说明缓存已经过期。因为 #handleMessage(ReleaseMessage message, String channel) 方法,是通过定时扫描 ReleaseMessage 的机制实现,那么延迟是不可避免会存在的。所以通过此处比较的方式,实现缓存的过期的检查
    • 第 14 行:调用 #invalidate(message) 方法,清空对应的缓存。
    • 第 16 行:调用 LoadingCache#getUnchecked(key) 方法,读取 ConfigCacheEntry 对象,重新从 DB 中加载。
      • 第 19 行:返回 Release 对象。

3.4.6 findActiveOne

@Overrideprotected Release findActiveOne(long id, ApolloNotificationMessages clientMessages) {    // 【TODO 6001】Tracer 日志    Tracer.logEvent(TRACER_EVENT_CACHE_GET_ID, String.valueOf(id));    // 从缓存 configIdCache 中,读取 Release 对象    return configIdCache.getUnchecked(id).orElse(null);}

4. ApolloConfig

com.ctrip.framework.apollo.core.dto.ApolloConfig ,Apollo 配置 DTO 。代码如下:

public class ApolloConfig {        private String appId;        private String cluster;        private String namespaceName;        private Map
configurations; private String releaseKey;}
  • 该类在 apollo-core 项目中,被 apollo-configservice 和 apollo-client 共同引用。因此,Apollo 的客户端,也使用 ApolloConfig 。
  • 工作一到五年的java 开发工程师朋友可以加入我们Java架构交流:760940986
    领取获取往期Java高级架构资料、源码、笔记、视频。Dubbo、Redis、设计模式、Netty、zookeeper、Spring cloud、分布式、高并发等架构技术

转载于:https://my.oschina.net/u/3906190/blog/2985140

你可能感兴趣的文章
IT题库1-HashMap、HashSet和HashTable(区别?数据格式?)
查看>>
34. Search for a Range
查看>>
BZOJ 1355 KMP中next数组的应用
查看>>
【BZOJ3098】 Hash Killer II
查看>>
·DIV CSS3处理元素重叠
查看>>
OpenGL图形编程实现2D变换
查看>>
不做7件事,提高生产力
查看>>
zabbix3.0.4监控mysql主从同步
查看>>
表的操作
查看>>
C#操作Word的+ CKEditor 輸出成Word文件(包含圖案上傳)
查看>>
排序算法
查看>>
java多线程之Future和FutureTask
查看>>
并发--模块整理
查看>>
WCF 第五章 可信赖会话
查看>>
RuntimeError: Object: Could not open SDE workspace
查看>>
我的Android进阶之旅------>Android中android:visibility 属性VISIBLE、INVISIBLE、GONE的区别...
查看>>
安装Zookeeper和kafka,安装完毕后,遇到的错误
查看>>
c++构造函数成员初始化中赋值和初始化列表两种方式的区别
查看>>
[转载]项目风险管理七种武器-拳头
查看>>
SQL基础知识点汇总
查看>>