Nacos 2.X source code analysis: pushing to subscribers, service health checks, cluster data synchronization, and gRPC client/server initialization
It is recommended to read the following article before this one:
Nacos 2.X Service Registration and Service Discovery Explained
Pushing to subscribers
From the article above we know that service registration ends by publishing a ServiceChangedEvent, and the subscription step of service discovery ends by publishing a ServiceSubscribedEvent.
We can roughly guess what should happen: when the instances of a service change, shouldn't the update be pushed to every client subscribed to that service? And when I subscribe to a service, shouldn't that service's data be pushed to me?
Let's look at how these two events are handled.
The events published during service registration and service subscription are ultimately handled in NamingSubscriberServiceV2Impl.onEvent():
- If the service changed, the update must be pushed to all of its subscribers.
- If it is a service-subscribed event, the data only needs to be pushed to that single subscriber.
- Either way, handling ends up in the addTask() method of NacosDelayTaskExecuteEngine.
@Override
public void onEvent(Event event) {
    if (!upgradeJudgement.isUseGrpcFeatures()) {
        return;
    }
    if (event instanceof ServiceEvent.ServiceChangedEvent) {
        // The service changed, so push to all of its subscribers;
        // note that the PushDelayTask constructor used below takes only two arguments
        ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
        Service service = serviceChangedEvent.getService();
        delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
    } else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
        // A single client subscribed, so push only to that subscriber;
        // here the PushDelayTask constructor takes three arguments, the last one being the subscriber's clientId
        ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;
        Service service = subscribedEvent.getService();
        delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(),
                subscribedEvent.getClientId()));
    }
}
// Now let's look at the two constructors of PushDelayTask
public class PushDelayTask extends AbstractDelayTask {
    ...
    public PushDelayTask(Service service, long delay) {
        this.service = service;
        // Push to all subscribers: pushToAll is true and targetClients is null
        pushToAll = true;
        targetClients = null;
        setTaskInterval(delay);
        setLastProcessTime(System.currentTimeMillis());
    }

    public PushDelayTask(Service service, long delay, String targetClient) {
        this.service = service;
        // Push to a single subscriber: pushToAll is false and targetClients holds the clientId passed in
        this.pushToAll = false;
        this.targetClients = new HashSet<>(1);
        this.targetClients.add(targetClient);
        setTaskInterval(delay);
        setLastProcessTime(System.currentTimeMillis());
    }
}
Now let's see what the addTask() method of NacosDelayTaskExecuteEngine does:
- The constructor schedules a periodic task that keeps calling the processTasks() method.
- addTask() simply puts the PushDelayTask from above into the tasks map. Note that PushDelayTask is a subclass of AbstractDelayTask.
- processTasks() takes all AbstractDelayTask entries out of the tasks map, loops over them, and dispatches each AbstractDelayTask to its matching processor.
public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractDelayTask> {

    protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;
    ......

    // The constructor schedules a periodic task
    public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
        super(logger);
        tasks = new ConcurrentHashMap<>(initCapacity);
        processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
        // On initialization a ProcessRunnable is scheduled; its run() method calls processTasks() below
        processingExecutor
                .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
    }
    ......

    // addTask() merely stores the given AbstractDelayTask in a map
    @Override
    public void addTask(Object key, AbstractDelayTask newTask) {
        lock.lock();
        try {
            AbstractDelayTask existTask = tasks.get(key);
            if (null != existTask) {
                newTask.merge(existTask);
            }
            // Store the AbstractDelayTask
            tasks.put(key, newTask);
        } finally {
            lock.unlock();
        }
    }

    // Runs at a fixed interval: takes every AbstractDelayTask out of the map and processes it
    protected void processTasks() {
        // Fetch all task keys
        Collection<Object> keys = getAllTaskKeys();
        // Iterate
        for (Object taskKey : keys) {
            AbstractDelayTask task = removeTask(taskKey);
            if (null == task) {
                continue;
            }
            // Look up the processor that matches this type of AbstractDelayTask
            NacosTaskProcessor processor = getProcessor(taskKey);
            if (null == processor) {
                getEngineLog().error("processor not found for task, so discarded. " + task);
                continue;
            }
            try {
                // Invoke the matching processor's process() method
                if (!processor.process(task)) {
                    retryFailedTask(taskKey, task);
                }
            } catch (Throwable e) {
                getEngineLog().error("Nacos task execute error ", e);
                retryFailedTask(taskKey, task);
            }
        }
    }
    ......
}
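A detail worth pausing on in addTask() above: when a task for the same key is already pending, newTask.merge(existTask) folds the old task into the new one, so repeated changes to one service collapse into a single push. Below is a minimal sketch of how PushDelayTask.merge() can behave under that rule; it is an outline, not a verbatim copy of the source:

@Override
public void merge(AbstractDelayTask task) {
    if (!(task instanceof PushDelayTask)) {
        return;
    }
    PushDelayTask oldTask = (PushDelayTask) task;
    // If either task targets all subscribers, the merged task must push to all
    if (isPushToAll() || oldTask.isPushToAll()) {
        pushToAll = true;
        targetClients = null;
    } else {
        // Otherwise, take the union of the targeted client ids
        getTargetClients().addAll(oldTask.getTargetClients());
    }
    // Keep the earlier process time so the merged task is not deferred indefinitely
    setLastProcessTime(Math.min(getLastProcessTime(), oldTask.getLastProcessTime()));
}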
Since our task here is PushDelayTask, a subclass of AbstractDelayTask, its matching processor is PushDelayTaskProcessor, and execution enters its process() method shown below.
- This processor creates a PushExecuteTask and dispatches it for execution.
private static class PushDelayTaskProcessor implements NacosTaskProcessor {
    ...
    @Override
    public boolean process(NacosTask task) {
        // Cast back to the original PushDelayTask type
        PushDelayTask pushDelayTask = (PushDelayTask) task;
        // Take out the Service object
        Service service = pushDelayTask.getService();
        // The method name tells us another task is executed asynchronously,
        // so the point of interest is the run() method of PushExecuteTask
        NamingExecuteTaskDispatcher.getInstance()
                .dispatchAndExecuteTask(service, new PushExecuteTask(service, executeEngine, pushDelayTask));
        return true;
    }
}
The run() method of PushExecuteTask:
- delayTask.isPushToAll() decides whether to push to all subscribers of the current service, resolving either every subscriber or a single one.
- The data is then pushed to the NacosClient.
public class PushExecuteTask extends AbstractExecuteTask {

    private final Service service;

    private final PushDelayTaskExecuteEngine delayTaskEngine;

    private final PushDelayTask delayTask;

    public PushExecuteTask(Service service, PushDelayTaskExecuteEngine delayTaskEngine, PushDelayTask delayTask) {
        this.service = service;
        this.delayTaskEngine = delayTaskEngine;
        this.delayTask = delayTask;
    }

    @Override
    public void run() {
        // Push the wrapper that encapsulates the service data to the clients
        try {
            PushDataWrapper wrapper = generatePushData();
            ClientManager clientManager = delayTaskEngine.getClientManager();
            // delayTask.isPushToAll() decides whether the targets are all subscribers of this service or a single one
            for (String each : getTargetClientIds()) {
                Client client = clientManager.getClient(each);
                if (null == client) {
                    continue;
                }
                Subscriber subscriber = clientManager.getClient(each).getSubscriber(service);
                // Send a gRPC request to push the data to the client
                delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper,
                        new NamingPushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll()));
            }
        } catch (Exception e) {
            Loggers.PUSH.error("Push task for service" + service.getGroupedServiceName() + " execute failed ", e);
            delayTaskEngine.addTask(service, new PushDelayTask(service, 1000L));
        }
    }
}
Client-side handling
public class NamingPushRequestHandler implements ServerRequestHandler {

    private final ServiceInfoHolder serviceInfoHolder;

    public NamingPushRequestHandler(ServiceInfoHolder serviceInfoHolder) {
        this.serviceInfoHolder = serviceInfoHolder;
    }

    @Override
    public Response requestReply(Request request) {
        if (request instanceof NotifySubscriberRequest) {
            NotifySubscriberRequest notifyResponse = (NotifySubscriberRequest) request;
            // Update the client's local serviceInfoMap cache;
            // processServiceInfo() appears throughout the service discovery flow
            serviceInfoHolder.processServiceInfo(notifyResponse.getServiceInfo());
            return new NotifySubscriberResponse();
        }
        return null;
    }
}
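For context, here is a minimal sketch of what processServiceInfo() does on the client, assuming the names used here (serviceInfoMap, isChangedServiceInfo, InstancesChangeEvent, DiskCache, cacheDir) match the real ServiceInfoHolder; treat it as an outline of the flow rather than the exact implementation:

public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
    String serviceKey = serviceInfo.getKey();
    ServiceInfo oldService = serviceInfoMap.get(serviceKey);
    // Overwrite the local cache with the pushed data
    serviceInfoMap.put(serviceKey, serviceInfo);
    // If the instance list actually changed, notify local subscribers and refresh the disk cache
    boolean changed = isChangedServiceInfo(oldService, serviceInfo);
    if (changed) {
        NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
                serviceInfo.getClusters(), serviceInfo.getHosts()));
        DiskCache.write(serviceInfo, cacheDir);
    }
    return serviceInfo;
}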
Service health check
First, recall how Nacos 1.X implemented this (a sketch of the heartbeat loop follows this list):
- When a NacosClient registers an ephemeral instance, it starts a scheduled task that sends heartbeats periodically.
- Registration itself is an HTTP POST request to the NacosServer.
- The NacosServer decides whether the Service needs to be newly created; the creation flow calls service.init().
- init() sets up the scheduled health check for all instances of that service, marking instances unhealthy and evicting expired ones.
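To make the 1.X client mechanism concrete, below is a minimal sketch of a self-rescheduling beat loop. BeatReactorSketch, BeatTask, and sendBeat() are illustrative stand-ins for the real BeatReactor internals, and the roughly 5s interval is the commonly cited 1.X default:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class BeatReactorSketch {

    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    // Schedule a heartbeat for one registered instance
    public void addBeatInfo(String serviceName, long beatIntervalMillis) {
        executor.schedule(new BeatTask(serviceName, beatIntervalMillis), beatIntervalMillis, TimeUnit.MILLISECONDS);
    }

    private class BeatTask implements Runnable {
        private final String serviceName;
        private final long intervalMillis;

        BeatTask(String serviceName, long intervalMillis) {
            this.serviceName = serviceName;
            this.intervalMillis = intervalMillis;
        }

        @Override
        public void run() {
            // The real client sends PUT /nacos/v1/ns/instance/beat here; stubbed out in this sketch
            sendBeat(serviceName);
            // Re-schedule itself, which is what keeps the heartbeat going
            executor.schedule(this, intervalMillis, TimeUnit.MILLISECONDS);
        }
    }

    private void sendBeat(String serviceName) {
        System.out.println("beat -> " + serviceName);
    }
}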
Nacos 2.X replaced the HTTP requests with gRPC, so health checking is built on the long-lived connection. On the client side, a thread created while the gRPC connection is being built sends a heartbeat every 5s; the NacosClient code for this lives in RpcClient.start(), shown later in this article.
On the NacosServer side, the health check entry point is ConnectionManager.start().
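start() essentially schedules the check task analyzed below at a fixed delay. A minimal sketch, assuming the executor name (RpcScheduledExecutor.COMMON_SERVER_EXECUTOR) and the roughly 3s interval match the real server:

@PostConstruct
public void start() {
    // Run the connection check about every 3 seconds
    RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(() -> {
        // The body is the run() method analyzed below: collect outdated
        // connections, probe them, and unregister the ones that stay silent
    }, 1000L, 3000L, TimeUnit.MILLISECONDS);
}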
Now let's walk through the key parts of this health check run():
- Connection ids that have timed out are added to the outDatedConnections set.
- Probe phase: iterate over outDatedConnections, send each connection an async request, and add the connectionIds that respond successfully to the successConnections set.
- Any timed-out connection id that is in outDatedConnections but not in successConnections is unregistered via unregister():
  - the connection is removed from the connections map and the clientId is removed from the clients map
  - an event is published
  - the event handlers remove the entries from the registry and subscriber tables and sync the data to the other nodes
public void run() {
    try {
        int totalCount = connections.size();
        Loggers.REMOTE_DIGEST.info("Connection check task start");
        MetricsMonitor.getLongConnectionMonitor().set(totalCount);
        // All Connection objects
        Set<Map.Entry<String, Connection>> entries = connections.entrySet();
        ...
        //2.get expel connection for ip limit.
        // Iterate over every connection
        for (Map.Entry<String, Connection> entry : entries) {
            Connection client = entry.getValue();
            String clientIp = client.getMetaInfo().getClientIp();
            AtomicInteger integer = expelForIp.get(clientIp);
            if (integer != null && integer.intValue() > 0) {
                integer.decrementAndGet();
                expelClient.add(client.getMetaInfo().getConnectionId());
                expelCount--;
            // current time - the client's lastActiveTime >= 20s
            } else if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME) {
                // Over 20s without activity: add this connectionId to the outDatedConnections set
                outDatedConnections.add(client.getMetaInfo().getConnectionId());
            }
        }
        ...
        //4.client active detection.
        // Probe phase: send each outdated connection an async request
        // and collect the ones that respond into the successConnections set
        Loggers.REMOTE_DIGEST.info("Out dated connection ,size={}", outDatedConnections.size());
        if (CollectionUtils.isNotEmpty(outDatedConnections)) {
            Set<String> successConnections = new HashSet<>();
            // A CountDownLatch gates the await below
            final CountDownLatch latch = new CountDownLatch(outDatedConnections.size());
            for (String outDateConnectionId : outDatedConnections) {
                try {
                    Connection connection = getConnection(outDateConnectionId);
                    if (connection != null) {
                        ClientDetectionRequest clientDetectionRequest = new ClientDetectionRequest();
                        // Probe the client with a request;
                        // connectionIds that respond successfully are added to successConnections
                        connection.asyncRequest(clientDetectionRequest, new RequestCallBack() {
                            ...
                            @Override
                            public void onResponse(Response response) {
                                latch.countDown();
                                if (response != null && response.isSuccess()) {
                                    connection.freshActiveTime();
                                    successConnections.add(outDateConnectionId);
                                }
                            }

                            @Override
                            public void onException(Throwable e) {
                                latch.countDown();
                            }
                        });
                    } else {
                        latch.countDown();
                    }
                } catch (ConnectionAlreadyClosedException e) {
                    latch.countDown();
                } catch (Exception e) {
                    latch.countDown();
                }
            }
            latch.await(3000L, TimeUnit.MILLISECONDS);
            // Connection ids in outDatedConnections but not in successConnections are unregistered via unregister()
            for (String outDateConnectionId : outDatedConnections) {
                if (!successConnections.contains(outDateConnectionId)) {
                    Loggers.REMOTE_DIGEST
                            .info("[{}]Unregister Out dated connection....", outDateConnectionId);
                    // 1. Remove the connection from the connections map and the clientId from the clients map
                    // 2. Publish an event; its handlers remove the registry and subscriber entries
                    //    and sync the data to the other nodes
                    unregister(outDateConnectionId);
                }
            }
        }
        ...
    } catch (Throwable e) {
        Loggers.REMOTE.error("Error occurs during connection check... ", e);
    }
}
The implementation of unregister():
- remove the connection from the connections map and the clientId from the clients map
- publish an event
- the event handlers remove the entries from the registry and subscriber tables and sync the data to the other nodes
public synchronized void unregister(String connectionId) {
    // Remove the connection from the connections map
    Connection remove = this.connections.remove(connectionId);
    if (remove != null) {
        String clientIp = remove.getMetaInfo().clientIp;
        AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
        if (atomicInteger != null) {
            int count = atomicInteger.decrementAndGet();
            if (count <= 0) {
                connectionForClientIp.remove(clientIp);
            }
        }
        remove.close();
        Loggers.REMOTE_DIGEST.info("[{}]Connection unregistered successfully. ", connectionId);
        // Core call: this eventually reaches clientDisconnected() of ConnectionBasedClientManager
        clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
    }
}

// Eventually reaches clientDisconnected() of ConnectionBasedClientManager
public boolean clientDisconnected(String clientId) {
    Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
    // Remove the clientId from the clients map
    ConnectionBasedClient client = clients.remove(clientId);
    if (null == client) {
        return true;
    }
    client.release();
    // Publish a ClientDisconnectEvent;
    // its handlers remove the registry and subscriber entries and sync the data to the other nodes
    NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
    return true;
}
-----------------------------------------------------------------------------------
// onEvent(Event event) in ClientServiceIndexesManager
@Override
public void onEvent(Event event) {
    // Route the two kinds of events to their handlers, which then do finer-grained checks
    if (event instanceof ClientEvent.ClientDisconnectEvent) {
        // Remove the entries from the subscriber and registry tables
        handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
    } else if (event instanceof ClientOperationEvent) {
        // Events related to service registration, deregistration, subscription, and unsubscription
        handleClientOperation((ClientOperationEvent) event);
    }
}

private void handleClientDisconnect(ClientEvent.ClientDisconnectEvent event) {
    Client client = event.getClient();
    // Remove the entries from the subscriber and registry tables
    for (Service each : client.getAllSubscribeService()) {
        removeSubscriberIndexes(each, client.getClientId());
    }
    for (Service each : client.getAllPublishedService()) {
        removePublisherIndexes(each, client.getClientId());
    }
}
-----------------------------------------------------------------------------------
// syncToAllServer(..) in DistroClientDataProcessor
private void syncToAllServer(ClientEvent event) {
    Client client = event.getClient();
    // Only ephemeral data sync by Distro, persist client should sync by raft.
    if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
        return;
    }
    if (event instanceof ClientEvent.ClientDisconnectEvent) {
        // Published when service instances are removed
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        // After an instance is removed, sync the deletion to the other nodes
        distroProtocol.sync(distroKey, DataOperation.DELETE);
    } else if (event instanceof ClientEvent.ClientChangedEvent) {
        // Published during service registration
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        // For example, when new instances are added, sync the change to the other nodes
        distroProtocol.sync(distroKey, DataOperation.CHANGE);
    }
}
Cluster data synchronization
During service registration, ClientChangedEvent, ClientRegisterServiceEvent (adds the clientId to the service registry), and InstanceMetadataEvent (instance metadata related) are published.
During the service health check flow, a ClientDisconnectEvent is published when a service instance must be deregistered.
Both of these events are handled in the syncToAllServer(..) method of DistroClientDataProcessor:
private void syncToAllServer(ClientEvent event) {
    Client client = event.getClient();
    // Only ephemeral data sync by Distro, persist client should sync by raft.
    if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
        return;
    }
    // Published when service instances are removed
    if (event instanceof ClientEvent.ClientDisconnectEvent) {
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        // After an instance is removed, sync the deletion to the other nodes
        distroProtocol.sync(distroKey, DataOperation.DELETE);
    // Published during service registration
    } else if (event instanceof ClientEvent.ClientChangedEvent) {
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        // For example, when new instances are added, sync the change to the other nodes
        distroProtocol.sync(distroKey, DataOperation.CHANGE);
    }
}
Caches introduced in Nacos 2.X
Service registration:
- singletonRepository <Service, Service>: a dedicated store of Service singletons
- namespaceSingletonMaps <namespace, Set<Service>>: all services under one namespace
- clients <clientId, IpPortBasedClient>: the Client objects
- publishers <Service, InstancePublishInfo> inside each Client: maps a Service to its instances
- publisherIndexes <Service, Set<clientId>>: binds a Service to the clients that registered it

Service discovery:
- serviceDataIndexes <Service, ServiceInfo>: a copy of the service registry; the ServiceInfo holds a List<Instance>
- serviceClusterIndex <Service, Set<clusters>>: maps a service to its clusters
- subscribers <Service, Subscriber> inside each Client: records which Subscriber subscribed to which Service
- subscriberIndexes <Service, Set<subscriberClientId>>: binds a Service to its many subscribers
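To make the shapes concrete, here is a sketch of how these caches could be declared. The generic types follow the list above; the owning classes and initializations are assumptions, not copied from the source:

// Service registration side
ConcurrentHashMap<Service, Service> singletonRepository = new ConcurrentHashMap<>();
ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps = new ConcurrentHashMap<>();
ConcurrentHashMap<String, IpPortBasedClient> clients = new ConcurrentHashMap<>();
// Inside each Client object
ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>();
ConcurrentHashMap<Service, Subscriber> subscribers = new ConcurrentHashMap<>();
// Index tables binding services to client ids, plus the service discovery caches
ConcurrentHashMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
ConcurrentHashMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();
ConcurrentHashMap<Service, ServiceInfo> serviceDataIndexes = new ConcurrentHashMap<>();
ConcurrentHashMap<Service, Set<String>> serviceClusterIndex = new ConcurrentHashMap<>();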
Nacos 2.X overall flow diagram
(The original post includes a flow diagram image here.)
gRPC client initialization
Taking service registration as an example, the registration method of the NamingService interface is shown below; the gRPC call is sent through the clientProxy object.
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);
    // Call the NacosServer to register the service; Nacos 2.X uses gRPC,
    // so this goes to the NamingGrpcClientProxy implementation
    clientProxy.registerService(serviceName, groupName, instance);
}
Let's see how the clientProxy object is created.
private void init(Properties properties) throws NacosException {
    ...
    // Create a NamingClientProxyDelegate object
    this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier);
}
-----------------------------------------------------------------------------------------------
// Following the creation of NamingClientProxyDelegate; its constructor is shown below
public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, Properties properties,
        InstancesChangeNotifier changeNotifier) throws NacosException {
    ...
    // Create the NamingGrpcClientProxy proxy object
    this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties,
            serviceInfoHolder);
}
-----------------------------------------------------------------------------------------------------
// Now the constructor of NamingGrpcClientProxy
public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory,
        Properties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException {
    ...
    // It is worth seeing how rpcClient is created: it boils down to new GrpcSdkClient(clientNameInner),
    // plus assigning the thread pool related fields
    this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);
    this.redoService = new NamingGrpcRedoService(this);
    // Then enter the start() method
    start(serverListFactory, serviceInfoHolder);
}
-----------------------------------------------------------------------------------------------------------------------
// The rpcClient object was created above; here it is wired up.
// The core is the start() method below.
private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {
    rpcClient.serverListFactory(serverListFactory);
    rpcClient.registerConnectionListener(redoService);
    // Create a NamingPushRequestHandler: it receives the service instance data pushed by the NacosServer
    // and updates the local cache, which is what the subscription feature relies on
    rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
    // Continue into start()
    rpcClient.start();
    NotifyCenter.registerSubscriber(this);
}
The core start() method:
- Set the rpcClient status to STARTING.
- Create a thread pool with just two threads:
  - the first thread handles connection events, setting the connected state to true after a successful connection and to false after a disconnect
  - the second thread handles reconnection after failures and sends a heartbeat every 5s
- Connect to the server, retrying synchronously up to 3 times:
  - on success, set rpcClientStatus to RUNNING and put a ConnectionEvent into the eventLinkedBlockingQueue blocking queue; the first thread consumes it
  - on failure, put a ReconnectContext object into the reconnectionSignal blocking queue; the second thread consumes it and reconnects asynchronously
public final void start() throws NacosException {
    // Flip the rpcClient status to STARTING; if another thread got here first, return immediately,
    // since only one thread needs to perform the startup
    boolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);
    if (!success) {
        return;
    }

    // Create a thread pool with only two threads
    clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> {
        Thread t = new Thread(r);
        t.setName("com.alibaba.nacos.client.remote.worker");
        t.setDaemon(true);
        return t;
    });

    // connection event consumer.
    // The first thread: on connect/disconnect, flip the connection state to true/false
    clientEventExecutor.submit(() -> {
        while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {
            ConnectionEvent take;
            try {
                take = eventLinkedBlockingQueue.take();
                if (take.isConnected()) {
                    // Connected: essentially runs the ConnectionEventListener callbacks
                    notifyConnected();
                } else if (take.isDisConnected()) {
                    notifyDisConnected();
                }
            } catch (Throwable e) {
                // Do nothing
            }
        }
    });

    // The second thread: asynchronous reconnection
    clientEventExecutor.submit(() -> {
        while (true) {
            try {
                if (isShutdown()) {
                    break;
                }
                // Take a ReconnectContext from the reconnectionSignal blocking queue.
                // If the queue is empty, wait 5s, get null, and run the health check again.
                // If something was taken, it is non-null and the reconnect logic runs.
                ReconnectContext reconnectContext = reconnectionSignal
                        .poll(keepAliveTime, TimeUnit.MILLISECONDS);
                if (reconnectContext == null) {
                    // check alive time.
                    if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) {
                        // Send a heartbeat: a request to the NacosServer acting as a health check.
                        // It verifies not only that the server is reachable but also that this connection is registered.
                        boolean isHealthy = healthCheck();
                        if (!isHealthy) {
                            // The unhealthy branch
                            if (currentConnection == null) {
                                continue;
                            }
                            LoggerUtils.printIfInfoEnabled(..);
                            // Check whether the current rpcClientStatus is SHUTDOWN
                            RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get();
                            if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {
                                break;
                            }
                            // Build a ReconnectContext so the reconnect logic below runs
                            boolean statusFLowSuccess = RpcClient.this.rpcClientStatus
                                    .compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY);
                            if (statusFLowSuccess) {
                                reconnectContext = new ReconnectContext(null, false);
                            } else {
                                continue;
                            }
                        } else {
                            // After each heartbeat, refresh the timestamp
                            lastActiveTimeStamp = System.currentTimeMillis();
                            continue;
                        }
                    } else {
                        continue;
                    }
                }

                // Enter the reconnect logic
                if (reconnectContext.serverInfo != null) {
                    // clear recommend server if server is not in server list.
                    boolean serverExist = false;
                    // Fetch the server list
                    for (String server : getServerListFactory().getServerList()) {
                        ServerInfo serverInfo = resolveServerInfo(server);
                        if (serverInfo.getServerIp().equals(reconnectContext.serverInfo.getServerIp())) {
                            serverExist = true;
                            reconnectContext.serverInfo.serverPort = serverInfo.serverPort;
                            break;
                        }
                    }
                    if (!serverExist) {
                        LoggerUtils.printIfInfoEnabled(......);
                        reconnectContext.serverInfo = null;
                    }
                }
                // Reconnect to the NacosServer
                reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
            } catch (Throwable throwable) {
                // Do nothing
            }
        }
    });

    // connect to server, try to connect to server sync RETRY_TIMES times, async starting if failed.
    // Connect to the server, retrying synchronously 3 times; if all fail, start asynchronously
    Connection connectToServer = null;
    // Set rpcClientStatus to STARTING
    rpcClientStatus.set(RpcClientStatus.STARTING);
    int startUpRetryTimes = RETRY_TIMES;
    while (startUpRetryTimes > 0 && connectToServer == null) {
        try {
            startUpRetryTimes--;
            // Pick one server from the NacosServer cluster configured in application.properties
            ServerInfo serverInfo = nextRpcServer();
            LoggerUtils.printIfInfoEnabled(......);
            // Connect to the NacosServer
            connectToServer = connectToServer(serverInfo);
        } catch (Throwable e) {
            LoggerUtils.printIfWarnEnabled(......);
        }
    }

    // Connected to the NacosServer within 3 attempts
    if (connectToServer != null) {
        LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Success to connect to server [{}] on start up, connectionId = {}",
                name, connectToServer.serverInfo.getAddress(), connectToServer.getConnectionId());
        this.currentConnection = connectToServer;
        // Set rpcClientStatus to RUNNING
        rpcClientStatus.set(RpcClientStatus.RUNNING);
        // Put a ConnectionEvent into the eventLinkedBlockingQueue blocking queue;
        // the first thread above handles this event
        eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
    } else {
        // All 3 synchronous attempts failed, so start asynchronously:
        // a ReconnectContext object is put into the reconnectionSignal blocking queue,
        // and the second thread above reconnects asynchronously
        switchServerAsync();
    }

    registerServerRequestHandler(new ConnectResetRequestHandler());
    // register client detection request.
    registerServerRequestHandler(request -> {
        if (request instanceof ClientDetectionRequest) {
            return new ClientDetectionResponse();
        }
        return null;
    });
}
gRPC server initialization
The gRPC server entry point is BaseGrpcServer.startServer(); execution actually enters the parent class's start() method first, which then calls the subclass's startServer().
@Override
public void startServer() throws Exception {
    final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();

    // server interceptor to set connection id.
    ServerInterceptor serverInterceptor = new ServerInterceptor() {
        @Override
        public <T, S> ServerCall.Listener<T> interceptCall(ServerCall<T, S> call, Metadata headers,
                ServerCallHandler<T, S> next) {
            Context ctx = Context.current()
                    .withValue(CONTEXT_KEY_CONN_ID, call.getAttributes().get(TRANS_KEY_CONN_ID))
                    .withValue(CONTEXT_KEY_CONN_REMOTE_IP, call.getAttributes().get(TRANS_KEY_REMOTE_IP))
                    .withValue(CONTEXT_KEY_CONN_REMOTE_PORT, call.getAttributes().get(TRANS_KEY_REMOTE_PORT))
                    .withValue(CONTEXT_KEY_CONN_LOCAL_PORT, call.getAttributes().get(TRANS_KEY_LOCAL_PORT));
            if (REQUEST_BI_STREAM_SERVICE_NAME.equals(call.getMethodDescriptor().getServiceName())) {
                Channel internalChannel = getInternalChannel(call);
                ctx = ctx.withValue(CONTEXT_KEY_CHANNEL, internalChannel);
            }
            return Contexts.interceptCall(ctx, call, headers, next);
        }
    };

    // Core method: registers the gRPC services; some gRPC background knowledge helps here
    addServices(handlerRegistry, serverInterceptor);

    // The server port plus a 1000 offset
    server = ServerBuilder.forPort(getServicePort()).executor(getRpcExecutor())
            // Maximum inbound message size
            .maxInboundMessageSize(getInboundMessageSize()).fallbackHandlerRegistry(handlerRegistry)
            // Compression and decompression
            .compressorRegistry(CompressorRegistry.getDefaultInstance())
            .decompressorRegistry(DecompressorRegistry.getDefaultInstance())
            // Add a transport filter
            .addTransportFilter(new ServerTransportFilter() {
                @Override
                public Attributes transportReady(Attributes transportAttrs) {
                    InetSocketAddress remoteAddress = (InetSocketAddress) transportAttrs
                            .get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
                    InetSocketAddress localAddress = (InetSocketAddress) transportAttrs
                            .get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
                    int remotePort = remoteAddress.getPort();
                    int localPort = localAddress.getPort();
                    String remoteIp = remoteAddress.getAddress().getHostAddress();
                    Attributes attrWrapper = transportAttrs.toBuilder()
                            .set(TRANS_KEY_CONN_ID, System.currentTimeMillis() + "_" + remoteIp + "_" + remotePort)
                            .set(TRANS_KEY_REMOTE_IP, remoteIp).set(TRANS_KEY_REMOTE_PORT, remotePort)
                            .set(TRANS_KEY_LOCAL_PORT, localPort).build();
                    String connectionId = attrWrapper.get(TRANS_KEY_CONN_ID);
                    Loggers.REMOTE_DIGEST.info("Connection transportReady,connectionId = {} ", connectionId);
                    return attrWrapper;
                }

                @Override
                public void transportTerminated(Attributes transportAttrs) {
                    String connectionId = null;
                    try {
                        connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
                    } catch (Exception e) {
                        // Ignore
                    }
                    if (StringUtils.isNotBlank(connectionId)) {
                        Loggers.REMOTE_DIGEST
                                .info("Connection transportTerminated,connectionId = {} ", connectionId);
                        connectionManager.unregister(connectionId);
                    }
                }
            }).build();

    // Start the server
    // This wraps Netty under the hood: what actually starts is a Netty server
    server.start();
}
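A note on the port comment above: the SDK gRPC port is the main server port plus a fixed offset. A tiny sketch of the arithmetic, where the constant name is an assumption:

// Assumed constant name; Nacos uses a fixed +1000 offset for the SDK gRPC port
static final int SDK_GRPC_PORT_OFFSET = 1000;

static int sdkGrpcPort(int mainPort) {
    // e.g. 8848 -> 9848; this is what getServicePort() above evaluates to
    return mainPort + SDK_GRPC_PORT_OFFSET;
}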