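In general, implementing a gRPC server and client takes the following steps:
Install the protobuf dependencies
Write the proto file (IDL)
Compile the proto file (generate the stub files)
Write the server and implement the service interface
Write the client and test the interface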
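1. A first gRPC project
Project structure:
xxxx-api module
  Defines the protobuf IDL
  Generates the corresponding code with a command
xxxx-server module
  Implements the service interfaces defined in the api module
  Publishes the gRPC service (creates the server program)
xxxx-client module
  Creates the stub (proxy) for the server
  Makes RPC calls through the stub (proxy)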
Directory structure
    ├── grpc-api
    │   ├── pom.xml
    │   ├── src
    ├── grpc-client
    │   ├── pom.xml
    │   ├── src
    ├── grpc-server
    │   ├── pom.xml
    │   ├── src
    └── pom.xml
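api module
Write the .proto file, which holds the protobuf IDL.
    syntax = "proto3";

    option java_multiple_files = false;
    option java_package = "com.lepeng.grpcserver.grpc";
    option java_outer_classname = "HelloProto";

    // The source omits the message definitions. The two below are inferred
    // from request.getName() and builder.setResult(...) in the Java code.
    message HelloRequest {
      string name = 1;
    }

    message HelloResponse {
      string result = 1;
    }

    service HelloService {
      rpc hello(HelloRequest) returns (HelloResponse) {}
      // HelloRequest1 and HelloResponse1 are not shown in the source; they must be defined too.
      rpc hello1(HelloRequest1) returns (HelloResponse1) {}
    }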
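There are two ways to turn the IDL into programming-language code:
Use the protoc command: protoc --java_out=/xxx/xxx /xxx/xxx/xx.proto (on its own this generates only the message classes; the service classes also need the protoc-gen-grpc-java plugin)
Use the Maven plugin, which compiles the protobuf IDL file and places the output at a fixed location in the IDEA project.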
    <project>
      <dependencies>
        <dependency>
          <groupId>io.grpc</groupId>
          <artifactId>grpc-netty-shaded</artifactId>
          <version>1.52.1</version>
          <scope>runtime</scope>
        </dependency>
        <dependency>
          <groupId>io.grpc</groupId>
          <artifactId>grpc-protobuf</artifactId>
          <version>1.52.1</version>
        </dependency>
        <dependency>
          <groupId>io.grpc</groupId>
          <artifactId>grpc-stub</artifactId>
          <version>1.52.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.tomcat</groupId>
          <artifactId>annotations-api</artifactId>
          <version>6.0.53</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>

      <build>
        <extensions>
          <extension>
            <groupId>kr.motd.maven</groupId>
            <artifactId>os-maven-plugin</artifactId>
            <version>1.7.1</version>
          </extension>
        </extensions>
        <plugins>
          <plugin>
            <groupId>org.xolstice.maven.plugins</groupId>
            <artifactId>protobuf-maven-plugin</artifactId>
            <version>0.6.1</version>
            <configuration>
              <protocArtifact>com.google.protobuf:protoc:3.21.7:exe:${os.detected.classifier}</protocArtifact>
              <pluginId>grpc-java</pluginId>
              <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.52.1:exe:${os.detected.classifier}</pluginArtifact>
            </configuration>
            <executions>
              <execution>
                <goals>
                  <goal>compile</goal>
                  <goal>compile-custom</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    </project>
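In IDEA, run the protobuf plugin's compile and compile-custom goals one after the other. Both are required:
compile compiles the message definitions and generates the message classes
compile-custom depends on the message classes and generates the service interface, i.e. the XXXGrpc class
HelloProto is generated by default under target/generated-sources/protobuf/java and holds the Java classes for the messages
HelloServiceGrpc is generated by default under target/generated-sources/protobuf/grpc-java and holds the classes for the service
Put the generated code in the matching package directory, i.e. the grpc subdirectory under com.lepeng.grpcserver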
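Developing the xxxx-server module
Implement the business interface and add the concrete functionality (MyBatis + MySQL)
Development steps
Build the listening address (SocketAddress)
Load NettyServerProvider through SPI
Create the listening address from the given port
Register the Service in the cache
Build the Server
Start the server
Reference implementation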
    package com.lepeng.grpcserver.grpc.server;

    import com.lepeng.grpcserver.grpc.HelloProto;
    import com.lepeng.grpcserver.grpc.HelloServiceGrpc;
    import io.grpc.Server;
    import io.grpc.ServerBuilder;
    import io.grpc.stub.StreamObserver;

    import java.io.IOException;
    import java.util.logging.Logger;

    public class HelloServer {
        private static final Logger logger = Logger.getLogger(HelloServer.class.getName());

        private int port = 50051;
        private Server server;

        private void start() throws IOException {
            server = ServerBuilder.forPort(port)
                    .addService(new HelloServiceImpl())
                    .build()
                    .start();
            logger.info("Server started, listening on " + port);
            // Shut the gRPC server down cleanly when the JVM exits.
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    System.err.println("*** shutting down gRPC server since JVM is shutting down");
                    HelloServer.this.stop();
                    System.err.println("*** server shut down");
                }
            });
        }

        private void stop() {
            if (server != null) {
                server.shutdown();
            }
        }

        // Keep the main thread alive until the server terminates.
        private void blockUntilShutdown() throws InterruptedException {
            if (server != null) {
                server.awaitTermination();
            }
        }

        public static void main(String[] args) throws IOException, InterruptedException {
            final HelloServer server = new HelloServer();
            server.start();
            server.blockUntilShutdown();
        }

        // Implements the service base class generated from the .proto file.
        public static class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase {
            @Override
            public void hello(HelloProto.HelloRequest request,
                              StreamObserver<HelloProto.HelloResponse> responseObserver) {
                String name = request.getName();
                System.out.println("name parameter " + name);
                HelloProto.HelloResponse.Builder builder = HelloProto.HelloResponse.newBuilder();
                builder.setResult("hello method invoke ok");
                HelloProto.HelloResponse helloResponse = builder.build();
                responseObserver.onNext(helloResponse);   // send the response to the client
                responseObserver.onCompleted();           // mark the response as finished
            }
        }
    }
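Creating the server with Netty
    import io.grpc.Server;
    import io.grpc.ServerBuilder;

    import java.io.IOException;

    public class GprcServer1 {
        public static void main(String[] args) throws IOException, InterruptedException {
            // Bind the port
            ServerBuilder<?> serverBuilder = ServerBuilder.forPort(9000);
            // Register the service implementation
            serverBuilder.addService(new HelloServiceImpl());
            // Build and start the server, then block until it terminates
            Server server = serverBuilder.build();
            server.start();
            server.awaitTermination();
        }
    }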
xxx-client module
The client calls the remote object through a proxy (stub) object.
    package com.lepeng.grpcclient.grpc.client;

    import com.lepeng.grpcclient.grpc.HelloProto;
    import com.lepeng.grpcclient.grpc.HelloServiceGrpc;
    import io.grpc.ManagedChannel;
    import io.grpc.ManagedChannelBuilder;

    public class GprcClient1 {
        public static void main(String[] args) {
            // Create the channel to the server
            ManagedChannel managedChannel =
                    ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
            try {
                // Obtain the blocking stub (proxy)
                HelloServiceGrpc.HelloServiceBlockingStub helloService =
                        HelloServiceGrpc.newBlockingStub(managedChannel);
                // Build the request
                HelloProto.HelloRequest.Builder builder = HelloProto.HelloRequest.newBuilder();
                builder.setName("this is name");
                HelloProto.HelloRequest helloRequest = builder.build();
                // Make the RPC call and read the response
                HelloProto.HelloResponse helloResponse = helloService.hello(helloRequest);
                String result = helloResponse.getResult();
                System.out.println("result = " + result);
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                managedChannel.shutdown();
            }
        }
    }
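Notes
    // On the server, when returning the result:
    responseObserver.onNext(helloResponse1);   // send the response message back to the client
    responseObserver.onCompleted();            // tell the client that the response is finished

    // On the client, when sending a request stream:
    requestObserver.onNext(helloRequest1);
    requestObserver.onCompleted();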
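The ordering in these notes matters: an observer receives zero or more onNext calls followed by a single onCompleted, which must be the last call. The sketch below simulates that contract in plain Java so that it runs without gRPC on the classpath. MiniObserver, RecordingObserver and ObserverContractDemo are hypothetical names introduced here. MiniObserver is only a stand-in for io.grpc.stub.StreamObserver, and the exception on a late onNext is this sketch's own check rather than documented gRPC behavior.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for io.grpc.stub.StreamObserver, reduced to two methods. */
interface MiniObserver<T> {
    void onNext(T value);
    void onCompleted();
}

/** Records what it receives and rejects messages sent after completion. */
class RecordingObserver<T> implements MiniObserver<T> {
    final List<T> received = new ArrayList<>();
    boolean completed = false;

    @Override
    public void onNext(T value) {
        if (completed) {
            throw new IllegalStateException("onNext called after onCompleted");
        }
        received.add(value);
    }

    @Override
    public void onCompleted() {
        completed = true;
    }
}

class ObserverContractDemo {
    /** Plays the role of a unary handler: one message, then completion. */
    static void reply(String name, MiniObserver<String> responseObserver) {
        responseObserver.onNext("hello " + name);
        responseObserver.onCompleted();
    }

    public static void main(String[] args) {
        RecordingObserver<String> observer = new RecordingObserver<>();
        reply("this is name", observer);
        System.out.println(observer.received + " completed=" + observer.completed);
    }
}
```

Swapping the two calls inside reply makes the recording observer throw, which is the mistake the notes warn against.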
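Running
Start the server. HelloServer logs: Server started, listening on 50051 (GprcServer1 listens on port 9000 instead, which is the port the client above connects to).
Run the client. With the code above, the server console prints:
    name parameter this is name
and the client console prints:
    result = hello method invoke ok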
The four gRPC communication modes
Simple RPC, also called unary RPC
Server streaming RPC
Client streaming RPC
Bidirectional streaming RPC
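Simple RPC (unary RPC)
The RPC program above is a unary RPC.
Characteristics: after the client issues the call and submits its data, it waits for the server's response. In day-to-day development, unary RPC is the mode used most often.
protobuf
    service HelloService {
      rpc hello(HelloRequest) returns (HelloResponse) {}
      rpc hello1(HelloRequest1) returns (HelloResponse1) {}
    }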
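Server streaming RPC
Characteristics: for one request object, the server can send back multiple result objects.
Use case
    client  -------->  server
              stock symbol
            <--------
              the stock's quote at a given moment
protobuf
    service HelloService {
      // Named c2ss to match the Java code in this section.
      rpc c2ss(HelloRequest) returns (stream HelloResponse) {}
      rpc hello1(HelloRequest1) returns (HelloResponse1) {}
    }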
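With a blocking stub, a server-streaming call hands the client an Iterator of responses, which is how the blocking client in this section consumes c2ss. The following is a dependency-free sketch of that shape using the stock-quote scenario. ServerStreamingSketch, quotes and collect are hypothetical names, and no real gRPC classes are involved.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

class ServerStreamingSketch {
    /**
     * Hypothetical stand-in for a server-streaming call: one request (a stock
     * symbol) yields a sequence of responses (quotes), produced lazily.
     */
    static Iterator<String> quotes(String symbol, int count) {
        return new Iterator<String>() {
            private int sent = 0;

            @Override
            public boolean hasNext() {
                return sent < count;   // false plays the role of onCompleted()
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                // Each next() plays the role of one responseObserver.onNext(...).
                return symbol + " quote #" + sent++;
            }
        };
    }

    /** Client side: drain the stream the way a blocking stub's Iterator is drained. */
    static List<String> collect(Iterator<String> responses) {
        List<String> all = new ArrayList<>();
        while (responses.hasNext()) {
            all.add(responses.next());
        }
        return all;
    }

    public static void main(String[] args) {
        for (String quote : collect(quotes("ACME", 3))) {
            System.out.println(quote);
        }
    }
}
```

The loop in collect blocks on each hasNext() in a real blocking stub, so the calling thread is occupied for the whole stream. That cost is what the asynchronous stub avoids.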
Code
    // Server: one request in, several responses out
    public void c2ss(HelloProto.HelloRequest request,
                     StreamObserver<HelloProto.HelloResponse> responseObserver) {
        // Read the request parameter
        String name = request.getName();
        System.out.println("name = " + name);
        // Send back nine responses, one per second
        for (int i = 0; i < 9; i++) {
            HelloProto.HelloResponse.Builder builder = HelloProto.HelloResponse.newBuilder();
            builder.setResult("processing result " + i);
            HelloProto.HelloResponse helloResponse = builder.build();
            responseObserver.onNext(helloResponse);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        // Tell the client that the stream is finished
        responseObserver.onCompleted();
    }

    // Client, blocking stub: the responses arrive as an Iterator
    // (needs java.util.Iterator, io.grpc.ManagedChannel, io.grpc.ManagedChannelBuilder)
    public class GprcClient3 {
        public static void main(String[] args) {
            ManagedChannel managedChannel =
                    ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
            try {
                HelloServiceGrpc.HelloServiceBlockingStub helloService =
                        HelloServiceGrpc.newBlockingStub(managedChannel);
                HelloProto.HelloRequest.Builder builder = HelloProto.HelloRequest.newBuilder();
                builder.setName("sunshuai");
                HelloProto.HelloRequest helloRequest = builder.build();
                Iterator<HelloProto.HelloResponse> helloResponseIterator = helloService.c2ss(helloRequest);
                while (helloResponseIterator.hasNext()) {
                    HelloProto.HelloResponse helloResponse = helloResponseIterator.next();
                    System.out.println("helloResponse.getResult() = " + helloResponse.getResult());
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                managedChannel.shutdown();
            }
        }
    }

    // Client, asynchronous stub: the responses arrive through a StreamObserver
    // (needs io.grpc.stub.StreamObserver and java.util.concurrent.TimeUnit as well)
    public class GrpcClient4 {
        public static void main(String[] args) {
            ManagedChannel managedChannel =
                    ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
            try {
                HelloServiceGrpc.HelloServiceStub helloService = HelloServiceGrpc.newStub(managedChannel);
                HelloProto.HelloRequest.Builder builder = HelloProto.HelloRequest.newBuilder();
                builder.setName("xiaohei");
                HelloProto.HelloRequest helloRequest = builder.build();
                helloService.c2ss(helloRequest, new StreamObserver<HelloProto.HelloResponse>() {
                    @Override
                    public void onNext(HelloProto.HelloResponse value) {
                        // Called once for every response the server sends
                        System.out.println("response from the server: " + value.getResult());
                    }

                    @Override
                    public void onError(Throwable t) {
                        t.printStackTrace();
                    }

                    @Override
                    public void onCompleted() {
                        // All responses have arrived; they can be processed together here if needed
                        System.out.println("the server has finished responding");
                    }
                });
                // The call is asynchronous, so keep the process alive while responses arrive
                managedChannel.awaitTermination(12, TimeUnit.SECONDS);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                managedChannel.shutdown();
            }
        }
    }
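The asynchronous client above keeps the process alive with managedChannel.awaitTermination(12, TimeUnit.SECONDS), which on a channel that has not been shut down amounts to a fixed wait of up to 12 seconds. A common alternative is to block on a CountDownLatch that the callback releases in onCompleted (and, in real code, in onError as well). The sketch below shows the pattern with plain JDK classes only. LatchWaitSketch, ResponseCallback, streamAsync and callAndWait are hypothetical names, and the worker thread merely stands in for the gRPC transport.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchWaitSketch {
    /** Hypothetical stand-in for an asynchronous response observer. */
    interface ResponseCallback {
        void onNext(String value);
        void onCompleted();
    }

    /** Simulates a server that streams `count` responses from another thread. */
    static void streamAsync(int count, ResponseCallback callback) {
        Thread worker = new Thread(() -> {
            for (int i = 0; i < count; i++) {
                callback.onNext("result " + i);
            }
            callback.onCompleted();
        });
        worker.start();
    }

    /** Starts the call and waits until the stream completes or the timeout expires. */
    static List<String> callAndWait(int count, long timeoutSeconds) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch finished = new CountDownLatch(1);
        streamAsync(count, new ResponseCallback() {
            @Override
            public void onNext(String value) {
                received.add(value);
            }

            @Override
            public void onCompleted() {
                finished.countDown();   // release the waiting caller
            }
        });
        try {
            if (!finished.await(timeoutSeconds, TimeUnit.SECONDS)) {
                throw new IllegalStateException("stream did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(callAndWait(3, 5));
    }
}
```

The caller resumes as soon as the stream ends instead of always paying the full timeout, and a timeout becomes an explicit error rather than a silent exit.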
Client streaming RPC
The client sends multiple request objects and the server returns a single result.
Use case: IoT devices (sensors) sending data to the server
protobuf
    rpc cs2s(stream HelloRequest) returns (HelloResponse) {}
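A client-streaming handler typically accumulates state on every incoming message and answers exactly once, when the client signals completion. The sketch below illustrates this with the sensor scenario by averaging readings. It is plain Java with no gRPC dependency: RequestSink is a hypothetical stand-in for io.grpc.stub.StreamObserver, and ClientStreamingSketch and averageReadings are names introduced only for this illustration.

```java
import java.util.function.Consumer;

/** Hypothetical stand-in for io.grpc.stub.StreamObserver, reduced to two methods. */
interface RequestSink<T> {
    void onNext(T value);
    void onCompleted();
}

class ClientStreamingSketch {
    /**
     * Plays the role of a client-streaming handler: it returns the sink the
     * client writes to, and responds exactly once when the client completes.
     */
    static RequestSink<Double> averageReadings(Consumer<String> respond) {
        return new RequestSink<Double>() {
            private double sum = 0;
            private int count = 0;

            @Override
            public void onNext(Double reading) {
                sum += reading;   // accumulate only, no response yet
                count++;
            }

            @Override
            public void onCompleted() {
                double avg = count == 0 ? 0 : sum / count;
                respond.accept("count=" + count + " avg=" + avg);
            }
        };
    }

    public static void main(String[] args) {
        RequestSink<Double> sink = averageReadings(System.out::println);
        sink.onNext(20.0);
        sink.onNext(22.0);
        sink.onNext(24.0);
        sink.onCompleted();   // prints "count=3 avg=22.0"
    }
}
```

The handler returns the object the client writes into, which is why the generated cs2s method returns a StreamObserver for requests instead of taking a request parameter.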
Development
    // proto: rpc cs2s(stream HelloRequest) returns (HelloResponse) {}

    // Server: returns the observer that receives the client's request stream
    public StreamObserver<HelloProto.HelloRequest> cs2s(
            StreamObserver<HelloProto.HelloResponse> responseObserver) {
        return new StreamObserver<HelloProto.HelloRequest>() {
            @Override
            public void onNext(HelloProto.HelloRequest value) {
                System.out.println("received a message from the client: " + value.getName());
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onCompleted() {
                System.out.println("the client has sent all of its messages");
                // Respond once, after the whole request stream has arrived
                HelloProto.HelloResponse.Builder builder = HelloProto.HelloResponse.newBuilder();
                builder.setResult("this is result");
                HelloProto.HelloResponse helloResponse = builder.build();
                responseObserver.onNext(helloResponse);
                responseObserver.onCompleted();
            }
        };
    }

    // Client: only the asynchronous stub supports client streaming
    public class GrpcClient5 {
        public static void main(String[] args) {
            ManagedChannel managedChannel =
                    ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
            try {
                HelloServiceGrpc.HelloServiceStub helloService = HelloServiceGrpc.newStub(managedChannel);
                StreamObserver<HelloProto.HelloRequest> helloRequestStreamObserver =
                        helloService.cs2s(new StreamObserver<HelloProto.HelloResponse>() {
                    @Override
                    public void onNext(HelloProto.HelloResponse value) {
                        System.out.println("response from the server: " + value.getResult());
                    }

                    @Override
                    public void onError(Throwable t) {
                        t.printStackTrace();
                    }

                    @Override
                    public void onCompleted() {
                        System.out.println("the server has finished responding");
                    }
                });
                // Send ten requests, one per second, then signal the end of the stream
                for (int i = 0; i < 10; i++) {
                    HelloProto.HelloRequest.Builder builder = HelloProto.HelloRequest.newBuilder();
                    builder.setName("sunshuai " + i);
                    HelloProto.HelloRequest helloRequest = builder.build();
                    helloRequestStreamObserver.onNext(helloRequest);
                    Thread.sleep(1000);
                }
                helloRequestStreamObserver.onCompleted();
                managedChannel.awaitTermination(12, TimeUnit.SECONDS);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                managedChannel.shutdown();
            }
        }
    }
Bidirectional streaming RPC
The client can send multiple request messages and the server can reply with multiple response messages.
Use case: chat rooms
Code
    // proto: rpc cs2ss(stream HelloRequest) returns (stream HelloResponse) {}

    // Server: answers every incoming message as it arrives
    public StreamObserver<HelloProto.HelloRequest> cs2ss(
            StreamObserver<HelloProto.HelloResponse> responseObserver) {
        return new StreamObserver<HelloProto.HelloRequest>() {
            @Override
            public void onNext(HelloProto.HelloRequest value) {
                System.out.println("received a message from the client: " + value.getName());
                responseObserver.onNext(HelloProto.HelloResponse.newBuilder()
                        .setResult("response " + value.getName() + " result ")
                        .build());
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onCompleted() {
                System.out.println("received all request messages");
                responseObserver.onCompleted();
            }
        };
    }

    // Client
    public class GrpcClient6 {
        public static void main(String[] args) {
            ManagedChannel managedChannel =
                    ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
            try {
                HelloServiceGrpc.HelloServiceStub helloService = HelloServiceGrpc.newStub(managedChannel);
                StreamObserver<HelloProto.HelloRequest> helloRequestStreamObserver =
                        helloService.cs2ss(new StreamObserver<HelloProto.HelloResponse>() {
                    @Override
                    public void onNext(HelloProto.HelloResponse value) {
                        System.out.println("response: " + value.getResult());
                    }

                    @Override
                    public void onError(Throwable t) {
                        t.printStackTrace();
                    }

                    @Override
                    public void onCompleted() {
                        System.out.println("all responses received");
                    }
                });
                for (int i = 0; i < 10; i++) {
                    helloRequestStreamObserver.onNext(
                            HelloProto.HelloRequest.newBuilder().setName("sunshuai " + i).build());
                }
                helloRequestStreamObserver.onCompleted();
                managedChannel.awaitTermination(12, TimeUnit.SECONDS);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                managedChannel.shutdown();
            }
        }
    }
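What separates bidirectional streaming from client streaming is that the server may reply while requests are still arriving. The sketch below makes that ordering visible by logging send and receive events for a chat-style echo handler. It is a synchronous, plain-Java simulation, so replies appear immediately after each send, whereas over a real channel they arrive asynchronously. BidiStreamingSketch, MiniObserver, chat and run are hypothetical names, not gRPC APIs.

```java
import java.util.ArrayList;
import java.util.List;

class BidiStreamingSketch {
    /** Hypothetical stand-in for io.grpc.stub.StreamObserver, reduced to two methods. */
    interface MiniObserver<T> {
        void onNext(T value);
        void onCompleted();
    }

    /**
     * Plays the role of a bidirectional handler: every incoming message is
     * answered at once, and the response stream ends with the request stream.
     */
    static MiniObserver<String> chat(MiniObserver<String> responses) {
        return new MiniObserver<String>() {
            @Override
            public void onNext(String message) {
                responses.onNext("response " + message + " result");
            }

            @Override
            public void onCompleted() {
                responses.onCompleted();
            }
        };
    }

    /** Sends the given messages and returns the order of events seen by the client. */
    static List<String> run(String... messages) {
        List<String> events = new ArrayList<>();
        MiniObserver<String> requests = chat(new MiniObserver<String>() {
            @Override
            public void onNext(String value) {
                events.add("received: " + value);
            }

            @Override
            public void onCompleted() {
                events.add("completed");
            }
        });
        for (String message : messages) {
            events.add("sent: " + message);
            requests.onNext(message);
        }
        requests.onCompleted();
        return events;
    }

    public static void main(String[] args) {
        run("a", "b").forEach(System.out::println);
    }
}
```

In the event list, each reply sits between two sends, which a client-streaming handler could never produce because it responds only after completion.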
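gRPC stub (proxy) types
BlockingStub: blocking (synchronous) calls
Stub: asynchronous calls, handled through observer callbacks
FutureStub: can be used synchronously or asynchronously; it returns a Guava ListenableFuture
FutureStub only supports unary RPC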
    import com.google.common.util.concurrent.FutureCallback;
    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;
    import io.grpc.ManagedChannel;
    import io.grpc.ManagedChannelBuilder;

    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class GrpcClient7 {
        public static void main(String[] args) {
            ManagedChannel managedChannel =
                    ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
            try {
                TestServiceGrpc.TestServiceFutureStub testServiceFutureStub =
                        TestServiceGrpc.newFutureStub(managedChannel);
                ListenableFuture<TestProto.TestResponse> responseListenableFuture =
                        testServiceFutureStub.testSuns(
                                TestProto.TestRequest.newBuilder().setName("xiaojren").build());

                // Asynchronous use: register a callback instead of blocking on get()
                Futures.addCallback(responseListenableFuture, new FutureCallback<TestProto.TestResponse>() {
                    @Override
                    public void onSuccess(TestProto.TestResponse result) {
                        System.out.println("result.getResult() = " + result.getResult());
                    }

                    @Override
                    public void onFailure(Throwable t) {
                        t.printStackTrace();
                    }
                }, Executors.newCachedThreadPool());

                System.out.println("subsequent operations ...");
                managedChannel.awaitTermination(12, TimeUnit.SECONDS);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                managedChannel.shutdown();
            }
        }
    }
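A future-based stub can be used in two ways: block until the single response is available, or register a callback and carry on. The sketch below shows both using the JDK's CompletableFuture as a stand-in. This is a deliberate substitution so that the example runs without dependencies, since the gRPC FutureStub itself returns Guava's ListenableFuture as in the client above. FutureStyleSketch, callAsync, callAndBlock and callWithCallback are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class FutureStyleSketch {
    /** Hypothetical unary call whose single response is produced on another thread. */
    static CompletableFuture<String> callAsync(String name) {
        return CompletableFuture.supplyAsync(() -> "result for " + name);
    }

    /** Synchronous use: block until the response arrives. */
    static String callAndBlock(String name) {
        return callAsync(name).join();
    }

    /** Asynchronous use: register a callback; the returned future completes after it has run. */
    static CompletableFuture<Void> callWithCallback(String name, Consumer<String> onSuccess) {
        return callAsync(name).thenAccept(onSuccess);
    }

    public static void main(String[] args) {
        System.out.println(callAndBlock("xiaojren"));
        // join() here only keeps the demo alive until the callback has run
        callWithCallback("xiaojren", r -> System.out.println("callback got: " + r)).join();
    }
}
```

A future holds exactly one value, which is consistent with the future stub being limited to unary calls.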
Integrating gRPC with Spring Boot
The idea behind the integration
grpc-server
grpc-client
What the Spring Boot integration wraps on the server side
Set up the Spring Boot development environment
Add the gRPC-related dependencies
    <dependency>
      <groupId>com.suns</groupId>
      <artifactId>rpc-grpc-api</artifactId>
      <version>1.0-SNAPSHOT</version>
    </dependency>

    <dependency>
      <groupId>net.devh</groupId>
      <artifactId>grpc-server-spring-boot-starter</artifactId>
      <version>2.14.0.RELEASE</version>
    </dependency>
Develop the service
    // @GrpcService registers the implementation with the embedded gRPC server
    @GrpcService
    public class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase {
        @Override
        public void hello(HelloProto.HelloRequest request,
                          StreamObserver<HelloProto.HelloResponse> responseObserver) {
            String name = request.getName();
            System.out.println("name is " + name);
            responseObserver.onNext(HelloProto.HelloResponse.newBuilder().setResult("this is result").build());
            responseObserver.onCompleted();
        }
    }

    # application.yml
    spring:
      application:
        name: boot-server
      main:
        web-application-type: none
    grpc:
      server:
        port: 9000
Client
Environment setup
    <dependency>
      <groupId>net.devh</groupId>
      <artifactId>grpc-client-spring-boot-starter</artifactId>
      <version>2.14.0.RELEASE</version>
    </dependency>
Coding
1. yml
    grpc:
      client:
        grpc-server:
          address: 'static://127.0.0.1:9000'
          negotiation-type: plaintext
2. Inject the stub
    @GrpcClient("grpc-server")
    private HelloServiceGrpc.HelloServiceBlockingStub stub;