本文基于nacos-2.0.3版本

当客户端设置autoRefreshed = true时,比如:

@NacosValue(value = "${XXX:XX}", autoRefreshed = true)
或者
@NacosPropertySource(dataId = "XXX", autoRefreshed = true)

服务端配置值发生变化,客户端的属性值也会跟着发生变化。这是如何做到的?本文将首先介绍@NacosPropertySource的原理,之后介绍@NacosValue。

1、@NacosPropertySource自动刷新原理

在@NacosPropertySource的自动刷新中,ClientWorker类起着非常关键的作用,其作用如下:

  1. 当设置autoRefreshed = true,ClientWorker提供了cacheMap在本地缓存这些需要自动刷新的配置数据;
  2. 提供了从服务端和本地获取配置数据的方法,并提供一个定时任务,定时从服务端拉取配置数据。

先来看一下ClientWorker的构造方法:

    public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
            final Properties properties) {
        //代码删减
        
        this.executor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    checkConfigInfo();
                } catch (Throwable e) {
                    LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
                }
            }
        }, 1L, 10L, TimeUnit.MILLISECONDS);
    }

ClientWorker的构造方法创建了一个定时任务(每10ms运行一次),定时任务每次调用checkConfigInfo()方法:

    public void checkConfigInfo() {
        // Dispatch taskes.
        //cacheMap里面存放的是CacheData对象,
        //当配置需要自动刷新时,会在cacheMap里面增加一条记录
        //cacheMap的key由groupId和dataId组成,value是CacheData
        int listenerSize = cacheMap.size();
        // Round up the longingTaskCount.
        //将需要刷新的数据分组,每3000个为一组,一组由一个线程处理
        int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
        if (longingTaskCount > currentLongingTaskCount) {
            for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
                // The task list is no order.So it maybe has issues when changing.
                executorService.execute(new LongPollingRunnable(i));
            }
            currentLongingTaskCount = longingTaskCount;
        }
    }

需要刷新的数据分好组之后,交给LongPollingRunnable类处理。
在介绍LongPollingRunnable之前,我们先回过头看一下NacosPropertySourcePostProcessor后处理器,该处理器专门处理注解@NacosPropertySource,该类提供了一个处理autoRefreshed的方法:

	public static void addListenerIfAutoRefreshed(
			final NacosPropertySource nacosPropertySource, final Properties properties,
			final ConfigurableEnvironment environment) {
		//如果设置不自动刷新,直接返回
		if (!nacosPropertySource.isAutoRefreshed()) { // Disable Auto-Refreshed
			return;
		}

		//代码删减
		
		try {

			ConfigService configService = nacosServiceFactory
					.createConfigService(properties);
			//创建监听器
			Listener listener = new AbstractListener() {

				@Override
				public void receiveConfigInfo(String config) {
					String name = nacosPropertySource.getName();
					NacosPropertySource newNacosPropertySource = new NacosPropertySource(
							dataId, groupId, name, config, type);
					newNacosPropertySource.copy(nacosPropertySource);
					MutablePropertySources propertySources = environment
							.getPropertySources();
					propertySources.replace(name, newNacosPropertySource);
				}
			};
			//添加监听器
			if (configService instanceof EventPublishingConfigService) {
				((EventPublishingConfigService) configService).addListener(dataId,
						groupId, type, listener);
			}
			else {
				configService.addListener(dataId, groupId, listener);
			}

		}
		//代码删减
	}

addListenerIfAutoRefreshed()方法的最后调用了configService.addListener()方法,而configService.addListener()方法最终又会调用ClientWorker.addTenantListeners():

    public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners)
            throws NacosException {
        group = null2defaultGroup(group);
        String tenant = agent.getTenant();
        //创建CacheData
        CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
        for (Listener listener : listeners) {
        	//将监听器添加到CacheData中,当数据发生变化时,CacheData会通知这些监听器
            cache.addListener(listener);
        }
    }
    public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException {
        String key = GroupKey.getKeyTenant(dataId, group, tenant);
        CacheData cacheData = cacheMap.get(key);
        if (cacheData != null) {
            return cacheData;
        }
    
        cacheData = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);
        //cacheMap的key是由dataId, group, tenant三个参数组成的
        CacheData lastCacheData = cacheMap.putIfAbsent(key, cacheData);
        if (lastCacheData == null) {
            if (enableRemoteSyncConfig) {
            	//访问服务端,从服务端拉取dataId, group对应的配置数据
                String[] ct = getServerConfig(dataId, group, tenant, 3000L);
                cacheData.setContent(ct[0]);//配置内容保存到CacheData中
            }
            //对当前的CacheData对象分组
            int taskId = cacheMap.size() / (int) ParamUtil.getPerTaskConfigSize();
            cacheData.setTaskId(taskId);
            lastCacheData = cacheData;
        }
        //代码删除
        return lastCacheData;
    }

通过addCacheDataIfAbsent()方法可以清晰的看到CacheData如何被创建以及保存了哪些数据,而且CacheData的分组规则与LongPollingRunnable的分组规则一样。
下面继续分析LongPollingRunnable类。LongPollingRunnable实现了Runnable接口,下面我们重点分析其run()方法。

  public void run() {
      List<CacheData> cacheDatas = new ArrayList<CacheData>();
      List<String> inInitializingCacheList = new ArrayList<String>();
      try {
          //代码删减
          
          //从服务器上批量拉取本组内的配置发生变化的groupId和dataId
          List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
         //遍历发生变化的配置
          for (String groupKey : changedGroupKeys) {
              String[] key = GroupKey.parseKey(groupKey);
              String dataId = key[0];
              String group = key[1];
              String tenant = null;
              if (key.length == 3) {
                  tenant = key[2];
              }
              try {
              		//从服务器拉取发生变化的配置数据
                  String[] ct = getServerConfig(dataId, group, tenant, 3000L);
                  CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
                  cache.setContent(ct[0]);//更新
                  if (null != ct[1]) {
                      cache.setType(ct[1]);
                  }
                 
              } catch (NacosException ioe) {
                  //代码删减
              }
          }
          for (CacheData cacheData : cacheDatas) {
              if (!cacheData.isInitializing() || inInitializingCacheList
                      .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                  //通知CacheData的监听器
                  //监听器的作用有:
                  //1、更改对象的属性值
                  //2、替换spring容器的Environment里面的PropertySource
                  cacheData.checkListenerMd5();
                  cacheData.setInitializing(false);
              }
          }
          inInitializingCacheList.clear();
          executorService.execute(this);//直接开始运行下次任务
      } catch (Throwable e) {
          //代码删减
      }
  }

从上面的介绍可以看到,nacos配置自动更新依靠的是两个定时任务,第一个定时任务是检查是否有新的需要自动刷新的配置;第二个定时任务是不断访问服务端检查是否有新的配置更新。

2、@NacosValue自动刷新原理

在NacosValue中也有一个autoRefreshed = true的配置,这个配置起什么作用,它和NacosPropertySource之间是什么关系?要回答这些问题,先看一下nacos如何处理该注解。
nacos提供了NacosValueAnnotationBeanPostProcessor后处理器处理注解NacosValue,并且提供了doWithAnnotation()方法处理autoRefreshed ,下面看一下该方法源码:

	private void doWithAnnotation(String beanName, Object bean, NacosValue annotation,
			int modifiers, Method method, Field field) {
		if (annotation != null) {
			if (Modifier.isStatic(modifiers)) {
				return;
			}
			//判断是否是自动刷新
			if (annotation.autoRefreshed()) {
				String placeholder = resolvePlaceholder(annotation.value());

				if (placeholder == null) {
					return;
				}

				NacosValueTarget nacosValueTarget = new NacosValueTarget(bean, beanName,
						method, field, annotation.value());
				//如果属性需自动刷新,那么将配置名字和nacosValueTarget放到placeholderNacosValueTargetMap中,
				//placeholderNacosValueTargetMap是Map类型,其key为String,value为List<NacosValueTarget>
				put2ListMap(placeholderNacosValueTargetMap, placeholder,
						nacosValueTarget);
			}
		}
	}

doWithAnnotation()将需要刷新的对象和属性放到了一个Map中。
如果我们看一下NacosValueAnnotationBeanPostProcessor处理器的定义,会发现该类实现了ApplicationListener<NacosConfigReceivedEvent>,这说明该处理器还监听了NacosConfigReceivedEvent事件,而从服务器拉取了更新的配置数据后,通知CacheData的监听器时也会发布NacosConfigReceivedEvent,所以当服务器有更新的配置时,就会通知NacosValueAnnotationBeanPostProcessor。下面在来看一下该类的onApplicationEvent方法:

	public void onApplicationEvent(NacosConfigReceivedEvent event) {
		//遍历需要自动刷新的属性
		for (Map.Entry<String, List<NacosValueTarget>> entry : placeholderNacosValueTargetMap
				.entrySet()) {
			String key = environment.resolvePlaceholders(entry.getKey());
			String newValue = environment.getProperty(key);

			if (newValue == null) {
				continue;
			}
			List<NacosValueTarget> beanPropertyList = entry.getValue();
			for (NacosValueTarget target : beanPropertyList) {
				String md5String = MD5Utils.md5Hex(newValue, "UTF-8");
				boolean isUpdate = !target.lastMD5.equals(md5String);
				if (isUpdate) {
					target.updateLastMD5(md5String);
					Object evaluatedValue = resolveNotifyValue(target.nacosValueExpr, key, newValue);
					if (target.method == null) {
						setField(target, evaluatedValue);//更新属性值
					}
					else {
						setMethod(target, evaluatedValue);//调用方法更新
					}
				}
			}
		}
	}

在onApplicationEvent()方法中,可以清晰的看到刷新配置的逻辑。

3、总结

通过上面的介绍,可以发现自动刷新是NacosValue和NacosPropertySource两个共同作用的结果,配置NacosPropertySource自动刷新,可以定时从服务器拉取groupId和dataId对应的配置内容,配置NacosValue自动刷新,可以更新指定对象的属性。

GitHub 加速计划 / na / nacos
29.83 K
12.75 K
下载
Nacos是由阿里巴巴开源的服务治理中间件,集成了动态服务发现、配置管理和服务元数据管理功能,广泛应用于微服务架构中,简化服务治理过程。
最近提交(Master分支:3 个月前 )
4334cd16 * Support custom client configuration timeout.(#12748) * Add UT.(#12748) 20 天前
b04d2266 24 天前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐