nacos解析-详解配置自动刷新原理
本文基于nacos-2.0.3版本
当客户端设置autoRefreshed = true
时,比如:
@NacosValue(value = "${XXX:XX}", autoRefreshed = true)
或者
@NacosPropertySource(dataId = "XXX", autoRefreshed = true)
服务端配置值发生变化,客户端的属性值也会跟着发生变化。这是如何做到的?本文将首先介绍@NacosPropertySource的原理,之后介绍@NacosValue。
1、@NacosPropertySource自动刷新原理
在@NacosPropertySource的自动刷新中,ClientWorker类起着非常关键的作用,其作用如下:
- 当设置
autoRefreshed = true
,ClientWorker提供了cacheMap在本地缓存这些需要自动刷新的配置数据; - 提供了从服务端和本地获取配置数据的方法,并提供一个定时任务,定时从服务端拉取配置数据。
先来看一下ClientWorker的构造方法:
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
final Properties properties) {
//代码删减
this.executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
checkConfigInfo();
} catch (Throwable e) {
LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
}
}
}, 1L, 10L, TimeUnit.MILLISECONDS);
}
ClientWorker的构造方法创建了一个定时任务(每10ms运行一次),定时任务每次调用checkConfigInfo()方法:
public void checkConfigInfo() {
// Dispatch taskes.
//cacheMap里面存放的是CacheData对象,
//当配置需要自动刷新时,会在cacheMap里面增加一条记录
//cacheMap的key由groupId和dataId组成,value是CacheData
int listenerSize = cacheMap.size();
// Round up the longingTaskCount.
//将需要刷新的数据分组,每3000个为一组,一组由一个线程处理
int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
if (longingTaskCount > currentLongingTaskCount) {
for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
// The task list is no order.So it maybe has issues when changing.
executorService.execute(new LongPollingRunnable(i));
}
currentLongingTaskCount = longingTaskCount;
}
}
需要刷新的数据分好组之后,交给LongPollingRunnable类处理。
在介绍LongPollingRunnable之前,我们先回过头看一下NacosPropertySourcePostProcessor后处理器,该处理器专门处理注解@NacosPropertySource,该类提供了一个处理autoRefreshed的方法:
public static void addListenerIfAutoRefreshed(
final NacosPropertySource nacosPropertySource, final Properties properties,
final ConfigurableEnvironment environment) {
//如果设置不自动刷新,直接返回
if (!nacosPropertySource.isAutoRefreshed()) { // Disable Auto-Refreshed
return;
}
//代码删减
try {
ConfigService configService = nacosServiceFactory
.createConfigService(properties);
//创建监听器
Listener listener = new AbstractListener() {
@Override
public void receiveConfigInfo(String config) {
String name = nacosPropertySource.getName();
NacosPropertySource newNacosPropertySource = new NacosPropertySource(
dataId, groupId, name, config, type);
newNacosPropertySource.copy(nacosPropertySource);
MutablePropertySources propertySources = environment
.getPropertySources();
propertySources.replace(name, newNacosPropertySource);
}
};
//添加监听器
if (configService instanceof EventPublishingConfigService) {
((EventPublishingConfigService) configService).addListener(dataId,
groupId, type, listener);
}
else {
configService.addListener(dataId, groupId, listener);
}
}
//代码删减
}
addListenerIfAutoRefreshed()方法的最后调用了configService.addListener()方法,而configService.addListener()方法最终又会调用ClientWorker.addTenantListeners():
public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners)
throws NacosException {
group = null2defaultGroup(group);
String tenant = agent.getTenant();
//创建CacheData
CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
for (Listener listener : listeners) {
//将监听器添加到CacheData中,当数据发生变化时,CacheData会通知这些监听器
cache.addListener(listener);
}
}
public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException {
String key = GroupKey.getKeyTenant(dataId, group, tenant);
CacheData cacheData = cacheMap.get(key);
if (cacheData != null) {
return cacheData;
}
cacheData = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);
//cacheMap的key是由dataId, group, tenant三个参数组成的
CacheData lastCacheData = cacheMap.putIfAbsent(key, cacheData);
if (lastCacheData == null) {
if (enableRemoteSyncConfig) {
//访问服务端,从服务端拉取dataId, group对应的配置数据
String[] ct = getServerConfig(dataId, group, tenant, 3000L);
cacheData.setContent(ct[0]);//配置内容保存到CacheData中
}
//对当前的CacheData对象分组
int taskId = cacheMap.size() / (int) ParamUtil.getPerTaskConfigSize();
cacheData.setTaskId(taskId);
lastCacheData = cacheData;
}
//代码删除
return lastCacheData;
}
通过addCacheDataIfAbsent()方法可以清晰的看到CacheData如何被创建以及保存了哪些数据,而且CacheData的分组规则与LongPollingRunnable的分组规则一样。
下面继续分析LongPollingRunnable类。LongPollingRunnable实现了Runnable接口,下面我们重点分析其run()方法。
public void run() {
List<CacheData> cacheDatas = new ArrayList<CacheData>();
List<String> inInitializingCacheList = new ArrayList<String>();
try {
//代码删减
//从服务器上批量拉取本组内的配置发生变化的groupId和dataId
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
//遍历发生变化的配置
for (String groupKey : changedGroupKeys) {
String[] key = GroupKey.parseKey(groupKey);
String dataId = key[0];
String group = key[1];
String tenant = null;
if (key.length == 3) {
tenant = key[2];
}
try {
//从服务器拉取发生变化的配置数据
String[] ct = getServerConfig(dataId, group, tenant, 3000L);
CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(ct[0]);//更新
if (null != ct[1]) {
cache.setType(ct[1]);
}
} catch (NacosException ioe) {
//代码删减
}
}
for (CacheData cacheData : cacheDatas) {
if (!cacheData.isInitializing() || inInitializingCacheList
.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
//通知CacheData的监听器
//监听器的作用有:
//1、更改对象的属性值
//2、替换spring容器的Environment里面的PropertySource
cacheData.checkListenerMd5();
cacheData.setInitializing(false);
}
}
inInitializingCacheList.clear();
executorService.execute(this);//直接开始运行下次任务
} catch (Throwable e) {
//代码删减
}
}
从上面的介绍可以看到,nacos配置自动更新依靠的是两个定时任务,第一个定时任务是检查是否有新的需要自动刷新的配置;第二个定时任务是不断访问服务端检查是否有新的配置更新。
2、@NacosValue自动刷新原理
在NacosValue中也有一个autoRefreshed = true
的配置,这个配置起什么作用,它和NacosPropertySource之间是什么关系?要回答这些问题,先看一下nacos如何处理该注解。
nacos提供了NacosValueAnnotationBeanPostProcessor后处理器处理注解NacosValue,并且提供了doWithAnnotation()方法处理autoRefreshed ,下面看一下该方法源码:
private void doWithAnnotation(String beanName, Object bean, NacosValue annotation,
int modifiers, Method method, Field field) {
if (annotation != null) {
if (Modifier.isStatic(modifiers)) {
return;
}
//判断是否是自动刷新
if (annotation.autoRefreshed()) {
String placeholder = resolvePlaceholder(annotation.value());
if (placeholder == null) {
return;
}
NacosValueTarget nacosValueTarget = new NacosValueTarget(bean, beanName,
method, field, annotation.value());
//如果属性需自动刷新,那么将配置名字和nacosValueTarget放到placeholderNacosValueTargetMap中,
//placeholderNacosValueTargetMap是Map类型,其key为String,value为List<NacosValueTarget>
put2ListMap(placeholderNacosValueTargetMap, placeholder,
nacosValueTarget);
}
}
}
doWithAnnotation()将需要刷新的对象和属性放到了一个Map中。
如果我们看一下NacosValueAnnotationBeanPostProcessor处理器的定义,会发现该类实现了ApplicationListener<NacosConfigReceivedEvent>,这说明该处理器还监听了NacosConfigReceivedEvent事件,而从服务器拉取了更新的配置数据后,通知CacheData的监听器时也会发布NacosConfigReceivedEvent,所以当服务器有更新的配置时,就会通知NacosValueAnnotationBeanPostProcessor。下面在来看一下该类的onApplicationEvent方法:
public void onApplicationEvent(NacosConfigReceivedEvent event) {
//遍历需要自动刷新的属性
for (Map.Entry<String, List<NacosValueTarget>> entry : placeholderNacosValueTargetMap
.entrySet()) {
String key = environment.resolvePlaceholders(entry.getKey());
String newValue = environment.getProperty(key);
if (newValue == null) {
continue;
}
List<NacosValueTarget> beanPropertyList = entry.getValue();
for (NacosValueTarget target : beanPropertyList) {
String md5String = MD5Utils.md5Hex(newValue, "UTF-8");
boolean isUpdate = !target.lastMD5.equals(md5String);
if (isUpdate) {
target.updateLastMD5(md5String);
Object evaluatedValue = resolveNotifyValue(target.nacosValueExpr, key, newValue);
if (target.method == null) {
setField(target, evaluatedValue);//更新属性值
}
else {
setMethod(target, evaluatedValue);//调用方法更新
}
}
}
}
}
在onApplicationEvent()方法中,可以清晰的看到刷新配置的逻辑。
3、总结
通过上面的介绍,可以发现自动刷新是NacosValue和NacosPropertySource两个共同作用的结果,配置NacosPropertySource自动刷新,可以定时从服务器拉取groupId和dataId对应的配置内容,配置NacosValue自动刷新,可以更新指定对象的属性。
更多推荐
所有评论(0)