Nacos源码 (6) Grpc概述与Nacos集成
Nacos 2.x版本增加了GRPC服务接口和客户端,极大的提升了Nacos的性能,本文将简单介绍grpc-java的使用方式以及Nacos中集成GRPC的方式。
grpc-java
GRPC是google开源的、以protobuf作为序列化方式、以http2作为通信协议的高性能rpc框架。
grpc-java是grpc对java语言的实现,使用Netty/Okhttp作为通信组件。
使用方式
添加依赖
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>1.56.0</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.56.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.56.0</version>
</dependency>
<dependency> <!-- necessary for Java 9+ -->
<groupId>org.apache.tomcat</groupId>
<artifactId>annotations-api</artifactId>
<version>6.0.53</version>
<scope>provided</scope>
</dependency>
生成代码
需要将.proto文件放到src/main/proto或src/test/proto目录下。
然后添加生成代码使用的插件:
For protobuf-based codegen integrated with the Maven build system, you can use protobuf-maven-plugin (Eclipse and NetBeans users should also look at os-maven-plugin’s IDE documentation):
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.1</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.22.3:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.56.0:exe:${os.detected.classifier}</pluginArtifact>
<outputDirectory>${project.basedir}/src/main/java</outputDirectory>
<clearOutputDirectory>false</clearOutputDirectory>
<skip>false</skip>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
生成代码:
mvn clean compile
High-level组件
At a high level there are three distinct layers to the library: Stub, Channel, and Transport.
Stub
The Stub layer is what is exposed to most developers and provides type-safe bindings to whatever datamodel/IDL/interface you are adapting. gRPC comes with a plugin to the protocol-buffers compiler that generates Stub interfaces out of .proto files, but bindings to other datamodel/IDL are easy and encouraged.
Channel
The Channel layer is an abstraction over Transport handling that is suitable for interception/decoration and exposes more behavior to the application than the Stub layer. It is intended to be easy for application frameworks to use this layer to address cross-cutting concerns such as logging, monitoring, auth, etc.
Transport
The Transport layer does the heavy lifting of putting and taking bytes off the wire. The interfaces to it are abstract just enough to allow plugging in of different implementations. Note the transport layer API is considered internal to gRPC and has weaker API guarantees than the core API under package io.grpc.
gRPC comes with multiple Transport implementations:
- The Netty-based HTTP/2 transport is the main transport implementation based on Netty. It is not officially supported on Android.
- The OkHttp-based HTTP/2 transport is a lightweight transport based on Okio and forked low-level parts of OkHttp. It is mainly for use on Android.
- The in-process transport is for when a server is in the same process as the client. It is used frequently for testing, while also being safe for production use.
- The Binder transport is for Android cross-process communication on a single device.
四种通信模式
-
简单rpc - 一个请求一个响应
rpc getRealNameByUsername (StudentRequest) returns (StudentResponse) {}
-
服务端流式rpc - 服务端流式响应
rpc getRealNameByUsernameLike (StudentUsernameRequest) returns (stream StudentResponse) {}
-
客户端流式rpc - 客户端流式请求
rpc getRealNameByUsernames (stream StudentUsernameRequest) returns (StudentResponseList) {}
-
双向流rpc
rpc getRealNamesByUsernames (stream StudentUsernameRequest) returns (stream StudentResponse) {}
高级应用
- 拦截器
- Stream Tracer - 流拦截器
- Retry Policy - 客户端重试
- NameResolver - 服务发现
- 负载均衡
- grpc与微服务:与dubbo、gateway、jwt、nacos2.x、openfeign
基础示例
本小节将使用简单的示例说明grpc-java的使用方法。
.proto文件
.proto文件需要放在src/main/proto目录下面:
syntax = "proto3";
package org.net5ijy.grpc.auto;
option java_package = "org.net5ijy.grpc.auto";
option java_outer_classname = "StudentRpc";
option java_multiple_files = true;
service StudentService {
rpc getRealNameByUsername (StudentUsernameRequest) returns (StudentResponse) {}
rpc getRealNameByUsernameLike (StudentUsernameRequest) returns (stream StudentResponse) {}
rpc getRealNameByUsernames (stream StudentUsernameRequest) returns (StudentResponseList) {}
rpc getRealNamesByUsernames (stream StudentUsernameRequest) returns (stream StudentResponse) {}
}
message StudentUsernameRequest {
string username = 1;
}
message StudentResponse {
string realName = 1;
}
message StudentResponseList {
repeated StudentResponse studentResponse = 1;
}
pom依赖
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>1.56.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.56.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.56.0</version>
</dependency>
pom插件
<build>
<finalName>${project.artifactId}</finalName>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.1</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<!-- mvn protobuf:compile -->
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>
com.google.protobuf:protoc:3.22.3:exe:${os.detected.classifier}
</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>
io.grpc:protoc-gen-grpc-java:1.56.0:exe:${os.detected.classifier}
</pluginArtifact>
<outputDirectory>${project.basedir}/src/main/java</outputDirectory>
<clearOutputDirectory>false</clearOutputDirectory>
<skip>false</skip>
</configuration>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
编译生成代码
mvn clean compile
编写业务实现类
public class StudentServiceImpl extends StudentServiceGrpc.StudentServiceImplBase {
@Override
public void getRealNameByUsername(StudentUsernameRequest request,
StreamObserver<StudentResponse> responseObserver) {
String username = request.getUsername();
System.out.printf("username=%s\n", username);
StudentResponse response = StudentResponse.newBuilder().setRealName("徐国峰").build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
@Override
public void getRealNameByUsernameLike(StudentUsernameRequest request,
StreamObserver<StudentResponse> responseObserver) {
String username = request.getUsername();
System.out.printf("username=%s\n", username);
responseObserver.onNext(StudentResponse.newBuilder().setRealName("徐国峰1").build());
responseObserver.onNext(StudentResponse.newBuilder().setRealName("徐国峰2").build());
responseObserver.onNext(StudentResponse.newBuilder().setRealName("徐国峰3").build());
responseObserver.onNext(StudentResponse.newBuilder().setRealName("徐国峰4").build());
responseObserver.onCompleted();
}
@Override
public StreamObserver<StudentUsernameRequest> getRealNameByUsernames(
StreamObserver<StudentResponseList> responseObserver) {
return new StreamObserver<StudentUsernameRequest>() {
@Override
public void onNext(StudentUsernameRequest request) {
System.out.printf("username=%s\n", request.getUsername());
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onCompleted() {
StudentResponse response1 = StudentResponse.newBuilder().setRealName("徐国峰5").build();
StudentResponse response2 = StudentResponse.newBuilder().setRealName("徐国峰6").build();
StudentResponse response3 = StudentResponse.newBuilder().setRealName("徐国峰7").build();
StudentResponse response4 = StudentResponse.newBuilder().setRealName("徐国峰8").build();
StudentResponseList responseList = StudentResponseList.newBuilder()
.addStudentResponse(response1)
.addStudentResponse(response2)
.addStudentResponse(response3)
.addStudentResponse(response4)
.build();
responseObserver.onNext(responseList);
responseObserver.onCompleted();
}
};
}
@Override
public StreamObserver<StudentUsernameRequest> getRealNamesByUsernames(
StreamObserver<StudentResponse> responseObserver) {
return new StreamObserver<StudentUsernameRequest>() {
@Override
public void onNext(StudentUsernameRequest request) {
System.out.printf("username=%s\n", request.getUsername());
StudentResponse response = StudentResponse.newBuilder()
.setRealName("徐国峰" + new Random().nextInt(10)).build();
responseObserver.onNext(response);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
}
Server代码
public class StudentGrpcServer {
private static final AtomicInteger COUNT = new AtomicInteger(0);
static final int GRPC_SERVER_PORT = 50051;
private Server server;
private void start() throws IOException {
// grpc server executor
Executor executor = new ThreadPoolExecutor(8, 16, 120, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
r -> {
Thread t = new Thread(r);
t.setName("stu-grpc-server-" + COUNT.incrementAndGet());
return t;
});
/* The port on which the server should run */
this.server = ServerBuilder.forPort(GRPC_SERVER_PORT).executor(executor)
.compressorRegistry(CompressorRegistry.getDefaultInstance())
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
.addService(new StudentServiceImpl())
.intercept(serverInterceptor())
.addTransportFilter(serverTransportFilter())
.build();
this.server.start();
System.out.println("Server started, listening on " + GRPC_SERVER_PORT);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
try {
StudentGrpcServer.this.stop();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.err.println("*** server shut down");
}));
}
private void stop() throws InterruptedException {
if (this.server != null && !this.server.isShutdown()) {
this.server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
}
}
private void blockUntilShutdown() throws InterruptedException {
if (this.server != null) {
this.server.awaitTermination();
}
}
private ServerInterceptor serverInterceptor() {
return new ServerInterceptor() {
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall,
Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
Context ctx = Context.current();
return Contexts.interceptCall(ctx, serverCall, metadata, serverCallHandler);
}
};
}
private ServerTransportFilter serverTransportFilter() {
return new ServerTransportFilter() {
@Override
public Attributes transportReady(Attributes transportAttrs) {
return super.transportReady(transportAttrs);
}
@Override
public void transportTerminated(Attributes transportAttrs) {
super.transportTerminated(transportAttrs);
}
};
}
public static void main(String[] args) throws IOException, InterruptedException {
final StudentGrpcServer server = new StudentGrpcServer();
server.start();
server.blockUntilShutdown();
}
}
Client代码
public class StudentGrpcClient {
private final StudentServiceGrpc.StudentServiceBlockingStub blockingStub;
private final StudentServiceGrpc.StudentServiceStub stub;
public StudentGrpcClient(Channel channel) {
// 'channel' here is a Channel, not a ManagedChannel, so it is not this code's responsibility to
// shut it down.
// Passing Channels to code makes code easier to test and makes it easier to reuse Channels.
this.blockingStub = StudentServiceGrpc.newBlockingStub(channel);
this.stub = StudentServiceGrpc.newStub(channel);
}
public void getRealNameByUsername(String username) {
StudentUsernameRequest request = StudentUsernameRequest
.newBuilder().setUsername(username).build();
try {
StudentResponse response = this.blockingStub.getRealNameByUsername(request);
System.out.printf("Real name=%s\n", response.getRealName());
} catch (StatusRuntimeException e) {
System.err.println(e.getMessage());
}
}
public void getRealNameByUsernameLike(String username) {
StudentUsernameRequest request = StudentUsernameRequest
.newBuilder().setUsername(username).build();
try {
Iterator<StudentResponse> iterator = this.blockingStub.getRealNameByUsernameLike(request);
iterator.forEachRemaining(r -> System.out.printf("Real name=%s\n", r.getRealName()));
} catch (StatusRuntimeException e) {
System.err.println(e.getMessage());
}
}
public void getRealNameByUsernames(String username) {
StudentUsernameRequest request = StudentUsernameRequest
.newBuilder().setUsername(username).build();
try {
StreamObserver<StudentUsernameRequest> requestStreamObserver = this.stub
.getRealNameByUsernames(new StreamObserver<StudentResponseList>() {
@Override
public void onNext(StudentResponseList responseList) {
responseList.getStudentResponseList()
.forEach(r -> System.out.printf("Real name=%s\n", r.getRealName()));
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("getRealNameByUsernames completed");
}
});
requestStreamObserver.onNext(request);
requestStreamObserver.onNext(request);
requestStreamObserver.onNext(request);
requestStreamObserver.onNext(request);
requestStreamObserver.onCompleted();
} catch (StatusRuntimeException e) {
e.printStackTrace();
}
}
public void getRealNamesByUsernames(String username) {
StudentUsernameRequest request = StudentUsernameRequest
.newBuilder().setUsername(username).build();
try {
StreamObserver<StudentUsernameRequest> requestStreamObserver = this.stub
.getRealNamesByUsernames(new StreamObserver<StudentResponse>() {
@Override
public void onNext(StudentResponse response) {
System.out.printf("Real name=%s\n", response.getRealName());
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("getRealNameByUsernames completed");
}
});
requestStreamObserver.onNext(request);
requestStreamObserver.onNext(request);
requestStreamObserver.onNext(request);
requestStreamObserver.onNext(request);
requestStreamObserver.onCompleted();
} catch (StatusRuntimeException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
ManagedChannel channel = ManagedChannelBuilder
.forAddress("localhost", GRPC_SERVER_PORT).usePlaintext().build();
try {
StudentGrpcClient client = new StudentGrpcClient(channel);
int count = 1;
for (int i = 0; i < count; i++) {
client.getRealNameByUsername("admin2018");
Thread.sleep(20);
System.out.println("---");
client.getRealNameByUsernameLike("admin2019");
Thread.sleep(20);
System.out.println("---");
client.getRealNameByUsernames("admin2020");
Thread.sleep(20);
System.out.println("---");
client.getRealNamesByUsernames("admin2021");
}
Thread.sleep(10000);
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
Client代码(FutureStub)
仅适用于单请求单响应的简单rpc调用:
try {
ListenableFuture<StudentResponse> future = futureStub.getRealNameByUsername(
StudentUsernameRequest.newBuilder().setUsername(username).build());
// 阻塞等待
// StudentResponse studentResponse = future.get();
CountDownLatch latch = new CountDownLatch(1);
Futures.addCallback(future, new FutureCallback<StudentResponse>() {
@Override
public void onSuccess(StudentResponse response) {
System.out.printf("Real name=%s\n", response.getRealName());
latch.countDown();
}
@Override
public void onFailure(Throwable t) {
System.err.println(t.getMessage());
latch.countDown();
}
}, Executors.newSingleThreadExecutor());
latch.await();
} catch (StatusRuntimeException | InterruptedException e) {
System.err.println(e.getMessage());
}
Nacos中grpc的使用
在Nacos中,proto文件并没有定义所有的接口,而是只定义了基础的转发接口和通用请求响应Payload结构体。
具体的接口请求响应结构体在业务代码中编写,业务接口则是使用转发接口进行路由,类似SpringMVC中DispatcherServlet转发请求给Controller一样。
本小节将简单介绍Nacos中集成grpc的方式。
服务端
BaseGrpcServer
抽象类BaseRpcServer定义了rpc服务器的框架逻辑,模板方法startServer()要子类实现,是启动rpc服务器的核心逻辑。
抽象类BaseGrpcServer继承了BaseRpcServer类,封装了grpc组件:
- Server - GRPC服务器对象
- GrpcRequestAcceptor - 业务请求接收、转发器
- GrpcBiStreamRequestAcceptor - 连接请求接收处理器,用于获取双向流发送StreamObserver
- ConnectionManager - 连接管理器
startServer()方法封装了启动grpc服务器的逻辑:
public void startServer() throws Exception {
final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
// server interceptor to set connection id.
ServerInterceptor serverInterceptor = new ServerInterceptor() {
@Override
public <T, S> ServerCall.Listener<T> interceptCall(ServerCall<T, S> call, Metadata headers,
ServerCallHandler<T, S> next) {
// 把connectionId、ip、port等保存到Context上
Context ctx = Context.current()
.withValue(CONTEXT_KEY_CONN_ID, call.getAttributes().get(TRANS_KEY_CONN_ID))
.withValue(CONTEXT_KEY_CONN_REMOTE_IP, call.getAttributes().get(TRANS_KEY_REMOTE_IP))
.withValue(CONTEXT_KEY_CONN_REMOTE_PORT,
call.getAttributes().get(TRANS_KEY_REMOTE_PORT))
.withValue(CONTEXT_KEY_CONN_LOCAL_PORT, call.getAttributes().get(TRANS_KEY_LOCAL_PORT));
if (REQUEST_BI_STREAM_SERVICE_NAME.equals(call.getMethodDescriptor().getServiceName())) {
Channel internalChannel = getInternalChannel(call);
// 保存channel
ctx = ctx.withValue(CONTEXT_KEY_CHANNEL, internalChannel);
}
return Contexts.interceptCall(ctx, call, headers, next);
}
};
// 添加转发组件
addServices(handlerRegistry, serverInterceptor);
// 创建Server
server = ServerBuilder.forPort(getServicePort()).executor(getRpcExecutor())
.maxInboundMessageSize(getInboundMessageSize()).fallbackHandlerRegistry(handlerRegistry)
.compressorRegistry(CompressorRegistry.getDefaultInstance())
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
.addTransportFilter(new ServerTransportFilter() {
@Override
public Attributes transportReady(Attributes transportAttrs) {
// 在连接建立时获取ip、port等信息,生成connectionId
InetSocketAddress remoteAddress = (InetSocketAddress) transportAttrs
.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
InetSocketAddress localAddress = (InetSocketAddress) transportAttrs
.get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
int remotePort = remoteAddress.getPort();
int localPort = localAddress.getPort();
String remoteIp = remoteAddress.getAddress().getHostAddress();
Attributes attrWrapper = transportAttrs.toBuilder()
.set(TRANS_KEY_CONN_ID,
System.currentTimeMillis() + "_" + remoteIp + "_" + remotePort)
.set(TRANS_KEY_REMOTE_IP, remoteIp).set(TRANS_KEY_REMOTE_PORT, remotePort)
.set(TRANS_KEY_LOCAL_PORT, localPort).build();
return attrWrapper;
}
@Override
public void transportTerminated(Attributes transportAttrs) {
String connectionId = null;
try {
connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
} catch (Exception e) {
// Ignore
}
if (StringUtils.isNotBlank(connectionId)) {
// 连接断开时,从connectionManager移除
connectionManager.unregister(connectionId);
}
}
}).build();
server.start();
}
GrpcBiStreamRequestAcceptor
grpc bi stream request handler.
主要功能就是封装服务端Connection对象:
if (parseObj instanceof ConnectionSetupRequest) {
ConnectionSetupRequest setUpRequest = (ConnectionSetupRequest) parseObj;
Map<String, String> labels = setUpRequest.getLabels();
String appName = "-";
if (labels != null && labels.containsKey(Constants.APPNAME)) {
appName = labels.get(Constants.APPNAME);
}
ConnectionMeta metaInfo = new ConnectionMeta(connectionId, payload.getMetadata().getClientIp(),
remoteIp, remotePort, localPort, ConnectionType.GRPC.getType(),
setUpRequest.getClientVersion(), appName, setUpRequest.getLabels());
metaInfo.setTenant(setUpRequest.getTenant());
// 封装连接基础信息和responseObserver、channel
// 使用responseObserver向客户端推送消息
Connection connection = new GrpcConnection(metaInfo, responseObserver, CONTEXT_KEY_CHANNEL.get());
connection.setAbilities(setUpRequest.getAbilities());
boolean rejectSdkOnStarting = metaInfo.isSdkSource() && !ApplicationUtils.isStarted();
// 注册到connectionManager
if (rejectSdkOnStarting || !connectionManager.register(connectionId, connection)) {
try {
connection.request(new ConnectResetRequest(), 3000L);
connection.close();
} catch (Exception e) {
}
}
}
GrpcRequestAcceptor
处理业务请求,将业务请求转发到RequestHandler上:
public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {
String type = grpcRequest.getMetadata().getType();
// 查找RequestHandler处理器对象
RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
// no handler found
if (requestHandler == null) {
Payload payloadResponse = GrpcUtils
.convert(buildErrorResponse(NacosException.NO_HANDLER, "RequestHandler Not Found"));
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
return;
}
// 请求体反序列化
Object parseObj = null;
try {
parseObj = GrpcUtils.parse(grpcRequest);
} catch (Exception e) {
Payload payloadResponse = GrpcUtils.convert(
buildErrorResponse(NacosException.BAD_GATEWAY, e.getMessage()));
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
return;
}
// 处理业务请求
Request request = (Request) parseObj;
try {
Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
RequestMeta requestMeta = new RequestMeta();
requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
requestMeta.setLabels(connection.getMetaInfo().getLabels());
// 刷新客户端活跃状态
connectionManager.refreshActiveTime(requestMeta.getConnectionId());
// 使用RequestHandler处理请求
Response response = requestHandler.handleRequest(request, requestMeta);
Payload payloadResponse = GrpcUtils.convert(response);
// 响应
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
} catch (Throwable e) {
Payload payloadResponse = GrpcUtils.convert(
buildErrorResponse((e instanceof NacosException) ?
((NacosException) e).getErrCode() : ResponseCode.FAIL.getCode(),
e.getMessage()));
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
}
}
客户端
客户端使用GrpcSdkClient类,连接服务端的逻辑在其父类GrpcClient的connectToServer方法中:
public Connection connectToServer(ServerInfo serverInfo) {
try {
if (grpcExecutor == null) {
int threadNumber = ThreadUtils.getSuitableThreadCount(8);
grpcExecutor = new ThreadPoolExecutor(threadNumber, threadNumber, 10L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10000),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat(
"nacos-grpc-client-executor-%d").build());
grpcExecutor.allowCoreThreadTimeOut(true);
}
int port = serverInfo.getServerPort() + rpcPortOffset();
// 创建grpc客户端stub
RequestGrpc.RequestFutureStub newChannelStubTemp =
createNewChannelStub(serverInfo.getServerIp(), port);
if (newChannelStubTemp != null) {
// 检查服务端的可用性
Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
if (response == null || !(response instanceof ServerCheckResponse)) {
shuntDownChannel((ManagedChannel) newChannelStubTemp.getChannel());
return null;
}
// 创建biStreamStub
BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc
.newStub(newChannelStubTemp.getChannel());
// 创建connection
GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId());
// create stream request and bind connection event to this connection.
// 用于向服务端发送请求
StreamObserver<Payload> payloadStreamObserver =
bindRequestStream(biRequestStreamStub, grpcConn);
// stream observer to send response to server
grpcConn.setPayloadStreamObserver(payloadStreamObserver);
grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
grpcConn.setChannel((ManagedChannel) newChannelStubTemp.getChannel());
// 发送一个ConnectionSetupRequest让服务端创建Connection
ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
conSetupRequest.setLabels(super.getLabels());
conSetupRequest.setAbilities(super.clientAbilities);
conSetupRequest.setTenant(super.getTenant());
grpcConn.sendRequest(conSetupRequest);
return grpcConn;
}
return null;
} catch (Exception e) {}
return null;
}
private StreamObserver<Payload> bindRequestStream(
final BiRequestStreamGrpc.BiRequestStreamStub streamStub,
final GrpcConnection grpcConn) {
return streamStub.requestBiStream(new StreamObserver<Payload>() {
@Override
public void onNext(Payload payload) {
try {
Object parseBody = GrpcUtils.parse(payload);
final Request request = (Request) parseBody;
if (request != null) {
try {
// 使用客户端侧的ServerRequestHandler处理服务端发送过来的数据
Response response = handleServerRequest(request);
if (response != null) {
response.setRequestId(request.getRequestId());
// 响应
sendResponse(response);
}
} catch (Exception e) {
sendResponse(request.getRequestId(), false);
}
}
} catch (Exception e) {
// ...
}
}
@Override
public void onError(Throwable throwable) {
boolean isRunning = isRunning();
boolean isAbandon = grpcConn.isAbandon();
if (isRunning && !isAbandon) {
if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
switchServerAsync();
}
} else {
// ...
}
}
@Override
public void onCompleted() {
boolean isRunning = isRunning();
boolean isAbandon = grpcConn.isAbandon();
if (isRunning && !isAbandon) {
if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
switchServerAsync();
}
}
}
});
}
更多推荐
所有评论(0)