04-gRPC go版

一般来讲,实现一个gRPC服务端和客户端,主要分为这几步:

  1. 安装 protobuf 依赖
  2. 编写 proto 文件(IDL)
  3. 编译 proto 文件(生成stub文件)
  4. 编写server端,实现我们的接口
  5. 编写client端,测试我们的接口

Proto 文件编写

下面简单写一个小demo,新建两个文件夹,分别作为客户端和服务端。

image-20230120174308269

proto 文件内容如下(可以当作模板记下来)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 使用proto3语法,有2和3
syntax = "proto3";

// 和 go 相关的语法
// option go_package = "path;name";
// path表示生成的go文件的存放地址,会自动生成目录,.表示当前目录
// name表示生成的go文件的包名
// 生成的go文件处于哪个目录哪个包中
// 这里声称在当前目录,service包中
option go_package = ".;service";

// 我们需要定义一个服务,在服务中需要有一个方法,这个方法可以接收客户端参数,返回服务端响应
// 其实很容易可以看出,我们定义了一个service,称为SayHello,这个服务有一个rpc方法,名为SayHello
// 这个方法会发送一个HelloRequest返回一个HelloResponse
service SayHello {
rpc SayHello(HelloRequest) returns (HelloResponse) {}
}

// message关键字,可以理解为结构体
// 这个比较特别的是变量后的"赋值"(这里并不是赋值,而是定义这个变量在message中的位置)
message HelloRequest {
string requestName = 1;
// int64 age = 2;
}

message HelloResponse {
string responseMsg = 1;
}

接下来可以通过 protoc 生成对应语言的代码,打开 Terminal,进入 proto 目录,输入一些代码即可

两个命令(当作模板可以记下)

1
2
3
4
5
protoc --go_out=. hello.proto     // 如果想输出其他语言的文件,请使用对应的参数
protoc --go-grpc_out=. hello.proto

protoc -I internal/service/pb internal/service/pb/*.proto --go_out=.
protoc -I internal/service/pb internal/service/pb/*.proto --go-grpc_out=.

系统会根据 go_out 指定的目录再拼接 proto 文件中 go_package 指定的目录生成对应的包名

输入完可以发现在proto目录下生成了两个文件,我们使用时只需要重写或修改其中的我们定义的方法,加上业务逻辑即可

服务端编写

  • 创建 gRPC Server 对象,你可以理解为 Server 端的抽象对象

  • 将 server (其包含需要被调用的服务端接口) 注册到 gRPC Server 的内部注册中心

    这样可以在接受到请求时,通过内部的服务发现,发现该服务端接口并转接进行逻辑处理

  • 创建 Listen,监听 TCP 端口

  • gRPC Server 开始 lis.Accept,直到 Stop

下面给出以之前写的 Hello 服务为例,实现以下服务端的编写

1
2
3
4
5
6
7
8
9
// 实现生成代码中未实现的服务
// hello Server
type server struct {
pb.UnimplementedSayHelloServer
}

func (s *server) SayHello(ctx context.Context, req *pb.HelloRequest) (*pb.HelloResponse, error) {
return &pb.HelloResponse{ResponseMsg: "hello" + req.RequestName}, nil
}

注册并开启 rpc 服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func main() {
// 开启端口
listen, err := net.Listen("tcp", ":9090")
if err != nil {
panic("port open failed")
}
// 创建grpc服务
grpcServer := grpc.NewServer()
// 在grpc服务端中注册我们自己编写的服务
pb.RegisterSayHelloServer(grpcServer, &server{})
// 启动服务
err = grpcServer.Serve(listen)
if err != nil {
panic("service open failed")
}
}

客户端编写

  • 创建与给定目标(服务端)的连接交互
  • 创建 server 的服务端对象
  • 发送 RPC 请求,请求同步响应,得到回调后返回响应结果
  • 输出响应结果

同样以刚刚的 Hello 服务为例,编写一下客户端侧的代码 demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func main() {
// 连接服务
// 第二个参数为安全配置(这里新建了空的加密配置,即不进行安全加密)
conn, err := grpc.Dial("127.0.0.1:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
panic("fail to connect server")
}
// 关闭grpc连接,都记得关闭!
defer conn.Close()
// 与服务端建立连接
client := pb.NewSayHelloClient(conn)
resp, _ := client.SayHello(context.Background(),
&pb.HelloRequest{
RequestName: "二火",
})
fmt.Println(resp.GetResponseMsg())
}

我们分别启动服务端和客户端,可以看见:

image-20230120203203783

认证-安全传输

介绍

gRPC 是一个典型的 C/S 模型,需要开发客户端和服务端,客户端与服务端需要达成协议,使用某一个确认的传输协议来传输数据,gRPC通常默认是使用 protobuf 来作为传输协议,当然也是可以使用其他自定义的。

image-20230120203523973

那么,客户端与服务端要通信之前,客户端如何知道自己的数据是发给哪一个明确的服务端的呢?反过来,服务端是否也需要有一种方式来弄清楚自己的数据要返回给谁呢?

那么就不得不提到 gRPC 的认证

此处说到的认证,不是用户的身份认证,而是指多个 server 和多个 client 之间,如何识别对方是谁,并且可以安全地进行数据传输

  • SSL / TLS 认证 (采用http2协议)
  • 基于Token的认证方式 (基于安全链接)
  • 不采用任何措施的连接,这是不安全的连接 (默认采用http1)
  • 自定义的身份认证

客户端和服务端之间调用,我们可以通过加入证书的方式,实现调用的安全性

TLS (Transport Layer Security , 安全传输层),TLS是建立在传输层TCP协议之上的协议,服务于应用层,它的前身是 SSL (Secure Socket Layer, 安全套接字层),它实现了将应用层的报文进行加密后再交由TCP进行传输的功能。

TLS协议主要解决如下三个网络安全问题。

  • 保密(message privacy),保密通过加密encryption实现,所有信息都加密传输,第三方无法嗅探
  • 完整性(message integrity),通过MAC校验机制,一旦被篡改,通信双方会立刻发现
  • 认证(mutual authentication),双方认证,双方都可以配备证书,防止身份被冒充

生产环境可以购买证书或使用一些平台发放的免费证书

下面解释一些有关证书的概念:

  • key: 服务器上的私钥文件,用于对发送给客户端数据的加密,以及对客户端接受到数据的解密。
  • csr: 证书签名请求文件,用于提交给证书颁发机构(CA),
  • crt: 由证书办法机构(CA)签名后的证书,或是开发者自签名的证书,包含证书持有人的信息,持有者的公钥,以及签署者的签名等信息。
  • pem: 是基于Base64编码的证书格式,扩展名包括pem、crt、cer

SSL / TLS 认证

首先通过openSSL生成证书和私钥

  • openssl官网下载

    其他人做的便捷版安装包

  • 我们使用便捷版安装包,一直下一步即可

  • 将bin目录配置到环境变量

  • 命令行测试 openssl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 1、生成私钥
openssl genrsa -out server.key 2048

# 2、生成证书 全部回车、可以不填
openssl req -new -x509 -key server.key -out server.crt -days 36500
# 国家名称
Country Name (2 letter code) [AU]:CN
# 省名称
State or Province Name (full name) [Some-State]:Fujian
# 城市名称
Locality Name (eg, city) []:Fuzhou
# 公司组织名称
Organization Name (eg, company) [Internet widgits Pty Ltd]: ByteDance
# 部门名称
Organizational Unit Name (eg, section) []:go
# 服务器or网站名称
Organizational Unit Name (e.g. server FQDN or YOUR name) []:dousheng
# 邮件
Email Address []:362664609@qq.com

# 3、生成csr
openssl req -new -key server.key -out server.csr
1
2
3
4
5
6
7
#更改openssl.cnf (Linux是openssl.cfg)
#1)从openssl/bin下将openssl.cnf复制到项目目录中
#2)找到 [ CA_default ], 打开 copy_extensions = copy (解除注释即可)
#3)找到 [ req ], 打开 req_extensions = v3_req # The extensions to add to a certificate request
#4)找到[ v3_req ], 添加 subjectAltName = @alt_names
#5)添加新的标签 [ alt_names ], 和标签字段
DNS.1 = *.skydog.ltd
1
2
3
4
5
6
7
8
9
# 生成证书私钥 test.key
openssl genpkey -algorithm RSA -out test.key

# 通过私钥test.key生成证书请求文件test.csr(注意cfg和cnf)
openssl req -new -nodes -key test.key -out test.csr -days 3650 -subj "/C=cn/OU=myorg/0=mycomp/CN=myname" -config ./openssl.cnf -extensions v3_req
#test.csr是上面生成的证书请求文件。ca.crt/server.key是CA证书文件和key,用来对test.csr进行签名认证。这两个文件在第一部分生成。

# 生成SAN证书 (pem)
openssl x509 -req -days 365 -in test.csr -out test.pem -CA server.crt -CAkey server.key -CAcreateserial -extfile ./openssl.cnf -extensions v3_req

代码中添加认证

服务端
1
2
3
4
5
6
// TSL认证
// 两个参数分别是 cretFile, keyFile (自签名文件, 私钥文件)
creds, _ := credentials.NewServerTLSFromFile("F:\\Project\\GoProjects\\grpc-study\\key\\test.pem",
"F:\\Project\\GoProjects\\grpc-study\\key\\test.key")
// 并在创建gRPC服务时,加入认证
grpcServer := grpc.NewServer(grpc.Creds(creds))
客户端
1
2
3
4
creds, _ := credentials.NewClientTLSFromFile("F:\\Project\\GoProjects\\grpc-study\\key\\test.pem",
"*.skydog.ltd")
// 并在连接服务时请求认证
conn, err := grpc.Dial("127.0.0.1:9090", grpc.WithTransportCredentials(creds))

Token认证

我们先看一个gRPC提供给我们的一个接口,这个接口中有两个方法,接口位于credentials包下,这个接口需要客户端来实现

1
2
3
4
5
6
type PerPRCCredentials interface {
// 获取元数据信息,也就是客户端提供的kv键值对,context用于控制超时与取消,uri是请求入口的uri
GetRequestMetadata(ctx context.Context, uri ...string) (map[string] string, error)
// 是否需要基于TLS认证进行安全传输,进入过返回值是true,则必须要上TLS验证,返回值是false则不用
RequireTransportSecurity() bool
}

客户端代码

定义并实现自己的Token解析类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type PerPRCCredentials interface {
// 获取元数据信息,也就是客户端提供的kv键值对,context用于控制超时与取消,uri是请求入口的uri
GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error)
// 是否需要基于TLS认证进行安全传输,进入过返回值是true,则必须要上TLS验证,返回值是false则不用
RequireTransportSecurity() bool
}

type ClientTokenAuth struct {
}

func (c ClientTokenAuth) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
return map[string]string{
"appId": "SkyDog",
"appKey": "114514",
}, nil
}
func (c ClientTokenAuth) RequireTransportSecurity() bool {
return false
}

在客户端中添加自定义安全配置

1
2
3
4
var opts []grpc.DialOption
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
opts = append(opts, grpc.WithPerRPCCredentials(new(ClientTokenAuth)))
conn, err := grpc.Dial("127.0.0.1:9090", opts...)

服务端

在服务端侧做好token校验工作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 在业务中获取元数据信息
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, errors.New("token not found")
}
var appId string
var appKey string
if v, ok := md["appId"]; ok {
appId = v[0]
}
if v, ok := md["appKey"]; ok {
appKey = v[0]
}
// 用户 对应-> appId
if appId != "SkyDog" || appKey != "114514" {
return nil, errors.New("token invalid")
}

stream流

在 HTTP/1.1 的时代,同一个时刻只能对一个请求进行处理或者响应,换句话说,下一个请求必须要等当前请求处理完才能继续进行。

HTTP/1.1需要注意的是,在服务端没有response的时候,客户端是可以发起多个request的,但服务端依旧是顺序对请求进行处理, 并按照收到请求的次序予以返回。

HTTP/2 的时代,多路复用的特性让一次同时处理多个请求成为了现实,并且同一个 TCP 通道中的请求不分先后、不会阻塞,HTTP/2 中引入了流(Stream) 和 帧(Frame) 的概念,当 TCP 通道建立以后,后续的所有操作都是以流的方式发送的,而二进制帧则是组成流的最小单位,属于协议层上的流式传输。

HTTP/2 在一个 TCP 连接的基础上虚拟出多个 Stream, Stream 之间可以并发的请求和处理, 并且 HTTP/2 以二进制帧 (frame) 的方式进行数据传送, 并引入了头部压缩 (HPACK), 大大提升了交互效率

定义

1
2
3
4
5
6
7
8
9
10
11
// 普通 RPC
rpc SayHello (HelloRequest) returns (HelloResponse) {}

// 客户端流式 RPC
rpc ClientStream (stream HelloRequest) returns (HelloResponse) {}

// 服务器端流式 RPC
rpc ServerStream (HelloRequest) returns (stream HelloResponse) {}

// 双向流式 RPC
rpc BothStream (stream HelloRequest) returns (stream HelloResponse) {}

stream关键字,当该关键字修饰参数时,表示这是一个客户端流式的 gRPC 接口;当该参数修饰返回值时,表示这是一个服务器端流式的 gRPC 接口;当该关键字同时修饰参数和返回值时,表示这是一个双向流式的 gRPC 接口。

客户端流

客户端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package main

import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "grpc-demo/proto"
"log"
"time"
)

func main() {
conn, err := grpc.Dial("127.0.0.1:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatal("连接失败:", err)
return
}
defer conn.Close()

// 建立连接
client := pb.NewSayHelloClient(conn)
// 执行rpc调用
clientStream, err := client.ClientStream(context.Background())
if err != nil {
log.Fatal("调用失败", err)
return
}
helloch := make(chan struct{}, 1)
go helloRequest(clientStream, helloch)
select {
case <-helloch:
resp, err := clientStream.CloseAndRecv()
if err != nil {
log.Fatal(err)
}
fmt.Println("客户端收到响应:", resp.ResponsonMsg)
}
}

func helloRequest(stream pb.SayHello_ClientStreamClient, rsp chan struct{}) {
count := 0
for {
err := stream.Send(&pb.HelloRequest{RequestName: "zhangsan"})
if err != nil {
log.Fatal(err)
}
time.Sleep(time.Second)
count++
if count > 10 {
rsp <- struct{}{}
break
}
}
}

服务端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package main

import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "grpc-demo/proto"
"io"
"log"
"net"
)

type server struct {
pb.UnimplementedSayHelloServer
}

func (s server) ClientStream(stream pb.SayHello_ClientStreamServer) error {
count := 0
for {
//源源不断的去接收客户端发来的信息
req, err := stream.Recv()
if err != nil {
if err == io.EOF {
return nil
}
return err
}
fmt.Println("服务端接收到的流", req.RequestName, count)
count++
if count > 10 {
resp := &pb.HelloResponse{ResponsonMsg: req.RequestName}
err := stream.SendAndClose(resp)
if err != nil {
return err
}
return nil
}
}
}

func main() {
listen, _ := net.Listen("tcp", ":9090")
// 创建grpc服务
grpcServer := grpc.NewServer(grpc.Creds(insecure.NewCredentials()))
// 注册服务
pb.RegisterSayHelloServer(grpcServer, &server{})
// 启动服务
err := grpcServer.Serve(listen)
if err != nil {
log.Fatal("服务启动失败:", err)
return
}
}

服务端流

客户端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package main

import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "grpc-demo/proto"
"io"
"log"
)

func main() {
conn, err := grpc.Dial("127.0.0.1:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatal("连接失败:", err)
return
}
defer conn.Close()

// 建立连接
client := pb.NewSayHelloClient(conn)
// 执行rpc调用
serverStream, err := client.ServerStream(context.Background(), &pb.HelloRequest{RequestName: "zhangsan"})
if err != nil {
log.Fatal("获取流出错", err)
}
for {
resp, err := serverStream.Recv()
if err != nil {
if err == io.EOF {
fmt.Println("客户端数据接收完成")
err := serverStream.CloseSend()
if err != nil {
log.Fatal(err)
}
break
}
log.Fatal(err)
}
fmt.Println("客户端收到的流", resp.ResponsonMsg)
}
}

服务端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package main

import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "grpc-demo/proto"
"log"
"net"
"time"
)

type server struct {
pb.UnimplementedSayHelloServer
}

func (s server) ServerStream(req *pb.HelloRequest, stream pb.SayHello_ServerStreamServer) error {
count := 0
for {
resp := &pb.HelloResponse{ResponsonMsg: req.RequestName}
err := stream.Send(resp)
if err != nil {
return err
}
time.Sleep(time.Second)
count++
if count > 10 {
return nil
}
}
}

func main() {
listen, _ := net.Listen("tcp", ":9090")
// 创建grpc服务
grpcServer := grpc.NewServer(grpc.Creds(insecure.NewCredentials()))
// 注册服务
pb.RegisterSayHelloServer(grpcServer, &server{})
// 启动服务
err := grpcServer.Serve(listen)
if err != nil {
log.Fatal("服务启动失败:", err)
return
}
}

双向流

客户端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main

import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "grpc-demo/proto"
"log"
"time"
)

func main() {
conn, err := grpc.Dial("127.0.0.1:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatal("连接失败:", err)
return
}
defer conn.Close()

// 建立连接
client := pb.NewSayHelloClient(conn)
// 执行rpc调用
bothStream, err := client.BothStream(context.Background())
if err != nil {
log.Fatal("获取流出错", err)
}
for {
err = bothStream.Send(&pb.HelloRequest{RequestName: "zhangsan"})
if err != nil {
log.Fatal(err)
}
time.Sleep(time.Second)
resp, err := bothStream.Recv()
if err != nil {
log.Fatal(err)
}
fmt.Println("客户端收到的流信息", resp.ResponsonMsg)
}
}

服务端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package main

import (
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "grpc-demo/proto"
"log"
"net"
"time"
)

type server struct {
pb.UnimplementedSayHelloServer
}

func (s server) BothStream(stream pb.SayHello_BothStreamServer) error {
for {
req, err := stream.Recv()
if err != nil {
return nil
}
fmt.Println("服务端收到客户端的消息", req.RequestName)
time.Sleep(time.Second)
resp := &pb.HelloResponse{ResponsonMsg: req.RequestName}
err = stream.Send(resp)
if err != nil {
return nil
}
}
}

func main() {
listen, _ := net.Listen("tcp", ":9090")
// 创建grpc服务
grpcServer := grpc.NewServer(grpc.Creds(insecure.NewCredentials()))
// 注册服务
pb.RegisterSayHelloServer(grpcServer, &server{})
// 启动服务
err := grpcServer.Serve(listen)
if err != nil {
log.Fatal("服务启动失败:", err)
return
}
}

Reference


04-gRPC go版
https://flepeng.github.io/046-gRPC-04-gRPC-go版/
作者
Lepeng
发布于
2024年4月1日
许可协议