在这里插入图片描述

《Nacos 2.x源码深度解析》专栏目录
一、架构通信篇:
《Nacos 2.x 源码深度解析 (一):架构整体全貌 —— 核心模块划分与版本演进》
《Nacos 2.x 源码深度解析 (二):通信协议迭代 —— HTTP长轮询到gRPC演进》
二、配置中心篇
《Nacos 2.x 源码深度解析 (三):配置中心客户端 —— 启动加载与自动装配》
《Nacos 2.x 源码深度解析 (四):配置中心服务端 —— 事件总线与数据持久化》
《Nacos 2.x 源码深度解析 (五):gRPC 推送链路 —— 配置变更下发与动态刷新》
《Nacos 2.x 源码深度解析 (六):三级缓存体系 —— 降级兜底与故障自愈机制》
三、服务注册发现篇
《Nacos 2.x 源码深度解析 (七):服务注册流程 —— 客户端上报与服务端存储》
《Nacos 2.x 源码深度解析 (八):服务订阅机制 —— 从首次订阅到gRPC双向流变更通知》
《Nacos 2.x 源码深度解析 (九):双向流设计 —— 连接创建复用与销毁》



《Nacos 2.x 源码深度解析 (二):通信协议迭代 —— HTTP长轮询到gRPC演进》一篇,已经简单分析了Nacos选择gRPC的原因,以及相关代码,基于长连接通信的基础上,本篇将聚焦gRPC双向流长连接,围绕连接生命周期做全方位源码剖析,并围绕以下核心问题逐层展开:

  • Nacos 配置中心与注册中心分别在什么时机初始化gRPC客户端?
  • Nacos 配置中心与注册中心的连接模型有何区别?
  • 底层 RPC 框架如何统一封装建连流程?
  • Nacos 如何通过多层缓存结构实现连接复用,减少 TCP 连接资源占用?
  • Nacos 如何处理异常的连接?怎么做到优雅的关闭连接?

读者可带着以上问题,循序渐进吃透双向流从创建、复用、销毁的完整生命周期,彻底掌握Nacos2.x RPC通信底层内核。

一、客户端:双向流连接创建流程

1.1 配置中心连接触发时机:ensureRpcClient()

在配置中心篇,我们已经深入分析了配置中心的启动与初始化流程,其中创建gRPC双向流通道的前置逻辑,可回顾《Nacos 2.x 源码深度解析 (五):gRPC 推送链路 —— 配置变更下发与动态刷新》一文。

这里我们简要描述一下整体流程:NacosConfigService初始化时创建 ClientWorker,并实例化 ConfigRpcTransportClient 作为RPC传输层。ClientWorker构造器调用 agent.start() 进入 startInternal(),启动一个阻塞等待5秒的主监听线程。当首次 addTenantListeners()getConfig() 触发 notifyListenConfig() 唤醒主线程后,监听循环执行 executeConfigListen() 中的 checkListenCache(),进而调用 ensureRpcClient(taskId) 懒加载创建GrpcClient:通过 RpcClientFactory.createClient() 创建实例,注册配置变更推送处理器与连接监听器,最后调用 rpcClient.start() 建立 gRPC 连接与双向流通道。同一taskId后续复用已创建连接,不再重复建连。

在这里插入图片描述

ConfigRpcTransportClient通过checkListenCache方法批量处理配置监听任务,并基于taskId 维度调用 ensureRpcClient 确保每个监听分组拥有独立的gRPC连接。这段代码展示了配置中心gRPC双向流建立通道的入口rpcClient.start()。

com.alibaba.nacos.client.config.impl.ClientWorker.ConfigRpcTransportClient
  
// 批量检查配置是否变更
private boolean checkListenCache(Map<String, List<CacheData>> listenCachesMap) throws NacosException {
    // 遍历所有待检查的CacheData,按taskId分组
    for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) {
        String taskId = entry.getKey();
        
        // 获取或创建该taskId对应的gRPC客户端,不同taskId使用独立的RpcClient,实现连接隔离,首次调用时创建新的gRPC连接,后续复用已有连接
        RpcClient rpcClient = ensureRpcClient(taskId);
    }
}

			——————————————————————————————————————————————————————————————————————————————

com.alibaba.nacos.client.config.impl.ClientWorker.ConfigRpcTransportClient   
        
// 获取或创建指定taskId对应的gRPC客户端
private RpcClient ensureRpcClient(String taskId) throws NacosException {
    // 加锁:保证同一taskId只创建一个RpcClient 
    synchronized (ClientWorker.this) {
        Map<String, String> labels = getLabels();  // 获取全局标签(如客户端版本、应用名等)
        Map<String, String> newLabels = new HashMap<>(labels); // 构建客户端标签
        // 将taskId注入标签,作为配置监听分组标识
        newLabels.put("taskId", taskId);
        
        // 从配置属性中读取TLS证书配置,如果配置了TLS,创建加密连接;否则使用明文连接
        RpcClientTlsConfig clientTlsConfig = RpcClientTlsConfigFactory.getInstance()
                .createSdkConfig(properties);
        
        // 创建RpcClient实例,命名规则:uuid + "_config-" + taskId
        RpcClient rpcClient = RpcClientFactory.createClient(
                uuid + "_config-" + taskId,   // 客户端唯一标识
                getConnectionType(),           // 连接类型(GRPC)
                newLabels,                     // 携带标签(含taskId)
                clientTlsConfig);              // TLS 配置
        
        // isWaitInitiated() 返回true表示该RpcClient尚未启动,只有首次创建时才需要注册Handler并启动连接
        if (rpcClient.isWaitInitiated()) {
            // 注册服务端推送请求的处理器,当服务端通过gRPC双向流推送ConfigChangeNotifyRequest时,由这里注册的 Handler处理(更新缓存标记 + 唤醒主循环)
            initRpcClientHandler(rpcClient);
            
            // 设置租户信息(namespaceId),用于服务端识别
            rpcClient.setTenant(getTenant());
            
            // 启动gRPC客户端,内部调用connectToServer() 建立gRPC连接和双向流通道
            rpcClient.start();
        }

        return rpcClient;
    }
}

1.2 注册中心连接触发时机:start()

在注册中心篇,我们已经深入分析了注册中心的启动与初始化流程,其中创建gRPC双向流通道的前置逻辑,可回顾《Nacos 2.x 源码深度解析 (八):服务订阅机制 —— 从首次订阅到gRPC双向流变更通知》一文。

这里我们简要描述一下整体流程:NacosNamingService 初始化时创建 NamingClientProxyDelegate,由其实例化 NamingGrpcClientProxyNamingGrpcClientProxy 先通过 RpcClientFactory.createClient() 创建底层 GrpcClient 实例,再初始化NamingGrpcRedoService(构造时即启动3秒一次的定时重试任务)。随后调用start() 方法,在此方法中依次注册连接监听器(redoService)、注册服务端推送处理器(NamingPushRequestHandler),最后调用 rpcClient.start() 建立 gRPC 连接与双向流通道。

在这里插入图片描述

NamingGrpcClientProxy 是 Nacos 服务发现客户端的 gRPC 通信核心,其构造函数完成了从连接创建到推送接收的完整初始化。首先保存 namespaceId、生成 UUID 唯一标识、读取超时配置,然后构建携带 SOURCE=sdkMODULE=namingAPPNAME 的客户端标签,通过 RpcClientFactory.createClient() 创建 gRPC 客户端实例,同时初始化 NamingGrpcRedoService 作为断线重试缓存层。

随后调用 start() 完成五项启动准备:

  • 设置服务端列表工厂以获取 Nacos 地址
  • 注册 redoService 为连接断开监听器(断连时标记所有实例待重试)
  • 注册 NamingPushRequestHandler为服务端推送处理器(收到实例变更推送时直接更新本地 ServiceInfoHolder 缓存)
  • 调用 rpcClient.start() 建立gRPC长连接和双向流通道
  • NotifyCenter 注册事件订阅以监听 InstancesChangeEvent

这段代码展示了注册中心gRPC双向流建立通道的入口rpcClient.start()。

com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy

public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory,
        NacosClientProperties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException {
    // 调用父类构造函数,注入安全代理
    super(securityProxy);
    
    // 命名空间 ID,用于服务端区分不同租户的数据
    this.namespaceId = namespaceId;
    // 生成客户端唯一标识(UUID),用于区分不同客户端实例
    this.uuid = UUID.randomUUID().toString();
    // gRPC请求超时时间(毫秒),-1表示使用默认超时
    this.requestTimeout = Long.parseLong(properties.getProperty(CommonParams.NAMING_REQUEST_TIMEOUT, "-1"));
    
    // 标签会随gRPC连接上报到服务端,用于服务端识别连接来源
    Map<String, String> labels = new HashMap<>();
    // 标记来源:SDK(Nacos 官方客户端)
    labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK);
    // 标记模块:NAMING(服务发现模块)
    labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_NAMING);
    // 标记应用名(如 spring.application.name)
    labels.put(Constants.APPNAME, AppNameUtils.getAppName());
    
    // 创建gRPC客户端
    this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels,
            RpcClientTlsConfigFactory.getInstance().createSdkConfig(properties.asProperties()));
    
    // 创建redo重试服务
    this.redoService = new NamingGrpcRedoService(this, properties);
    
    // 启动gRPC客户端
    start(serverListFactory, serviceInfoHolder);
}
    
			——————————————————————————————————————————————————————————————————————————————
      
com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy

// 启动gRPC客户端
private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {
    // 设置服务端列表工厂
    rpcClient.serverListFactory(serverListFactory);
    
    // 注册连接断开监听器
    rpcClient.registerConnectionListener(redoService);
    
    // 注册服务端推送请求处理器
    rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
    
    // 启动gRPC客户端,内部调用connectToServer() 建立gRPC连接和双向流通道
    rpcClient.start();
    
    // 注册NotifyCenter事件订阅
    NotifyCenter.registerSubscriber(this);
}

1.3 rpc客户端启动:RpcClient.start()

前两小节展示了配置中心和注册中心rpc客户端创建的核心入口方法,两者最终交汇于RpcClient.start()。

在这里插入图片描述

RpcClient.start()先通过 CAS 状态检查避免重复启动,然后创建两个后台线程:

  • 线程一阻塞消费连接事件队列,分别回调 onConnected / onDisConnect 通知业务层;

  • 线程二负责健康保活与自动重连,空闲时执行 healthCheck(),发现服务端不健康时自动切换节点重连。两个线程启动后立即同步建连,按重试次数遍历服务端列表逐一调用 connectToServer()(子类 GrpcClient 实现,内含 TCP 建连、双向流绑定、ConnectionSetupRequest 握手),建连成功后设置 RUNNING 状态并发布 CONNECTED 事件触发业务回调。最后注册 ConnectResetRequestHandler(处理服务端发起的连接重置指令)和 ClientDetectionRequest处理器(响应服务端存活探测)。

    这个 start() 方法统一了Config和Naming两个模块的gRPC客户端启动流程,子类只需实现connectToServer() 即可完成差异化的建连逻辑。

com.alibaba.nacos.common.remote.client.RpcClient

// 启动gRPC客户端。是配置中心和注册中心客户端连接建立的共同入口。
public final void start() throws NacosException {

    // 避免重复启动
    boolean success = rpcClientStatus.compareAndSet(INITIALIZED, STARTING);
    if (!success) { return; }

    // 创建后台工作线程池(2线程)
    clientEventExecutor = new ScheduledThreadPoolExecutor(2,
            new NameThreadFactory("com.alibaba.nacos.client.remote.worker"));

    // 线程一:消费连接事件,触发回调
    clientEventExecutor.submit(() -> {
        while (true) {
            ConnectionEvent event = eventLinkedBlockingQueue.take();
            if (event.isConnected()) {
                notifyConnected(event.connection);       // 通知 ConnectionEventListener
            } else if (event.isDisConnected()) {
                notifyDisConnected(event.connection);    // 通知 ConnectionEventListener
            }
        }
    });

    // 线程二:健康检查 + 自动重连
    clientEventExecutor.submit(() -> {
        while (true) {
            ReconnectContext ctx = reconnectionSignal.poll(connectionKeepAlive, MILLISECONDS);
            if (ctx == null) {
                // 超时未收到重连信号,执行健康检查
                if (isIdle()) {
                    if (!healthCheck()) {
                        setStatus(UNHEALTHY);
                        reconnect(null, false);           // 自动重连
                    } else {
                        refreshLastActiveTime();
                    }
                }
                continue;
            }
            reconnect(ctx.serverInfo, ctx.onRequestFail); // 收到重连信号,立即重连
        }
    });

    // 同步建连 
    Connection connection = null;
    int retryTimes = rpcClientConfig.retryTimes();       // 默认3次
    while (retryTimes >= 0 && connection == null) {
        retryTimes--;
        connection = connectToServer(nextRpcServer());   // GrpcClient.connectToServer()
    }

    if (connection != null) {
        currentConnection = connection;
        rpcClientStatus.set(RUNNING);
        eventLinkedBlockingQueue.offer(new ConnectionEvent(CONNECTED, connection));  // 通知线程一
    } else {
        switchServerAsync();  // 全部失败,异步重连其他节点
    }

    // 注册内置请求处理器 
    registerServerRequestHandler(new ConnectResetRequestHandler());  // 服务端通知客户端重置连接
    registerServerRequestHandler((request, connection) -> {
        if (request instanceof ClientDetectionRequest) {
            return new ClientDetectionResponse();                   // 服务端探测客户端是否存活
        }
        return null;
    });
}

1.4 建连核心方法:connectToServer()

connectToServer() 是gRPC 客户端与服务端建立完整通信通道的核心方法:

  • 先创建 ManagedChannel 建立 TCP 连接,通过Unary一元调用 ServerCheckRequest 获取服务端分配的 connectionId 并验证服务端可达;

  • 随后基于同一 TCP 通道创建 BiRequestStreamStub 并调用 bindRequestStream() 绑定双向流回调,使客户端能够接收服务端推送的请求(如 ConfigChangeNotifyRequestNotifySubscriberRequest);

  • 接着将 StreamObserver、Unary stub、ManagedChannel 封装到 GrpcConnection 中,通过双向流发送 ConnectionSetupRequest 完成建连握手;

  • 最后等待能力协商响应,协商通过后返回完整的 GrpcConnection,建连失败则关闭通道返回 null

    在这里插入图片描述

    同一 ManagedChannel 同时承载Unary请求和双向流,实现了单TCP连接上的请求-响应与实时推送复用。

com.alibaba.nacos.common.remote.client.grpc.GrpcClient

//建立gRPC连接的核心方法。
public Connection connectToServer(ServerInfo serverInfo) {

    try {
        // 创建TCP连接通道
        // 端口 = serverPort + rpcPortOffset(默认为 0)
        int port = serverInfo.getServerPort() + rpcPortOffset();
        ManagedChannel managedChannel = createNewManagedChannel(serverInfo.getServerIp(), port);
        // 创建Unary一元调用的future stub
        RequestGrpc.RequestFutureStub futureStub = createNewChannelStub(managedChannel);

        // 发送ServerCheckRequest,验证服务端可达并获取connectionId,这是首次与目标服务端的通信,用于确认服务端存活
        Response response = serverCheck(serverInfo.getServerIp(), port, futureStub);
        if (!(response instanceof ServerCheckResponse)) {
            shuntDownChannel(managedChannel);
            return null;
        }
        String connectionId = ((ServerCheckResponse) response).getConnectionId();

        // 创建双向流stub,复用同一个ManagedChannel,不额外创建TCP连接
        BiRequestStreamGrpc.BiRequestStreamStub biStreamStub = BiRequestStreamGrpc
                .newStub(futureStub.getChannel());

        // 构造GrpcConnection并绑定双向流
        GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
        grpcConn.setConnectionId(connectionId);

        // 绑定双向流的StreamObserver回调
        // onNext:处理服务端推送的请求(ConfigChangeNotify / NotifySubscriber)
        // onError/onCompleted:触发重连
        StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biStreamStub, grpcConn);
        grpcConn.setPayloadStreamObserver(payloadStreamObserver);  // 用于向服务端回复响应
        grpcConn.setGrpcFutureServiceStub(futureStub);            // 用于 Unary 请求
        grpcConn.setChannel(managedChannel);                       // 用于后续关闭

        // 发送ConnectionSetupRequest完成建连握手
        ConnectionSetupRequest setupRequest = new ConnectionSetupRequest();
        setupRequest.setClientVersion(VersionUtils.getFullClientVersion());
        setupRequest.setLabels(super.getLabels());
        setupRequest.setAbilityTable(
                NacosAbilityManagerHolder.getInstance().getCurrentNodeAbilities(abilityMode()));
        setupRequest.setTenant(super.getTenant());
        grpcConn.sendRequest(setupRequest);   // 通过双向流发送

        // 等待能力协商结果
        if (recAbilityContext.isNeedToSync()) {
            recAbilityContext.await(capabilityNegotiationTimeout(), MILLISECONDS);
            if (!recAbilityContext.check(grpcConn)) {
                return null;  // 能力不匹配,建连失败
            }
        } else {
            // 兼容旧版本服务端:等待 100ms确保建连完成
            Thread.sleep(100L);
        }
      
        return grpcConn;
    } catch (Exception e) {
        recAbilityContext.release(null);
    }
    return null;
}

二、客户端连接复用与销毁

2.1 客户端连接复用设计

客户端的连接复用分三层。RpcClientFactory 通过ConcurrentHashMap.computeIfAbsentclientName 缓存 RpcClient 实例,Config 配置中心按 taskId 分别缓存(uuid + "_config-" + taskId),Naming 注册中心全局一个(uuid),同一 name 多次调用返回同一个实例。

单个 GrpcClient 内部,currentConnection 永久复用,除非断线重连不会更换。单个 GrpcConnection 内部的 ManagedChannel 在 gRPC 生命周期内永久复用,且同一TCP连接上同时承载两条gRPC通道:RequestFutureStub(Unary)处理客户端主动请求,StreamObserver(双向流)处理服务端推送和回执,由 gRPC 框架按方法名自动路由。

三层复用叠加实现了一个客户端进程对单节点永久只维持少量 TCP 连接,所有请求、监听、推送全部复用已有连接。

com.alibaba.nacos.common.remote.client.RpcClientFactory
     
   // 全局缓存:clientName —> RpcClient
  private static final Map<String, RpcClient> CLIENT_MAP = new ConcurrentHashMap<>();

  // 创建或获取RpcClient, name已存在时直接返回缓存实例,不再新建。
  public static RpcClient createClient(String clientName, ConnectionType connectionType, Map<String, String> labels,
            RpcClientTlsConfig tlsConfig) {
        return createClient(clientName, connectionType, null, null, labels, tlsConfig);
    }

			——————————————————————————————————————————————————————————————————————————————

com.alibaba.nacos.common.remote.client.grpc.GrpcClient
        
// GrpcClient.connectToServer() 中,同一条连接上同时承载:
 public Connection connectToServer(ServerInfo serverInfo) {
    // ManagedChannel(TCP连接), 在整个gRPC生命周期内永久复用
    ManagedChannel managedChannel = createNewManagedChannel(ip, port);

    // 基于同一channel创建Unary stub(查询、注册、订阅)
    RequestFutureStub futureStub = createNewChannelStub(managedChannel);

    // 基于同一channel创建双向流stub(服务端推送)
    BiRequestStreamStub biStreamStub = BiRequestStreamGrpc.newStub(managedChannel);

    // 两者封装到同一个GrpcConnection
    GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
    grpcConn.setPayloadStreamObserver(payloadStreamObserver);   // 双向流的发送通道
    grpcConn.setGrpcFutureServiceStub(futureStub);              // Unary的发送通道
    grpcConn.setChannel(managedChannel);                        // 连接生命周期管理

    // 后续所有操作复用同一client:
    //   client.request()         -> futureStub 走 Unary
    //   client.asyncRequest()    -> streamObserver走双向流
    //   client.replyResponse()   -> streamObserver走双向流
    return grpcConn;
}

// RpcClient.start() 将连接设为当前连接,永久复用:
if (connectToServer != null) {
    this.currentConnection = connectToServer;  // 除非断线重连,否则一直用这个
    rpcClientStatus.set(RpcClientStatus.RUNNING);
}



2.2 客户端连接关闭设计

客户端连接关闭覆盖四种场景:

  • 应用主动关闭时 shutdown() 将状态标记为 SHUTDOWN,终止两个后台线程后调用 closeConnection():先通过 payloadStreamObserver.onCompleted() 通知服务端双向流断开,再关闭 ManagedChannel 释放 TCP 连接,最后投递 DISCONNECTED 事件通知业务监听器。

  • 健康检查失败时 healthCheck() 向当前连接发送 HealthCheckRequest(Unary),连续失败返回 false,线程二检测到后将状态设为 UNHEALTHY 并触发 reconnect() 切换节点。

  • 双向流断开时 onError/onCompleted 回调直接通过 CAS 将状态从 RUNNING 切为 UNHEALTHY 并发起重连。

  • 服务端引导重连时 ConnectResetRequestHandler 收到指令,根据是否携带推荐地址调用 switchServerAsync() 切换到指定节点或随机节点。

    四种场景最终汇入同一个 reconnect() 方法,循环尝试建连直到成功或shutdown,新连接建立后关闭旧连接并通知业务层 CONNECTED

			————————————————————————————① 应用主动关闭————————————————————————————————————————
com.alibaba.nacos.common.remote.client.RpcClient
  
public void shutdown() throws NacosException {
    // 标记状态为 SHUTDOWN,后台线程二检测到后停止重连循环
    rpcClientStatus.set(RpcClientStatus.SHUTDOWN);

    // 终止两个后台线程:
    // 线程一:阻塞在 eventLinkedBlockingQueue.take() 上,shutdownNow 后抛出异常退出
    // 线程二:阻塞在 reconnectionSignal.poll() 上,下次循环检测到 SHUTDOWN 退出
    if (clientEventExecutor != null) {
        clientEventExecutor.shutdownNow();
    }

    // 关闭当前 gRPC 连接
    closeConnection(currentConnection);
}

// 关闭连接并通知监听器
private void closeConnection(Connection connection) {
    if (connection != null) {
        connection.close();                         // → GrpcConnection.close()
        eventLinkedBlockingQueue.add(
                new ConnectionEvent(DISCONNECTED, connection));  // → 通知业务层(如 RedoService)
    }
}

			——————————————————————————————————————————————————————————————————————————————
        
com.alibaba.nacos.common.remote.client.grpc.GrpcConnection
        
// 客户端主动关闭连接。
public void close() {
    // 关闭双向流:向服务端发送stream completed信号,服务端收到后触发onCompleted -> ConnectionManager.unregister()
    if (this.payloadStreamObserver != null) {
        try {
            payloadStreamObserver.onCompleted();
        } catch (Throwable ignored) {
        }
    }

    // 关闭gRPC ManagedChannel, 释放 TCP 连接和线程
    if (this.channel != null && !channel.isShutdown()) {
        try {
            this.channel.shutdownNow();  // 立即关闭,不等待未完成的请求
        } catch (Throwable ignored) {
        }
    }
}

			————————————————————————————② 健康检查失败自动重连:线程二检测————————————————————————————————————————
com.alibaba.nacos.common.remote.client.RpcClient#healthCheck
        
private boolean healthCheck() {
    // 通过当前连接发送 HealthCheckRequest(Unary),重试 3 次
    // 成功则更新 lastActiveTimeStamp
    // 失败则返回 false → 线程二自动切换节点
    Response response = this.currentConnection.request(healthCheckRequest, healthCheckTimeOut);
    return response != null && response.isSuccess();
}


			————————————————————————————③ 双向流断开被动检测:bindRequestStream回调————————————————————————————
com.alibaba.nacos.common.remote.client.grpc.GrpcClient#bindRequestStream
        
 private StreamObserver<Payload> bindRequestStream(final BiRequestStreamGrpc.BiRequestStreamStub streamStub,
            final GrpcConnection grpcConn) {
        
        return streamStub.requestBiStream(new StreamObserver<Payload>() {
            
            public void onError(Throwable t) {
                if (rpcClientStatus.compareAndSet(RUNNING, UNHEALTHY)) {
                    switchServerAsync();                       // 异步切换到其他节点
                }
            }
            
            public void onCompleted() {
                if (isRunning() && !grpcConn.isAbandon()) {
                    rpcClientStatus.compareAndSet(RUNNING, UNHEALTHY);
                    switchServerAsync();
                }
            }
        });
    }


			————————————————————————————④ 服务端引导重连————————————————————————————
com.alibaba.nacos.common.remote.client.RpcClient.ConnectResetRequestHandler#requestReply
        
public Response requestReply(Request request, Connection connection) {
        if (request instanceof ConnectResetRequest) {
            ConnectResetRequest req = (ConnectResetRequest) request;
            if (StringUtils.isNotBlank(req.getServerIp())) {
                switchServerAsync(resolveServerInfo(req.getServerIp() + ":" + req.getServerPort()), false);
            } else {
                switchServerAsync();                           // 无推荐地址 → 随机切换
            }
        }
        return new ConnectResetResponse();
    }
}

//  重连逻辑:reconnect()
protected void reconnect(final ServerInfo recommendServerInfo, boolean onRequestFail) {
    // 若onRequestFail且健康检查通过 -> 恢复RUNNING,跳过重连
    // 否则循环直到建连成功或 shutdown:
    while (!switchSuccess && !isShutdown()) {
        ServerInfo serverInfo = recommendServer ?? nextRpcServer();
        Connection connectionNew = connectToServer(serverInfo);   // 建连
        if (connectionNew != null) {
            closeConnection(currentConnection);                   // 关闭旧连接
            currentConnection = connectionNew;                    // 替换新连接
            eventLinkedBlockingQueue.add(new ConnectionEvent(CONNECTED));  // 通知业务层
            return;
        }
    }
}

// 关闭连接:closeConnection() 
private void closeConnection(Connection connection) {
    if (connection != null) {
        connection.close();                                        // GrpcConnection.close()
                                                                   // ManagedChannel.shutdownNow()
        eventLinkedBlockingQueue.add(new ConnectionEvent(DISCONNECTED));  // 通知监听器
    }


三、服务端:双向流接收与通道绑定

3.1 服务端启动流程:startServer()

在这里插入图片描述

startServer() 在单个Netty端口中同时注册两个gRPC服务:Request/request(Unary)处理客户端主动请求,BiRequestStream/requestBiStream(双向流)处理建连握手和服务端推送。构建 NettyServerBuilder 时依次配置消息大小限制、压缩器、keepalive 参数和 TLS 协议协商器后启动服务端,开始监听客户端连接。客户端建连时双向流到达 GrpcBiStreamRequestAcceptorConnectionSetupRequest 握手后由 ConnectionManager 统一管理连接生命周期;后续所有请求经同一个 Netty 端口按 addServices() 注册的路由规则分别分发到 Unary 和双向流两条通道处理。

com.alibaba.nacos.core.remote.grpc.BaseGrpcServer

// 启动gRPC服务端,监听客户端连接。
public void startServer() throws Exception {

    // 注册gRPC服务
    final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
    addServices(handlerRegistry, getSeverInterceptors().toArray(...));

    // 构建Netty gRPC Server
    NettyServerBuilder builder = NettyServerBuilder
            .forPort(getServicePort())        // 监听端口
            .executor(getRpcExecutor());       // 请求处理线程池

    // 协议协商器(用于TLS等加密协议)
    Optional<InternalProtocolNegotiator.ProtocolNegotiator> negotiator = newProtocolNegotiator();
    if (negotiator.isPresent()) {
        builder.protocolNegotiator(negotiator.get());
    }

    // 注册传输过滤器(如连接追踪、日志等)
    for (ServerTransportFilter each : getServerTransportFilters()) {
        builder.addTransportFilter(each);
    }

    // 配置并启动
    server = builder
            .maxInboundMessageSize(getMaxInboundMessageSize())   // 最大消息体
            .fallbackHandlerRegistry(handlerRegistry)            // 服务处理器
            .compressorRegistry(CompressorRegistry.getDefaultInstance())
            .decompressorRegistry(DecompressorRegistry.getDefaultInstance())
            .keepAliveTime(getKeepAliveTime(), MILLISECONDS)     // 服务端 keepalive 间隔
            .keepAliveTimeout(getKeepAliveTimeout(), MILLISECONDS) // keepalive 超时
            .permitKeepAliveTime(getPermitKeepAliveTime(), MILLISECONDS) // 客户端 keepalive 允许间隔
            .build();

    server.start();  // 启动Netty,开始监听端口
}


3.2 注册gRPC服务:addServices()

在这里插入图片描述

addServices() 在单个Netty端口中注册两条gRPC服务通道。Unary 通道 Request/request 通过 asyncUnaryCall 包装 handleCommonRequest(),客户端通过 RequestFutureStub.request(payload) 调用,服务端分配独立响应流处理一次请求;双向流通道 BiRequestStream/requestBiStream 通过 asyncBidiStreamingCall 包装 GrpcBiStreamRequestAcceptor.requestBiStream(),客户端建立一条长连接双向流,后续服务端推送(配置变更通知、命名服务实例变更)和客户端回执均复用此流。两条通道各自定义 MethodDescriptor 描述方法名和序列化方式,组装为 ServerServiceDefinition 后注入 getSeverInterceptors() 拦截器(TPS 限流、参数校验、鉴权等),最后注册到 MutableHandlerRegistry。gRPC 框架在客户端连接到达时按 fullMethodName 自动路由到对应通道。

com.alibaba.nacos.core.remote.grpc.BaseGrpcServer
  
private void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor... serverInterceptor) {
    // 定义Unary方法的描述符:方法名、请求/响应类型、序列化器
    final MethodDescriptor<Payload, Payload> unaryPayloadMethod = MethodDescriptor.<Payload, Payload>newBuilder()
            .setType(MethodDescriptor.MethodType.UNARY)
            .setFullMethodName(
                    MethodDescriptor.generateFullMethodName(
                            GrpcServerConstants.REQUEST_SERVICE_NAME,          // Request
                            GrpcServerConstants.REQUEST_METHOD_NAME))          // request
            .setRequestMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance()))
            .setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance()))
            .build();

    // 创建Unary调用处理器:收到请求后调 handleCommonRequest()
    final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls.asyncUnaryCall(
            (request, responseObserver) -> handleCommonRequest(request, responseObserver));

    // 组装为完整的服务定义,注册到 MutableHandlerRegistry
    final ServerServiceDefinition serviceDefOfUnaryPayload = ServerServiceDefinition.builder(
            GrpcServerConstants.REQUEST_SERVICE_NAME)
            .addMethod(unaryPayloadMethod, payloadHandler)
            .build();
    // 注册Unary服务(Request/request),注册时注入拦截器(TPS 限流、参数校验等)
    handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfUnaryPayload, serverInterceptor));

    // 创建双向流处理器:客户端流到达后调requestBiStream() 返回 StreamObserver
    final ServerCallHandler<Payload, Payload> biStreamHandler = ServerCalls.asyncBidiStreamingCall(
            (responseObserver) -> grpcBiStreamRequestAcceptor.requestBiStream(responseObserver));

    // 定义双向流方法的描述符
    final MethodDescriptor<Payload, Payload> biStreamMethod = MethodDescriptor.<Payload, Payload>newBuilder()
            .setType(MethodDescriptor.MethodType.BIDI_STREAMING)
            .setFullMethodName(
                    MethodDescriptor.generateFullMethodName(
                            GrpcServerConstants.REQUEST_BI_STREAM_SERVICE_NAME,     // BiRequestStream
                            GrpcServerConstants.REQUEST_BI_STREAM_METHOD_NAME))     //requestBiStream
            .setRequestMarshaller(ProtoUtils.marshaller(Payload.newBuilder().build()))
            .setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance()))
            .build();

    // 组装并注册
    final ServerServiceDefinition serviceDefOfBiStream = ServerServiceDefinition.builder(
            GrpcServerConstants.REQUEST_BI_STREAM_SERVICE_NAME)
            .addMethod(biStreamMethod, biStreamHandler)
            .build();
    // 注册双向流服务(BiRequestStream/requestBiStream)
    handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfBiStream, serverInterceptor));
}


3.3 一元调用处理: request()

客户端通过 Unary 通道发出的请求经过两级处理。BaseGrpcServer.handleCommonRequest() 是第一道入口,先校验来源——SDK 请求只能走 GrpcSdkServer、集群请求只能走 GrpcClusterServer,来源不匹配时返回 502 拒绝。

校验通过后交由 GrpcRequestAcceptor.request() 继续处理。

第二道入口依次执行:服务端未完成启动时拒绝所有业务请求;ServerCheckRequest 直接返回 connectionId 不经过 Handler 路由;从 RequestHandlerRegistry 按请求类型匹配对应的 RequestHandler(如 ConfigQueryRequestConfigQueryRequestHandler);校验连接是否已在 ConnectionManager 中注册;反序列化 PayloadRequest 对象;构建 RequestMeta(含客户端 IP、版本、标签、能力表),通过 prepareRequestContext() 写入请求上下文供后续鉴权和参数校验使用;最后调用 handler.handleRequest() 执行具体业务逻辑,响应在限流超阈值时延迟 1 秒返回,异常时返回 ErrorResponse。两级入口均记录 MetricsMonitor 监控指标,handleCommonRequest 侧重来源隔离,request 侧重请求路由和生命周期管理。

com.alibaba.nacos.core.remote.grpc.BaseGrpcServer

// 处理客户端通过 Unary 通道发来的请求。
protected void handleCommonRequest(Payload grpcRequest, StreamObserver<Payload> responseObserver) {

    // 检查请求来源是否被允许访问当前服务端
    // GrpcSdkServer收到的SDK请求只能访问SDK允许的接口
    // GrpcClusterServer收到的集群请求只能访问集群接口
    if (!invokeSourceAllowCheck(grpcRequest)) {
        // 来源不允许 → 返回 502,拒绝处理
        responseObserver.onNext(GrpcUtils.convert(
                ErrorResponse.build(NacosException.BAD_GATEWAY,
                        String.format(" invoke %s from %s is forbidden",
                                grpcRequest.getMetadata().getType(), this.getSource()))));
        responseObserver.onCompleted();
        MetricsMonitor.recordGrpcRequestEvent(grpcRequest.getMetadata().getType(),
                false, NacosException.BAD_GATEWAY, null, null, 0);
        return;
    }

    // 路由到请求处理器
    grpcCommonRequestAcceptor.request(grpcRequest, responseObserver);
}

			——————————————————————————————————————————————————————————————————————————————

com.alibaba.nacos.core.remote.grpc.GrpcRequestAcceptor

// 处理所有通过Unary通道到达的客户端请求。
public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {

    // 启动状态检查,服务端尚未完成启动时拒绝所有业务请求
    if (!ApplicationUtils.isStarted()) {
        sendResponse(responseObserver,
                ErrorResponse.build(INVALID_SERVER_STATUS, "Server is starting,please try later."));
        return;
    }
        
    // ServerCheck请求特例处理,ServerCheckRequest在connectToServer阶段由客户端发出,
    // 直接返回ServerCheckResponse(含 connectionId),不经过Handler路由
    if (ServerCheckRequest.class.getSimpleName().equals(type)) {
        sendResponse(responseObserver,
                new ServerCheckResponse(GrpcServerConstants.CONTEXT_KEY_CONN_ID.get(), true));
        return;
    }

    // 根据请求类型从HandlerRegistry中匹配已注册的Handler
    RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
    if (requestHandler == null) {
        sendResponse(responseObserver,
                ErrorResponse.build(NO_HANDLER, "RequestHandler Not Found"));
        return;
    }

    // 检查客户端是否已在ConnectionManager中注册
    String connectionId = GrpcServerConstants.CONTEXT_KEY_CONN_ID.get();
    if (!connectionManager.checkValid(connectionId)) {
        sendResponse(responseObserver,
                ErrorResponse.build(UN_REGISTER, "Connection is unregistered."));
        return;
    }

    // 反序列化 Payload —> Request
    Object parseObj = GrpcUtils.parse(grpcRequest);
    if (!(parseObj instanceof Request)) {
        sendResponse(responseObserver,
                ErrorResponse.build(BAD_GATEWAY, "Invalid request"));
        return;
    }
    Request request = (Request) parseObj;

    // 构建请求上下文,执行Handler
    try {
        Connection connection = connectionManager.getConnection(connectionId);
        RequestMeta requestMeta = new RequestMeta();
        requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
        requestMeta.setConnectionId(connectionId);
        requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
        requestMeta.setLabels(connection.getMetaInfo().getLabels());
        requestMeta.setAbilityTable(connection.getAbilityTable());
        connectionManager.refreshActiveTime(connectionId);  // 更新连接最后活跃时间
        prepareRequestContext(request, requestMeta, connection);

        // 执行具体 Handler.handle(),拿到业务响应
        Response response = requestHandler.handleRequest(request, requestMeta);

        // 限流超阈值时延迟 1 秒返回
        if (response.getErrorCode() == OVER_THRESHOLD) {
            RpcScheduledExecutor.CONTROL_SCHEDULER.schedule(() -> {
                sendResponse(responseObserver, response);
            }, 1000L, MILLISECONDS);
        } else {
            sendResponse(responseObserver, response);
        }
    } catch (Throwable e) {
        sendResponse(responseObserver, ErrorResponse.build(e));
    } finally {
        RequestContextHolder.removeContext();
    }
}

3.4 双向流调用处理:requestBiStream()

在这里插入图片描述

当客户端发起 BiRequestStream/requestBiStream gRPC 调用时,该方法返回一个 StreamObserver,此后由 gRPC 框架回调其 onNext/onError/onCompleted 处理流上的消息。

onNext 处理两类消息:收到 ConnectionSetupRequest 时,提取客户端信息(IP、版本、应用名、标签、租户、能力表)创建 GrpcConnection,调用 connectionManager.register() 注册连接并触发 ConnectionEventListener.onConnected(),同时通过 sendRequestNoAck 响应 SetupAckRequest 携带服务端能力表完成建连握手;

注册失败(服务端未启动完成或连接数超限)时关闭连接拒绝。收到 Response 时,调用 RpcAckCallbackSynchronizer.ackNotify() 唤醒等待 Unary 响应的客户端线程,并更新连接最后活跃时间。

onErroronCompleted 在流异常断开或客户端主动关闭时清理gRPC流状态,不做业务清理,业务清理由 ConnectionManager 在检测到连接超时或 BaseRpcServerConnectionEventListener 触发 ClientDisconnectEvent 时完成。

// 处理客户端发起的双向流连接。
public StreamObserver<Payload> requestBiStream(StreamObserver<Payload> responseObserver) {

    // 从gRPC上下文中提取连接信息
    final String connectionId = GrpcServerConstants.CONTEXT_KEY_CONN_ID.get();
    final Integer localPort = GrpcServerConstants.CONTEXT_KEY_CONN_LOCAL_PORT.get();
    final int remotePort = GrpcServerConstants.CONTEXT_KEY_CONN_REMOTE_PORT.get();
    String remoteIp = GrpcServerConstants.CONTEXT_KEY_CONN_REMOTE_IP.get();
    String clientIp = "";

    // 返回 StreamObserver,此后由gRPC框架回调onNext/onError/onCompleted
    return new StreamObserver<Payload>() {

        @Override
        public void onNext(Payload payload) {

            clientIp = payload.getMetadata().getClientIp();
            traceDetailIfNecessary(payload);

            // 解析Payload
            Object parseObj;
            parseObj = GrpcUtils.parse(payload);

            // ConnectionSetupRequest:建连握手
            if (parseObj instanceof ConnectionSetupRequest) {
                ConnectionSetupRequest setUpRequest = (ConnectionSetupRequest) parseObj;

                // 提取客户端信息:应用名、租户、标签、能力表
                String appName = setUpRequest.getLabels() != null
                        ? setUpRequest.getLabels().getOrDefault(Constants.APPNAME, "-") : "-";

                // 创建连接元数据和GrpcConnection
                ConnectionMeta metaInfo = new ConnectionMeta(connectionId,
                        payload.getMetadata().getClientIp(), remoteIp, remotePort,
                        localPort, ConnectionType.GRPC.getType(),
                        setUpRequest.getClientVersion(), appName, setUpRequest.getLabels());
                metaInfo.setTenant(setUpRequest.getTenant());

                GrpcConnection connection = new GrpcConnection(metaInfo, responseObserver,
                        GrpcServerConstants.CONTEXT_KEY_CHANNEL.get());
                if (setUpRequest.getAbilityTable() != null) {
                    connection.setAbilityTable(setUpRequest.getAbilityTable());
                }

                // 注册连接
                boolean rejectSdkOnStarting = metaInfo.isSdkSource() && !ApplicationUtils.isStarted();
                if (rejectSdkOnStarting || !connectionManager.register(connectionId, connection)) {
                    // 服务端未启动完成或连接数超限 —> 拒绝连接
                    connection.close();
                } else {
                    // 注册成功 → 响应SetupAckRequest,携带服务端能力表
                    connection.sendRequestNoAck(new SetupAckRequest(
                            NacosAbilityManagerHolder.getInstance()
                                    .getCurrentNodeAbilities(AbilityMode.SERVER)));
                }

            // Response:Unary请求的回执
            } else if (parseObj instanceof Response) {
                Response response = (Response) parseObj;
                RpcAckCallbackSynchronizer.ackNotify(connectionId, response);  // 唤醒等待线程
                connectionManager.refreshActiveTime(connectionId);              // 更新活跃时间
            }
        }

        @Override
        public void onError(Throwable t) {
            // 流异常断开,通知ClientCallStreamObserver清理
            if (responseObserver instanceof ServerCallStreamObserver) {
                ServerCallStreamObserver scso = (ServerCallStreamObserver) responseObserver;
                if (!scso.isCancelled()) {
                    scso.onCompleted();
                }
            }
        }

        @Override
        public void onCompleted() {
            // 客户端主动关闭流,清理
            if (responseObserver instanceof ServerCallStreamObserver) {
                ServerCallStreamObserver scso = (ServerCallStreamObserver) responseObserver;
                if (!scso.isCancelled()) {
                    scso.onCompleted();
                }
            }
        }
    };
}


四、服务端连接复用与销毁

4.1 服务端连接复用核心:ConnectionManager

服务端通过 ConnectionManager 管理所有gRPC长连接,核心是一个ConcurrentHashMap<String, Connection>connectionId 缓存。

建连时 ConnectionSetupRequest 握手触发 register(),经过幂等检查、限流判断后存入缓存,同时通过connectionForClientIp 统计单 IP 连接数,最后调用 notifyClientConnected() 通知naming模块创建业务Client。后续每次请求(Unary 查询或双向流推送)gRPC 框架自动携带 connectionIdgetConnection() 直接 HashMap 查找复用,refreshActiveTime() 续期连接。

断连时 unregister() 移除缓存、关闭连接、通知业务层清理实例和订阅数据。后台每3秒通过 NacosRuntimeConnectionEjector扫描所有连接,lastActiveTime 超时的连接通过 ConnectResetRequest 引导客户端重连。

整个生命周期中,connectionId 作为全局唯一标识贯穿建连、复用、销毁全部环节,保障了服务端对海量客户端连接的高效管理。

// 服务端连接管理器。
@Service
public class ConnectionManager {

    // 核心缓存:connectionId -> Connection(ConcurrentHashMap)
    Map<String, Connection> connections = new ConcurrentHashMap<>();

    // IP -> 连接数的计数映射,用于单IP限制
    Map<String, AtomicInteger> connectionForClientIp = new ConcurrentHashMap<>();

    // 连接事件监听器注册表,通知naming/config模块
    private ClientConnectionEventListenerRegistry clientConnectionEventListenerRegistry;

    // 注册一个新连接
    public synchronized boolean register(String connectionId, Connection connection) {
        if (connections.containsKey(connectionId)) return true;   // 幂等
        if (checkLimit(connection)) return false;                 // 连接数超限
        connections.put(connectionId, connection);                // 存入缓存
        connectionForClientIp.computeIfAbsent(clientIp, k -> new AtomicInteger(0)).getAndIncrement();
        clientConnectionEventListenerRegistry.notifyClientConnected(connection);  // 触发业务回调
        return true;
    }

     // 注销一个连接
    public synchronized void unregister(String connectionId) {
        Connection remove = this.connections.remove(connectionId);
        if (remove != null) {
            AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
            atomicInteger.decrementAndGet();
            remove.close();
            clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
        }
    }

    // 连接查询与校验(每次请求时调用),按id查找连接(每秒数千次调用)
    public Connection getConnection(String connectionId) {
        return connections.get(connectionId);   // O(1) HashMap 查找
    }

    // 校验连接是否有效(GrpcRequestAcceptor 中调用)
    public boolean checkValid(String connectionId) {
        return connections.containsKey(connectionId);
    }

    // 更新连接活跃时间(每次有请求或回执时调用)
    public void refreshActiveTime(String connectionId) {
        Connection connection = connections.get(connectionId);
        if (connection != null) connection.freshActiveTime();
    }

    // 过期连接清理(定时任务,每3秒)
    @PostConstruct
    public void start() {
        initConnectionEjector();
        RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(() -> {
            runtimeConnectionEjector.doEject();               // 踢出过期连接
            MetricsMonitor.getLongConnectionMonitor().set(connections.size());
        }, 1000L, 3000L, TimeUnit.MILLISECONDS);
    }
}


4.2 服务端连接关闭设计

服务端连接关闭覆盖三种场景:

  • 客户端双向流断开或 GrpcBiStreamRequestAcceptoronCompleted/onError触发 unregister(),从 connections 缓存中移除连接,递减该 IP 的连接计数(归零时移除 Key),关闭 gRPC 连接和双向流,最后通知业务层清理实例和订阅数据。
  • 定时清理过期连接时NacosRuntimeConnectionEjector 每3秒执行一次过期连接清理,遍历所有连接收集 lastActiveTime 空闲超过 20 秒的连接,通过 ClientDetectionRequest 探活,有响应则续期,无响应则等待全部探活完成后由 ConnectionManager 统一清理。
  • 负载过高或运维干预时loadSingle() 通过 ConnectResetRequest 向指定连接发送重连指令,携带目标节点地址引导客户端切换到其他节点。三种场景覆盖了服务端主动断开、被动探活清理和负载均衡调度,保障了连接生命周期管理的高效性和集群稳定性。
			————————————————————————————① 服务端主动断连 ————————————————————————————————————————
com.alibaba.nacos.core.remote.ConnectionManager

public synchronized void unregister(String connectionId) {
    Connection remove = this.connections.remove(connectionId);   // 从缓存移除
    if (remove != null) {
        String clientIp = remove.getMetaInfo().clientIp;
        AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
        if (atomicInteger != null) {
            int count = atomicInteger.decrementAndGet();          // 递减 IP 连接计数
            if (count <= 0) {
                connectionForClientIp.remove(clientIp);           // 归零时移除
            }
        }
        remove.close();                                           // 关闭gRPC连接
        clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);  // 通知业务层
    }
}


			————————————————————————————② 定时过期连接清理(每3秒执行) ————————————————————————————————————————
com.alibaba.nacos.core.remote.NacosRuntimeConnectionEjector
        
private void ejectOutdatedConnection() {
    Map<String, Connection> connections = connectionManager.connections;
    Set<String> outDatedConnections = new HashSet<>();

    // 收集过期连接
    for (Entry<String, Connection> entry : connections.entrySet()) {
        Connection client = entry.getValue();
        long idleTime = now - client.getMetaInfo().getLastActiveTime();
        if (idleTime >= KEEP_ALIVE_TIME) {                           // 空闲超过 20 秒
            outDatedConnections.add(client.getMetaInfo().getConnectionId());
        } else if (client.getMetaInfo().pushQueueBlockTimesLastOver(300 * 1000)) {
            outDatedConnections.add(client.getMetaInfo().getConnectionId());
        }
    }

    // 发送探活请求,无响应则清理
    for (String connId : outDatedConnections) {
        Connection connection = connectionManager.getConnection(connId);
        if (connection != null) {
            connection.asyncRequest(new ClientDetectionRequest(), new RequestCallBack() {
                void onResponse(Response response) {
                    if (response != null && response.isSuccess()) {
                        connection.freshActiveTime();               // 有响应 → 续期
                    }
                }
                void onException(Throwable e) {                     // 无响应 -> 等待所有回调完成
                    latch.countDown();                              // 最后统一调用 unregister()
                }
            });
        }
    }
    // 等待所有探活完成,无响应的连接由ConnectionManager统一清理
}


			————————————————————————————③ 负载均衡连接踢出 ————————————————————————————————————————
com.alibaba.nacos.core.remote.ConnectionManager
        
// 当集群节点过载或运维手动调整连接分布时,通过loadSingle() 向指定连接发送 ConnectResetRequest,引导客户端重连到其他节点
public boolean loadSingle(String connectionId, String redirectAddress) {
    Connection connection = getConnection(connectionId);
    if (connection != null) {
        ConnectResetRequest resetRequest = new ConnectResetRequest();
        resetRequest.setServerIp(redirectIp);
        resetRequest.setServerPort(redirectPort);
        connection.request(resetRequest, 3000L);     // 发送重连指令
        // 客户端收到后 -> ConnectResetRequestHandler -> switchServerAsync()
    }
    return true;
}

三、全文小结

本章围绕Nacos 2.x gRPC双向流长连接,深入分析了客户端、服务端两大维度的连接创建、复用、销毁全生命周期运行机制,并对比分析配置中心与注册中心差异化的连接模型。在客户端层面,配置中心采用基于taskId分组的多连接隔离模型,以懒加载形式初始化RpcClient,不同监听分组独享独立gRPC连接,实现业务隔离;注册中心则采用全局单连接模型,所有服务注册、订阅、心跳、实例查询等请求统一复用唯一双向流通道,减少客户端TCP资源占用。两大模块建连入口虽然不同,但最终均依托RpcClient.start()完成客户端初始化,通过内置双线程分别处理连接事件回调、健康巡检与自动重连,再经由connectToServer()执行建连协议,完成TCP通道创建、双向流绑定、握手协商,最终构建完整通信通道。

同时客户端采用三层连接复用架构,分别从RpcClient实例、GrpcClient连接、TCP通道三个层面实现资源复用,在保障通信性能的前提下,规避频繁创建销毁连接带来的性能损耗,并适配应用主动关闭、健康检测失败、流异常断开、服务端引导重连四类场景,形成一套完善的断线重连与资源回收机制。

在服务端层面,Nacos基于单Netty端口双通道架构,同时兼容Unary一元调用与Bidi双向流式两种通信模式,各司其职分别处理客户端即时同步请求与服务端主动推送事件;依托ConnectionManager作为连接管控核心,以connectionId为唯一标识缓存全部客户端连接,统一实现连接注册、活跃度续期、过期探测、资源注销等功能。此外服务端设计了被动断连清理、定时过期剔除、过载主动踢出三种连接销毁策略,全方位适配日常运行、异常故障、集群负载均衡等不同生产场景。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐