Nacos 2.x 源码深度解析 (九):双向流设计 —— 连接创建复用与销毁

《Nacos 2.x源码深度解析》专栏目录
一、架构通信篇:
《Nacos 2.x 源码深度解析 (一):架构整体全貌 —— 核心模块划分与版本演进》
《Nacos 2.x 源码深度解析 (二):通信协议迭代 —— HTTP长轮询到gRPC演进》
二、配置中心篇
《Nacos 2.x 源码深度解析 (三):配置中心客户端 —— 启动加载与自动装配》
《Nacos 2.x 源码深度解析 (四):配置中心服务端 —— 事件总线与数据持久化》
《Nacos 2.x 源码深度解析 (五):gRPC 推送链路 —— 配置变更下发与动态刷新》
《Nacos 2.x 源码深度解析 (六):三级缓存体系 —— 降级兜底与故障自愈机制》
三、服务注册发现篇
《Nacos 2.x 源码深度解析 (七):服务注册流程 —— 客户端上报与服务端存储》
《Nacos 2.x 源码深度解析 (八):服务订阅机制 —— 从首次订阅到gRPC双向流变更通知》
《Nacos 2.x 源码深度解析 (九):双向流设计 —— 连接创建复用与销毁》
在《Nacos 2.x 源码深度解析 (二):通信协议迭代 —— HTTP长轮询到gRPC演进》一篇,已经简单分析了Nacos选择gRPC的原因,以及相关代码,基于长连接通信的基础上,本篇将聚焦gRPC双向流长连接,围绕连接生命周期做全方位源码剖析,并围绕以下核心问题逐层展开:
- Nacos 配置中心与注册中心分别在什么时机初始化gRPC客户端?
- Nacos 配置中心与注册中心的连接模型有何区别?
- 底层 RPC 框架如何统一封装建连流程?
- Nacos 如何通过多层缓存结构实现连接复用,减少 TCP 连接资源占用?
- Nacos 如何处理异常的连接?怎么做到优雅的关闭连接?
读者可带着以上问题,循序渐进吃透双向流从创建、复用、销毁的完整生命周期,彻底掌握Nacos2.x RPC通信底层内核。
一、客户端:双向流连接创建流程
1.1 配置中心连接触发时机:ensureRpcClient()
在配置中心篇,我们已经深入分析了配置中心的启动与初始化流程,其中创建gRPC双向流通道的前置逻辑,可回顾《Nacos 2.x 源码深度解析 (五):gRPC 推送链路 —— 配置变更下发与动态刷新》一文。
这里我们简要描述一下整体流程:NacosConfigService初始化时创建 ClientWorker,并实例化 ConfigRpcTransportClient 作为RPC传输层。ClientWorker构造器调用 agent.start() 进入 startInternal(),启动一个阻塞等待5秒的主监听线程。当首次 addTenantListeners() 或 getConfig() 触发 notifyListenConfig() 唤醒主线程后,监听循环执行 executeConfigListen() 中的 checkListenCache(),进而调用 ensureRpcClient(taskId) 懒加载创建GrpcClient:通过 RpcClientFactory.createClient() 创建实例,注册配置变更推送处理器与连接监听器,最后调用 rpcClient.start() 建立 gRPC 连接与双向流通道。同一taskId后续复用已创建连接,不再重复建连。

ConfigRpcTransportClient通过checkListenCache方法批量处理配置监听任务,并基于taskId 维度调用 ensureRpcClient 确保每个监听分组拥有独立的gRPC连接。这段代码展示了配置中心gRPC双向流建立通道的入口rpcClient.start()。
com.alibaba.nacos.client.config.impl.ClientWorker.ConfigRpcTransportClient
// 批量检查配置是否变更
private boolean checkListenCache(Map<String, List<CacheData>> listenCachesMap) throws NacosException {
// 遍历所有待检查的CacheData,按taskId分组
for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) {
String taskId = entry.getKey();
// 获取或创建该taskId对应的gRPC客户端,不同taskId使用独立的RpcClient,实现连接隔离,首次调用时创建新的gRPC连接,后续复用已有连接
RpcClient rpcClient = ensureRpcClient(taskId);
}
}
——————————————————————————————————————————————————————————————————————————————
com.alibaba.nacos.client.config.impl.ClientWorker.ConfigRpcTransportClient
// 获取或创建指定taskId对应的gRPC客户端
private RpcClient ensureRpcClient(String taskId) throws NacosException {
// 加锁:保证同一taskId只创建一个RpcClient
synchronized (ClientWorker.this) {
Map<String, String> labels = getLabels(); // 获取全局标签(如客户端版本、应用名等)
Map<String, String> newLabels = new HashMap<>(labels); // 构建客户端标签
// 将taskId注入标签,作为配置监听分组标识
newLabels.put("taskId", taskId);
// 从配置属性中读取TLS证书配置,如果配置了TLS,创建加密连接;否则使用明文连接
RpcClientTlsConfig clientTlsConfig = RpcClientTlsConfigFactory.getInstance()
.createSdkConfig(properties);
// 创建RpcClient实例,命名规则:uuid + "_config-" + taskId
RpcClient rpcClient = RpcClientFactory.createClient(
uuid + "_config-" + taskId, // 客户端唯一标识
getConnectionType(), // 连接类型(GRPC)
newLabels, // 携带标签(含taskId)
clientTlsConfig); // TLS 配置
// isWaitInitiated() 返回true表示该RpcClient尚未启动,只有首次创建时才需要注册Handler并启动连接
if (rpcClient.isWaitInitiated()) {
// 注册服务端推送请求的处理器,当服务端通过gRPC双向流推送ConfigChangeNotifyRequest时,由这里注册的 Handler处理(更新缓存标记 + 唤醒主循环)
initRpcClientHandler(rpcClient);
// 设置租户信息(namespaceId),用于服务端识别
rpcClient.setTenant(getTenant());
// 启动gRPC客户端,内部调用connectToServer() 建立gRPC连接和双向流通道
rpcClient.start();
}
return rpcClient;
}
}
1.2 注册中心连接触发时机:start()
在注册中心篇,我们已经深入分析了注册中心的启动与初始化流程,其中创建gRPC双向流通道的前置逻辑,可回顾《Nacos 2.x 源码深度解析 (八):服务订阅机制 —— 从首次订阅到gRPC双向流变更通知》一文。
这里我们简要描述一下整体流程:NacosNamingService 初始化时创建 NamingClientProxyDelegate,由其实例化 NamingGrpcClientProxy。NamingGrpcClientProxy 先通过 RpcClientFactory.createClient() 创建底层 GrpcClient 实例,再初始化NamingGrpcRedoService(构造时即启动3秒一次的定时重试任务)。随后调用start() 方法,在此方法中依次注册连接监听器(redoService)、注册服务端推送处理器(NamingPushRequestHandler),最后调用 rpcClient.start() 建立 gRPC 连接与双向流通道。

NamingGrpcClientProxy 是 Nacos 服务发现客户端的 gRPC 通信核心,其构造函数完成了从连接创建到推送接收的完整初始化。首先保存 namespaceId、生成 UUID 唯一标识、读取超时配置,然后构建携带 SOURCE=sdk、MODULE=naming 和 APPNAME 的客户端标签,通过 RpcClientFactory.createClient() 创建 gRPC 客户端实例,同时初始化 NamingGrpcRedoService 作为断线重试缓存层。
随后调用 start() 完成五项启动准备:
- 设置服务端列表工厂以获取 Nacos 地址
- 注册
redoService为连接断开监听器(断连时标记所有实例待重试) - 注册
NamingPushRequestHandler为服务端推送处理器(收到实例变更推送时直接更新本地ServiceInfoHolder缓存) - 调用
rpcClient.start()建立gRPC长连接和双向流通道 - 向
NotifyCenter注册事件订阅以监听InstancesChangeEvent
这段代码展示了注册中心gRPC双向流建立通道的入口rpcClient.start()。
com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy
public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory,
NacosClientProperties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException {
// 调用父类构造函数,注入安全代理
super(securityProxy);
// 命名空间 ID,用于服务端区分不同租户的数据
this.namespaceId = namespaceId;
// 生成客户端唯一标识(UUID),用于区分不同客户端实例
this.uuid = UUID.randomUUID().toString();
// gRPC请求超时时间(毫秒),-1表示使用默认超时
this.requestTimeout = Long.parseLong(properties.getProperty(CommonParams.NAMING_REQUEST_TIMEOUT, "-1"));
// 标签会随gRPC连接上报到服务端,用于服务端识别连接来源
Map<String, String> labels = new HashMap<>();
// 标记来源:SDK(Nacos 官方客户端)
labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK);
// 标记模块:NAMING(服务发现模块)
labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_NAMING);
// 标记应用名(如 spring.application.name)
labels.put(Constants.APPNAME, AppNameUtils.getAppName());
// 创建gRPC客户端
this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels,
RpcClientTlsConfigFactory.getInstance().createSdkConfig(properties.asProperties()));
// 创建redo重试服务
this.redoService = new NamingGrpcRedoService(this, properties);
// 启动gRPC客户端
start(serverListFactory, serviceInfoHolder);
}
——————————————————————————————————————————————————————————————————————————————
com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy
// 启动gRPC客户端
private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {
// 设置服务端列表工厂
rpcClient.serverListFactory(serverListFactory);
// 注册连接断开监听器
rpcClient.registerConnectionListener(redoService);
// 注册服务端推送请求处理器
rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
// 启动gRPC客户端,内部调用connectToServer() 建立gRPC连接和双向流通道
rpcClient.start();
// 注册NotifyCenter事件订阅
NotifyCenter.registerSubscriber(this);
}
1.3 rpc客户端启动:RpcClient.start()
前两小节展示了配置中心和注册中心rpc客户端创建的核心入口方法,两者最终交汇于RpcClient.start()。

RpcClient.start()先通过 CAS 状态检查避免重复启动,然后创建两个后台线程:
-
线程一阻塞消费连接事件队列,分别回调
onConnected/onDisConnect通知业务层; -
线程二负责健康保活与自动重连,空闲时执行
healthCheck(),发现服务端不健康时自动切换节点重连。两个线程启动后立即同步建连,按重试次数遍历服务端列表逐一调用connectToServer()(子类GrpcClient实现,内含 TCP 建连、双向流绑定、ConnectionSetupRequest握手),建连成功后设置RUNNING状态并发布CONNECTED事件触发业务回调。最后注册ConnectResetRequestHandler(处理服务端发起的连接重置指令)和ClientDetectionRequest处理器(响应服务端存活探测)。这个
start()方法统一了Config和Naming两个模块的gRPC客户端启动流程,子类只需实现connectToServer()即可完成差异化的建连逻辑。
com.alibaba.nacos.common.remote.client.RpcClient
// 启动gRPC客户端。是配置中心和注册中心客户端连接建立的共同入口。
public final void start() throws NacosException {
// 避免重复启动
boolean success = rpcClientStatus.compareAndSet(INITIALIZED, STARTING);
if (!success) { return; }
// 创建后台工作线程池(2线程)
clientEventExecutor = new ScheduledThreadPoolExecutor(2,
new NameThreadFactory("com.alibaba.nacos.client.remote.worker"));
// 线程一:消费连接事件,触发回调
clientEventExecutor.submit(() -> {
while (true) {
ConnectionEvent event = eventLinkedBlockingQueue.take();
if (event.isConnected()) {
notifyConnected(event.connection); // 通知 ConnectionEventListener
} else if (event.isDisConnected()) {
notifyDisConnected(event.connection); // 通知 ConnectionEventListener
}
}
});
// 线程二:健康检查 + 自动重连
clientEventExecutor.submit(() -> {
while (true) {
ReconnectContext ctx = reconnectionSignal.poll(connectionKeepAlive, MILLISECONDS);
if (ctx == null) {
// 超时未收到重连信号,执行健康检查
if (isIdle()) {
if (!healthCheck()) {
setStatus(UNHEALTHY);
reconnect(null, false); // 自动重连
} else {
refreshLastActiveTime();
}
}
continue;
}
reconnect(ctx.serverInfo, ctx.onRequestFail); // 收到重连信号,立即重连
}
});
// 同步建连
Connection connection = null;
int retryTimes = rpcClientConfig.retryTimes(); // 默认3次
while (retryTimes >= 0 && connection == null) {
retryTimes--;
connection = connectToServer(nextRpcServer()); // GrpcClient.connectToServer()
}
if (connection != null) {
currentConnection = connection;
rpcClientStatus.set(RUNNING);
eventLinkedBlockingQueue.offer(new ConnectionEvent(CONNECTED, connection)); // 通知线程一
} else {
switchServerAsync(); // 全部失败,异步重连其他节点
}
// 注册内置请求处理器
registerServerRequestHandler(new ConnectResetRequestHandler()); // 服务端通知客户端重置连接
registerServerRequestHandler((request, connection) -> {
if (request instanceof ClientDetectionRequest) {
return new ClientDetectionResponse(); // 服务端探测客户端是否存活
}
return null;
});
}
1.4 建连核心方法:connectToServer()
connectToServer() 是gRPC 客户端与服务端建立完整通信通道的核心方法:
-
先创建
ManagedChannel建立 TCP 连接,通过Unary一元调用ServerCheckRequest获取服务端分配的connectionId并验证服务端可达; -
随后基于同一 TCP 通道创建
BiRequestStreamStub并调用bindRequestStream()绑定双向流回调,使客户端能够接收服务端推送的请求(如ConfigChangeNotifyRequest或NotifySubscriberRequest); -
接着将
StreamObserver、Unary stub、ManagedChannel封装到GrpcConnection中,通过双向流发送ConnectionSetupRequest完成建连握手; -
最后等待能力协商响应,协商通过后返回完整的
GrpcConnection,建连失败则关闭通道返回null。
同一
ManagedChannel同时承载Unary请求和双向流,实现了单TCP连接上的请求-响应与实时推送复用。
com.alibaba.nacos.common.remote.client.grpc.GrpcClient
//建立gRPC连接的核心方法。
public Connection connectToServer(ServerInfo serverInfo) {
try {
// 创建TCP连接通道
// 端口 = serverPort + rpcPortOffset(默认为 0)
int port = serverInfo.getServerPort() + rpcPortOffset();
ManagedChannel managedChannel = createNewManagedChannel(serverInfo.getServerIp(), port);
// 创建Unary一元调用的future stub
RequestGrpc.RequestFutureStub futureStub = createNewChannelStub(managedChannel);
// 发送ServerCheckRequest,验证服务端可达并获取connectionId,这是首次与目标服务端的通信,用于确认服务端存活
Response response = serverCheck(serverInfo.getServerIp(), port, futureStub);
if (!(response instanceof ServerCheckResponse)) {
shuntDownChannel(managedChannel);
return null;
}
String connectionId = ((ServerCheckResponse) response).getConnectionId();
// 创建双向流stub,复用同一个ManagedChannel,不额外创建TCP连接
BiRequestStreamGrpc.BiRequestStreamStub biStreamStub = BiRequestStreamGrpc
.newStub(futureStub.getChannel());
// 构造GrpcConnection并绑定双向流
GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
grpcConn.setConnectionId(connectionId);
// 绑定双向流的StreamObserver回调
// onNext:处理服务端推送的请求(ConfigChangeNotify / NotifySubscriber)
// onError/onCompleted:触发重连
StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biStreamStub, grpcConn);
grpcConn.setPayloadStreamObserver(payloadStreamObserver); // 用于向服务端回复响应
grpcConn.setGrpcFutureServiceStub(futureStub); // 用于 Unary 请求
grpcConn.setChannel(managedChannel); // 用于后续关闭
// 发送ConnectionSetupRequest完成建连握手
ConnectionSetupRequest setupRequest = new ConnectionSetupRequest();
setupRequest.setClientVersion(VersionUtils.getFullClientVersion());
setupRequest.setLabels(super.getLabels());
setupRequest.setAbilityTable(
NacosAbilityManagerHolder.getInstance().getCurrentNodeAbilities(abilityMode()));
setupRequest.setTenant(super.getTenant());
grpcConn.sendRequest(setupRequest); // 通过双向流发送
// 等待能力协商结果
if (recAbilityContext.isNeedToSync()) {
recAbilityContext.await(capabilityNegotiationTimeout(), MILLISECONDS);
if (!recAbilityContext.check(grpcConn)) {
return null; // 能力不匹配,建连失败
}
} else {
// 兼容旧版本服务端:等待 100ms确保建连完成
Thread.sleep(100L);
}
return grpcConn;
} catch (Exception e) {
recAbilityContext.release(null);
}
return null;
}
二、客户端连接复用与销毁
2.1 客户端连接复用设计
客户端的连接复用分三层。RpcClientFactory 通过ConcurrentHashMap.computeIfAbsent 按 clientName 缓存 RpcClient 实例,Config 配置中心按 taskId 分别缓存(uuid + "_config-" + taskId),Naming 注册中心全局一个(uuid),同一 name 多次调用返回同一个实例。
单个 GrpcClient 内部,currentConnection 永久复用,除非断线重连不会更换。单个 GrpcConnection 内部的 ManagedChannel 在 gRPC 生命周期内永久复用,且同一TCP连接上同时承载两条gRPC通道:RequestFutureStub(Unary)处理客户端主动请求,StreamObserver(双向流)处理服务端推送和回执,由 gRPC 框架按方法名自动路由。
三层复用叠加实现了一个客户端进程对单节点永久只维持少量 TCP 连接,所有请求、监听、推送全部复用已有连接。
com.alibaba.nacos.common.remote.client.RpcClientFactory
// 全局缓存:clientName —> RpcClient
private static final Map<String, RpcClient> CLIENT_MAP = new ConcurrentHashMap<>();
// 创建或获取RpcClient, name已存在时直接返回缓存实例,不再新建。
public static RpcClient createClient(String clientName, ConnectionType connectionType, Map<String, String> labels,
RpcClientTlsConfig tlsConfig) {
return createClient(clientName, connectionType, null, null, labels, tlsConfig);
}
——————————————————————————————————————————————————————————————————————————————
com.alibaba.nacos.common.remote.client.grpc.GrpcClient
// GrpcClient.connectToServer() 中,同一条连接上同时承载:
public Connection connectToServer(ServerInfo serverInfo) {
// ManagedChannel(TCP连接), 在整个gRPC生命周期内永久复用
ManagedChannel managedChannel = createNewManagedChannel(ip, port);
// 基于同一channel创建Unary stub(查询、注册、订阅)
RequestFutureStub futureStub = createNewChannelStub(managedChannel);
// 基于同一channel创建双向流stub(服务端推送)
BiRequestStreamStub biStreamStub = BiRequestStreamGrpc.newStub(managedChannel);
// 两者封装到同一个GrpcConnection
GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
grpcConn.setPayloadStreamObserver(payloadStreamObserver); // 双向流的发送通道
grpcConn.setGrpcFutureServiceStub(futureStub); // Unary的发送通道
grpcConn.setChannel(managedChannel); // 连接生命周期管理
// 后续所有操作复用同一client:
// client.request() -> futureStub 走 Unary
// client.asyncRequest() -> streamObserver走双向流
// client.replyResponse() -> streamObserver走双向流
return grpcConn;
}
// RpcClient.start() 将连接设为当前连接,永久复用:
if (connectToServer != null) {
this.currentConnection = connectToServer; // 除非断线重连,否则一直用这个
rpcClientStatus.set(RpcClientStatus.RUNNING);
}
2.2 客户端连接关闭设计
客户端连接关闭覆盖四种场景:
-
应用主动关闭时
shutdown()将状态标记为SHUTDOWN,终止两个后台线程后调用closeConnection():先通过payloadStreamObserver.onCompleted()通知服务端双向流断开,再关闭ManagedChannel释放 TCP 连接,最后投递DISCONNECTED事件通知业务监听器。 -
健康检查失败时
healthCheck()向当前连接发送HealthCheckRequest(Unary),连续失败返回false,线程二检测到后将状态设为UNHEALTHY并触发reconnect()切换节点。 -
双向流断开时
onError/onCompleted回调直接通过 CAS 将状态从RUNNING切为UNHEALTHY并发起重连。 -
服务端引导重连时
ConnectResetRequestHandler收到指令,根据是否携带推荐地址调用switchServerAsync()切换到指定节点或随机节点。四种场景最终汇入同一个
reconnect()方法,循环尝试建连直到成功或shutdown,新连接建立后关闭旧连接并通知业务层CONNECTED。
————————————————————————————① 应用主动关闭————————————————————————————————————————
com.alibaba.nacos.common.remote.client.RpcClient
public void shutdown() throws NacosException {
// 标记状态为 SHUTDOWN,后台线程二检测到后停止重连循环
rpcClientStatus.set(RpcClientStatus.SHUTDOWN);
// 终止两个后台线程:
// 线程一:阻塞在 eventLinkedBlockingQueue.take() 上,shutdownNow 后抛出异常退出
// 线程二:阻塞在 reconnectionSignal.poll() 上,下次循环检测到 SHUTDOWN 退出
if (clientEventExecutor != null) {
clientEventExecutor.shutdownNow();
}
// 关闭当前 gRPC 连接
closeConnection(currentConnection);
}
// 关闭连接并通知监听器
private void closeConnection(Connection connection) {
if (connection != null) {
connection.close(); // → GrpcConnection.close()
eventLinkedBlockingQueue.add(
new ConnectionEvent(DISCONNECTED, connection)); // → 通知业务层(如 RedoService)
}
}
——————————————————————————————————————————————————————————————————————————————
com.alibaba.nacos.common.remote.client.grpc.GrpcConnection
// 客户端主动关闭连接。
public void close() {
// 关闭双向流:向服务端发送stream completed信号,服务端收到后触发onCompleted -> ConnectionManager.unregister()
if (this.payloadStreamObserver != null) {
try {
payloadStreamObserver.onCompleted();
} catch (Throwable ignored) {
}
}
// 关闭gRPC ManagedChannel, 释放 TCP 连接和线程
if (this.channel != null && !channel.isShutdown()) {
try {
this.channel.shutdownNow(); // 立即关闭,不等待未完成的请求
} catch (Throwable ignored) {
}
}
}
————————————————————————————② 健康检查失败自动重连:线程二检测————————————————————————————————————————
com.alibaba.nacos.common.remote.client.RpcClient#healthCheck
private boolean healthCheck() {
// 通过当前连接发送 HealthCheckRequest(Unary),重试 3 次
// 成功则更新 lastActiveTimeStamp
// 失败则返回 false → 线程二自动切换节点
Response response = this.currentConnection.request(healthCheckRequest, healthCheckTimeOut);
return response != null && response.isSuccess();
}
————————————————————————————③ 双向流断开被动检测:bindRequestStream回调————————————————————————————
com.alibaba.nacos.common.remote.client.grpc.GrpcClient#bindRequestStream
private StreamObserver<Payload> bindRequestStream(final BiRequestStreamGrpc.BiRequestStreamStub streamStub,
final GrpcConnection grpcConn) {
return streamStub.requestBiStream(new StreamObserver<Payload>() {
public void onError(Throwable t) {
if (rpcClientStatus.compareAndSet(RUNNING, UNHEALTHY)) {
switchServerAsync(); // 异步切换到其他节点
}
}
public void onCompleted() {
if (isRunning() && !grpcConn.isAbandon()) {
rpcClientStatus.compareAndSet(RUNNING, UNHEALTHY);
switchServerAsync();
}
}
});
}
————————————————————————————④ 服务端引导重连————————————————————————————
com.alibaba.nacos.common.remote.client.RpcClient.ConnectResetRequestHandler#requestReply
public Response requestReply(Request request, Connection connection) {
if (request instanceof ConnectResetRequest) {
ConnectResetRequest req = (ConnectResetRequest) request;
if (StringUtils.isNotBlank(req.getServerIp())) {
switchServerAsync(resolveServerInfo(req.getServerIp() + ":" + req.getServerPort()), false);
} else {
switchServerAsync(); // 无推荐地址 → 随机切换
}
}
return new ConnectResetResponse();
}
}
// 重连逻辑:reconnect()
protected void reconnect(final ServerInfo recommendServerInfo, boolean onRequestFail) {
// 若onRequestFail且健康检查通过 -> 恢复RUNNING,跳过重连
// 否则循环直到建连成功或 shutdown:
while (!switchSuccess && !isShutdown()) {
ServerInfo serverInfo = recommendServer ?? nextRpcServer();
Connection connectionNew = connectToServer(serverInfo); // 建连
if (connectionNew != null) {
closeConnection(currentConnection); // 关闭旧连接
currentConnection = connectionNew; // 替换新连接
eventLinkedBlockingQueue.add(new ConnectionEvent(CONNECTED)); // 通知业务层
return;
}
}
}
// 关闭连接:closeConnection()
private void closeConnection(Connection connection) {
if (connection != null) {
connection.close(); // GrpcConnection.close()
// ManagedChannel.shutdownNow()
eventLinkedBlockingQueue.add(new ConnectionEvent(DISCONNECTED)); // 通知监听器
}
三、服务端:双向流接收与通道绑定
3.1 服务端启动流程:startServer()

startServer() 在单个Netty端口中同时注册两个gRPC服务:Request/request(Unary)处理客户端主动请求,BiRequestStream/requestBiStream(双向流)处理建连握手和服务端推送。构建 NettyServerBuilder 时依次配置消息大小限制、压缩器、keepalive 参数和 TLS 协议协商器后启动服务端,开始监听客户端连接。客户端建连时双向流到达 GrpcBiStreamRequestAcceptor,ConnectionSetupRequest 握手后由 ConnectionManager 统一管理连接生命周期;后续所有请求经同一个 Netty 端口按 addServices() 注册的路由规则分别分发到 Unary 和双向流两条通道处理。
com.alibaba.nacos.core.remote.grpc.BaseGrpcServer
// 启动gRPC服务端,监听客户端连接。
public void startServer() throws Exception {
// 注册gRPC服务
final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
addServices(handlerRegistry, getSeverInterceptors().toArray(...));
// 构建Netty gRPC Server
NettyServerBuilder builder = NettyServerBuilder
.forPort(getServicePort()) // 监听端口
.executor(getRpcExecutor()); // 请求处理线程池
// 协议协商器(用于TLS等加密协议)
Optional<InternalProtocolNegotiator.ProtocolNegotiator> negotiator = newProtocolNegotiator();
if (negotiator.isPresent()) {
builder.protocolNegotiator(negotiator.get());
}
// 注册传输过滤器(如连接追踪、日志等)
for (ServerTransportFilter each : getServerTransportFilters()) {
builder.addTransportFilter(each);
}
// 配置并启动
server = builder
.maxInboundMessageSize(getMaxInboundMessageSize()) // 最大消息体
.fallbackHandlerRegistry(handlerRegistry) // 服务处理器
.compressorRegistry(CompressorRegistry.getDefaultInstance())
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
.keepAliveTime(getKeepAliveTime(), MILLISECONDS) // 服务端 keepalive 间隔
.keepAliveTimeout(getKeepAliveTimeout(), MILLISECONDS) // keepalive 超时
.permitKeepAliveTime(getPermitKeepAliveTime(), MILLISECONDS) // 客户端 keepalive 允许间隔
.build();
server.start(); // 启动Netty,开始监听端口
}
3.2 注册gRPC服务:addServices()

addServices() 在单个Netty端口中注册两条gRPC服务通道。Unary 通道 Request/request 通过 asyncUnaryCall 包装 handleCommonRequest(),客户端通过 RequestFutureStub.request(payload) 调用,服务端分配独立响应流处理一次请求;双向流通道 BiRequestStream/requestBiStream 通过 asyncBidiStreamingCall 包装 GrpcBiStreamRequestAcceptor.requestBiStream(),客户端建立一条长连接双向流,后续服务端推送(配置变更通知、命名服务实例变更)和客户端回执均复用此流。两条通道各自定义 MethodDescriptor 描述方法名和序列化方式,组装为 ServerServiceDefinition 后注入 getSeverInterceptors() 拦截器(TPS 限流、参数校验、鉴权等),最后注册到 MutableHandlerRegistry。gRPC 框架在客户端连接到达时按 fullMethodName 自动路由到对应通道。
com.alibaba.nacos.core.remote.grpc.BaseGrpcServer
private void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor... serverInterceptor) {
// 定义Unary方法的描述符:方法名、请求/响应类型、序列化器
final MethodDescriptor<Payload, Payload> unaryPayloadMethod = MethodDescriptor.<Payload, Payload>newBuilder()
.setType(MethodDescriptor.MethodType.UNARY)
.setFullMethodName(
MethodDescriptor.generateFullMethodName(
GrpcServerConstants.REQUEST_SERVICE_NAME, // Request
GrpcServerConstants.REQUEST_METHOD_NAME)) // request
.setRequestMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance()))
.build();
// 创建Unary调用处理器:收到请求后调 handleCommonRequest()
final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls.asyncUnaryCall(
(request, responseObserver) -> handleCommonRequest(request, responseObserver));
// 组装为完整的服务定义,注册到 MutableHandlerRegistry
final ServerServiceDefinition serviceDefOfUnaryPayload = ServerServiceDefinition.builder(
GrpcServerConstants.REQUEST_SERVICE_NAME)
.addMethod(unaryPayloadMethod, payloadHandler)
.build();
// 注册Unary服务(Request/request),注册时注入拦截器(TPS 限流、参数校验等)
handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfUnaryPayload, serverInterceptor));
// 创建双向流处理器:客户端流到达后调requestBiStream() 返回 StreamObserver
final ServerCallHandler<Payload, Payload> biStreamHandler = ServerCalls.asyncBidiStreamingCall(
(responseObserver) -> grpcBiStreamRequestAcceptor.requestBiStream(responseObserver));
// 定义双向流方法的描述符
final MethodDescriptor<Payload, Payload> biStreamMethod = MethodDescriptor.<Payload, Payload>newBuilder()
.setType(MethodDescriptor.MethodType.BIDI_STREAMING)
.setFullMethodName(
MethodDescriptor.generateFullMethodName(
GrpcServerConstants.REQUEST_BI_STREAM_SERVICE_NAME, // BiRequestStream
GrpcServerConstants.REQUEST_BI_STREAM_METHOD_NAME)) //requestBiStream
.setRequestMarshaller(ProtoUtils.marshaller(Payload.newBuilder().build()))
.setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance()))
.build();
// 组装并注册
final ServerServiceDefinition serviceDefOfBiStream = ServerServiceDefinition.builder(
GrpcServerConstants.REQUEST_BI_STREAM_SERVICE_NAME)
.addMethod(biStreamMethod, biStreamHandler)
.build();
// 注册双向流服务(BiRequestStream/requestBiStream)
handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfBiStream, serverInterceptor));
}
3.3 一元调用处理: request()
客户端通过 Unary 通道发出的请求经过两级处理。BaseGrpcServer.handleCommonRequest() 是第一道入口,先校验来源——SDK 请求只能走 GrpcSdkServer、集群请求只能走 GrpcClusterServer,来源不匹配时返回 502 拒绝。
校验通过后交由 GrpcRequestAcceptor.request() 继续处理。
第二道入口依次执行:服务端未完成启动时拒绝所有业务请求;ServerCheckRequest 直接返回 connectionId 不经过 Handler 路由;从 RequestHandlerRegistry 按请求类型匹配对应的 RequestHandler(如 ConfigQueryRequest → ConfigQueryRequestHandler);校验连接是否已在 ConnectionManager 中注册;反序列化 Payload 为 Request 对象;构建 RequestMeta(含客户端 IP、版本、标签、能力表),通过 prepareRequestContext() 写入请求上下文供后续鉴权和参数校验使用;最后调用 handler.handleRequest() 执行具体业务逻辑,响应在限流超阈值时延迟 1 秒返回,异常时返回 ErrorResponse。两级入口均记录 MetricsMonitor 监控指标,handleCommonRequest 侧重来源隔离,request 侧重请求路由和生命周期管理。
com.alibaba.nacos.core.remote.grpc.BaseGrpcServer
// 处理客户端通过 Unary 通道发来的请求。
protected void handleCommonRequest(Payload grpcRequest, StreamObserver<Payload> responseObserver) {
// 检查请求来源是否被允许访问当前服务端
// GrpcSdkServer收到的SDK请求只能访问SDK允许的接口
// GrpcClusterServer收到的集群请求只能访问集群接口
if (!invokeSourceAllowCheck(grpcRequest)) {
// 来源不允许 → 返回 502,拒绝处理
responseObserver.onNext(GrpcUtils.convert(
ErrorResponse.build(NacosException.BAD_GATEWAY,
String.format(" invoke %s from %s is forbidden",
grpcRequest.getMetadata().getType(), this.getSource()))));
responseObserver.onCompleted();
MetricsMonitor.recordGrpcRequestEvent(grpcRequest.getMetadata().getType(),
false, NacosException.BAD_GATEWAY, null, null, 0);
return;
}
// 路由到请求处理器
grpcCommonRequestAcceptor.request(grpcRequest, responseObserver);
}
——————————————————————————————————————————————————————————————————————————————
com.alibaba.nacos.core.remote.grpc.GrpcRequestAcceptor
// 处理所有通过Unary通道到达的客户端请求。
public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {
// 启动状态检查,服务端尚未完成启动时拒绝所有业务请求
if (!ApplicationUtils.isStarted()) {
sendResponse(responseObserver,
ErrorResponse.build(INVALID_SERVER_STATUS, "Server is starting,please try later."));
return;
}
// ServerCheck请求特例处理,ServerCheckRequest在connectToServer阶段由客户端发出,
// 直接返回ServerCheckResponse(含 connectionId),不经过Handler路由
if (ServerCheckRequest.class.getSimpleName().equals(type)) {
sendResponse(responseObserver,
new ServerCheckResponse(GrpcServerConstants.CONTEXT_KEY_CONN_ID.get(), true));
return;
}
// 根据请求类型从HandlerRegistry中匹配已注册的Handler
RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
if (requestHandler == null) {
sendResponse(responseObserver,
ErrorResponse.build(NO_HANDLER, "RequestHandler Not Found"));
return;
}
// 检查客户端是否已在ConnectionManager中注册
String connectionId = GrpcServerConstants.CONTEXT_KEY_CONN_ID.get();
if (!connectionManager.checkValid(connectionId)) {
sendResponse(responseObserver,
ErrorResponse.build(UN_REGISTER, "Connection is unregistered."));
return;
}
// 反序列化 Payload —> Request
Object parseObj = GrpcUtils.parse(grpcRequest);
if (!(parseObj instanceof Request)) {
sendResponse(responseObserver,
ErrorResponse.build(BAD_GATEWAY, "Invalid request"));
return;
}
Request request = (Request) parseObj;
// 构建请求上下文,执行Handler
try {
Connection connection = connectionManager.getConnection(connectionId);
RequestMeta requestMeta = new RequestMeta();
requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
requestMeta.setConnectionId(connectionId);
requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
requestMeta.setLabels(connection.getMetaInfo().getLabels());
requestMeta.setAbilityTable(connection.getAbilityTable());
connectionManager.refreshActiveTime(connectionId); // 更新连接最后活跃时间
prepareRequestContext(request, requestMeta, connection);
// 执行具体 Handler.handle(),拿到业务响应
Response response = requestHandler.handleRequest(request, requestMeta);
// 限流超阈值时延迟 1 秒返回
if (response.getErrorCode() == OVER_THRESHOLD) {
RpcScheduledExecutor.CONTROL_SCHEDULER.schedule(() -> {
sendResponse(responseObserver, response);
}, 1000L, MILLISECONDS);
} else {
sendResponse(responseObserver, response);
}
} catch (Throwable e) {
sendResponse(responseObserver, ErrorResponse.build(e));
} finally {
RequestContextHolder.removeContext();
}
}
3.4 双向流调用处理:requestBiStream()

当客户端发起 BiRequestStream/requestBiStream gRPC 调用时,该方法返回一个 StreamObserver,此后由 gRPC 框架回调其 onNext/onError/onCompleted 处理流上的消息。
onNext 处理两类消息:收到 ConnectionSetupRequest 时,提取客户端信息(IP、版本、应用名、标签、租户、能力表)创建 GrpcConnection,调用 connectionManager.register() 注册连接并触发 ConnectionEventListener.onConnected(),同时通过 sendRequestNoAck 响应 SetupAckRequest 携带服务端能力表完成建连握手;
注册失败(服务端未启动完成或连接数超限)时关闭连接拒绝。收到 Response 时,调用 RpcAckCallbackSynchronizer.ackNotify() 唤醒等待 Unary 响应的客户端线程,并更新连接最后活跃时间。
onError 和 onCompleted 在流异常断开或客户端主动关闭时清理gRPC流状态,不做业务清理,业务清理由 ConnectionManager 在检测到连接超时或 BaseRpcServer 的 ConnectionEventListener 触发 ClientDisconnectEvent 时完成。
// 处理客户端发起的双向流连接。
public StreamObserver<Payload> requestBiStream(StreamObserver<Payload> responseObserver) {
// 从gRPC上下文中提取连接信息
final String connectionId = GrpcServerConstants.CONTEXT_KEY_CONN_ID.get();
final Integer localPort = GrpcServerConstants.CONTEXT_KEY_CONN_LOCAL_PORT.get();
final int remotePort = GrpcServerConstants.CONTEXT_KEY_CONN_REMOTE_PORT.get();
String remoteIp = GrpcServerConstants.CONTEXT_KEY_CONN_REMOTE_IP.get();
String clientIp = "";
// 返回 StreamObserver,此后由gRPC框架回调onNext/onError/onCompleted
return new StreamObserver<Payload>() {
@Override
public void onNext(Payload payload) {
clientIp = payload.getMetadata().getClientIp();
traceDetailIfNecessary(payload);
// 解析Payload
Object parseObj;
parseObj = GrpcUtils.parse(payload);
// ConnectionSetupRequest:建连握手
if (parseObj instanceof ConnectionSetupRequest) {
ConnectionSetupRequest setUpRequest = (ConnectionSetupRequest) parseObj;
// 提取客户端信息:应用名、租户、标签、能力表
String appName = setUpRequest.getLabels() != null
? setUpRequest.getLabels().getOrDefault(Constants.APPNAME, "-") : "-";
// 创建连接元数据和GrpcConnection
ConnectionMeta metaInfo = new ConnectionMeta(connectionId,
payload.getMetadata().getClientIp(), remoteIp, remotePort,
localPort, ConnectionType.GRPC.getType(),
setUpRequest.getClientVersion(), appName, setUpRequest.getLabels());
metaInfo.setTenant(setUpRequest.getTenant());
GrpcConnection connection = new GrpcConnection(metaInfo, responseObserver,
GrpcServerConstants.CONTEXT_KEY_CHANNEL.get());
if (setUpRequest.getAbilityTable() != null) {
connection.setAbilityTable(setUpRequest.getAbilityTable());
}
// 注册连接
boolean rejectSdkOnStarting = metaInfo.isSdkSource() && !ApplicationUtils.isStarted();
if (rejectSdkOnStarting || !connectionManager.register(connectionId, connection)) {
// 服务端未启动完成或连接数超限 —> 拒绝连接
connection.close();
} else {
// 注册成功 → 响应SetupAckRequest,携带服务端能力表
connection.sendRequestNoAck(new SetupAckRequest(
NacosAbilityManagerHolder.getInstance()
.getCurrentNodeAbilities(AbilityMode.SERVER)));
}
// Response:Unary请求的回执
} else if (parseObj instanceof Response) {
Response response = (Response) parseObj;
RpcAckCallbackSynchronizer.ackNotify(connectionId, response); // 唤醒等待线程
connectionManager.refreshActiveTime(connectionId); // 更新活跃时间
}
}
@Override
public void onError(Throwable t) {
// 流异常断开,通知ClientCallStreamObserver清理
if (responseObserver instanceof ServerCallStreamObserver) {
ServerCallStreamObserver scso = (ServerCallStreamObserver) responseObserver;
if (!scso.isCancelled()) {
scso.onCompleted();
}
}
}
@Override
public void onCompleted() {
// 客户端主动关闭流,清理
if (responseObserver instanceof ServerCallStreamObserver) {
ServerCallStreamObserver scso = (ServerCallStreamObserver) responseObserver;
if (!scso.isCancelled()) {
scso.onCompleted();
}
}
}
};
}
四、服务端连接复用与销毁
4.1 服务端连接复用核心:ConnectionManager
服务端通过 ConnectionManager 管理所有gRPC长连接,核心是一个ConcurrentHashMap<String, Connection> 按 connectionId 缓存。
建连时 ConnectionSetupRequest 握手触发 register(),经过幂等检查、限流判断后存入缓存,同时通过connectionForClientIp 统计单 IP 连接数,最后调用 notifyClientConnected() 通知naming模块创建业务Client。后续每次请求(Unary 查询或双向流推送)gRPC 框架自动携带 connectionId,getConnection() 直接 HashMap 查找复用,refreshActiveTime() 续期连接。
断连时 unregister() 移除缓存、关闭连接、通知业务层清理实例和订阅数据。后台每3秒通过 NacosRuntimeConnectionEjector扫描所有连接,lastActiveTime 超时的连接通过 ConnectResetRequest 引导客户端重连。
整个生命周期中,connectionId 作为全局唯一标识贯穿建连、复用、销毁全部环节,保障了服务端对海量客户端连接的高效管理。
// 服务端连接管理器。
@Service
public class ConnectionManager {
// 核心缓存:connectionId -> Connection(ConcurrentHashMap)
Map<String, Connection> connections = new ConcurrentHashMap<>();
// IP -> 连接数的计数映射,用于单IP限制
Map<String, AtomicInteger> connectionForClientIp = new ConcurrentHashMap<>();
// 连接事件监听器注册表,通知naming/config模块
private ClientConnectionEventListenerRegistry clientConnectionEventListenerRegistry;
// 注册一个新连接
public synchronized boolean register(String connectionId, Connection connection) {
if (connections.containsKey(connectionId)) return true; // 幂等
if (checkLimit(connection)) return false; // 连接数超限
connections.put(connectionId, connection); // 存入缓存
connectionForClientIp.computeIfAbsent(clientIp, k -> new AtomicInteger(0)).getAndIncrement();
clientConnectionEventListenerRegistry.notifyClientConnected(connection); // 触发业务回调
return true;
}
// 注销一个连接
public synchronized void unregister(String connectionId) {
Connection remove = this.connections.remove(connectionId);
if (remove != null) {
AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
atomicInteger.decrementAndGet();
remove.close();
clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
}
}
// 连接查询与校验(每次请求时调用),按id查找连接(每秒数千次调用)
public Connection getConnection(String connectionId) {
return connections.get(connectionId); // O(1) HashMap 查找
}
// 校验连接是否有效(GrpcRequestAcceptor 中调用)
public boolean checkValid(String connectionId) {
return connections.containsKey(connectionId);
}
// 更新连接活跃时间(每次有请求或回执时调用)
public void refreshActiveTime(String connectionId) {
Connection connection = connections.get(connectionId);
if (connection != null) connection.freshActiveTime();
}
// 过期连接清理(定时任务,每3秒)
@PostConstruct
public void start() {
initConnectionEjector();
RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(() -> {
runtimeConnectionEjector.doEject(); // 踢出过期连接
MetricsMonitor.getLongConnectionMonitor().set(connections.size());
}, 1000L, 3000L, TimeUnit.MILLISECONDS);
}
}
4.2 服务端连接关闭设计
服务端连接关闭覆盖三种场景:
- 客户端双向流断开或
GrpcBiStreamRequestAcceptor的onCompleted/onError触发unregister()时,从connections缓存中移除连接,递减该 IP 的连接计数(归零时移除 Key),关闭 gRPC 连接和双向流,最后通知业务层清理实例和订阅数据。 - 定时清理过期连接时,
NacosRuntimeConnectionEjector每3秒执行一次过期连接清理,遍历所有连接收集lastActiveTime空闲超过 20 秒的连接,通过ClientDetectionRequest探活,有响应则续期,无响应则等待全部探活完成后由ConnectionManager统一清理。 - 负载过高或运维干预时,
loadSingle()通过ConnectResetRequest向指定连接发送重连指令,携带目标节点地址引导客户端切换到其他节点。三种场景覆盖了服务端主动断开、被动探活清理和负载均衡调度,保障了连接生命周期管理的高效性和集群稳定性。
————————————————————————————① 服务端主动断连 ————————————————————————————————————————
com.alibaba.nacos.core.remote.ConnectionManager
public synchronized void unregister(String connectionId) {
Connection remove = this.connections.remove(connectionId); // 从缓存移除
if (remove != null) {
String clientIp = remove.getMetaInfo().clientIp;
AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
if (atomicInteger != null) {
int count = atomicInteger.decrementAndGet(); // 递减 IP 连接计数
if (count <= 0) {
connectionForClientIp.remove(clientIp); // 归零时移除
}
}
remove.close(); // 关闭gRPC连接
clientConnectionEventListenerRegistry.notifyClientDisConnected(remove); // 通知业务层
}
}
————————————————————————————② 定时过期连接清理(每3秒执行) ————————————————————————————————————————
com.alibaba.nacos.core.remote.NacosRuntimeConnectionEjector
private void ejectOutdatedConnection() {
Map<String, Connection> connections = connectionManager.connections;
Set<String> outDatedConnections = new HashSet<>();
// 收集过期连接
for (Entry<String, Connection> entry : connections.entrySet()) {
Connection client = entry.getValue();
long idleTime = now - client.getMetaInfo().getLastActiveTime();
if (idleTime >= KEEP_ALIVE_TIME) { // 空闲超过 20 秒
outDatedConnections.add(client.getMetaInfo().getConnectionId());
} else if (client.getMetaInfo().pushQueueBlockTimesLastOver(300 * 1000)) {
outDatedConnections.add(client.getMetaInfo().getConnectionId());
}
}
// 发送探活请求,无响应则清理
for (String connId : outDatedConnections) {
Connection connection = connectionManager.getConnection(connId);
if (connection != null) {
connection.asyncRequest(new ClientDetectionRequest(), new RequestCallBack() {
void onResponse(Response response) {
if (response != null && response.isSuccess()) {
connection.freshActiveTime(); // 有响应 → 续期
}
}
void onException(Throwable e) { // 无响应 -> 等待所有回调完成
latch.countDown(); // 最后统一调用 unregister()
}
});
}
}
// 等待所有探活完成,无响应的连接由ConnectionManager统一清理
}
————————————————————————————③ 负载均衡连接踢出 ————————————————————————————————————————
com.alibaba.nacos.core.remote.ConnectionManager
// 当集群节点过载或运维手动调整连接分布时,通过loadSingle() 向指定连接发送 ConnectResetRequest,引导客户端重连到其他节点
public boolean loadSingle(String connectionId, String redirectAddress) {
Connection connection = getConnection(connectionId);
if (connection != null) {
ConnectResetRequest resetRequest = new ConnectResetRequest();
resetRequest.setServerIp(redirectIp);
resetRequest.setServerPort(redirectPort);
connection.request(resetRequest, 3000L); // 发送重连指令
// 客户端收到后 -> ConnectResetRequestHandler -> switchServerAsync()
}
return true;
}
三、全文小结
本章围绕Nacos 2.x gRPC双向流长连接,深入分析了客户端、服务端两大维度的连接创建、复用、销毁全生命周期运行机制,并对比分析配置中心与注册中心差异化的连接模型。在客户端层面,配置中心采用基于taskId分组的多连接隔离模型,以懒加载形式初始化RpcClient,不同监听分组独享独立gRPC连接,实现业务隔离;注册中心则采用全局单连接模型,所有服务注册、订阅、心跳、实例查询等请求统一复用唯一双向流通道,减少客户端TCP资源占用。两大模块建连入口虽然不同,但最终均依托RpcClient.start()完成客户端初始化,通过内置双线程分别处理连接事件回调、健康巡检与自动重连,再经由connectToServer()执行建连协议,完成TCP通道创建、双向流绑定、握手协商,最终构建完整通信通道。
同时客户端采用三层连接复用架构,分别从RpcClient实例、GrpcClient连接、TCP通道三个层面实现资源复用,在保障通信性能的前提下,规避频繁创建销毁连接带来的性能损耗,并适配应用主动关闭、健康检测失败、流异常断开、服务端引导重连四类场景,形成一套完善的断线重连与资源回收机制。
在服务端层面,Nacos基于单Netty端口双通道架构,同时兼容Unary一元调用与Bidi双向流式两种通信模式,各司其职分别处理客户端即时同步请求与服务端主动推送事件;依托ConnectionManager作为连接管控核心,以connectionId为唯一标识缓存全部客户端连接,统一实现连接注册、活跃度续期、过期探测、资源注销等功能。此外服务端设计了被动断连清理、定时过期剔除、过载主动踢出三种连接销毁策略,全方位适配日常运行、异常故障、集群负载均衡等不同生产场景。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)