Featured image of post GRPC

GRPC

GRPC的笔记

grpc

初识protobuffer

1
go get -u github.com/golang/protobuf/protoc-gen-go

第一个proto

proto文件都需要以.proto作为后缀。

下面就是一个简单的protobuffer,name = 1不是指他的值是1,而是指他在Person中的序号是1.

option go_package = “path;package”

path代表生成的go文件在当前目录,package代表包为first。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
syntax = "proto3";
option go_package="./;first";

package first;

message Person{
  string name = 1;
  int32 age = 2;
  string email = 3;
  enum status{
    CHILD=0;
    ADULT=1;
    OLD = 2;
  }
}

生成go文件

protoc –go_out= . ./*.proto

生成的pb.go文件

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// 	protoc-gen-go v1.28.0
// 	protoc        v3.20.0
// source: person.proto

package first

import (
	protoreflect "google.golang.org/protobuf/reflect/protoreflect"
	protoimpl "google.golang.org/protobuf/runtime/protoimpl"
	reflect "reflect"
	sync "sync"
)

const (
	// Verify that this generated code is sufficiently up-to-date.
	_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
	// Verify that runtime/protoimpl is sufficiently up-to-date.
	_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)

type PersonStatus int32

const (
	Person_CHILD PersonStatus = 0
	Person_ADULT PersonStatus = 1
	Person_OLD   PersonStatus = 2
)

// Enum value maps for PersonStatus.
var (
	PersonStatus_name = map[int32]string{
		0: "CHILD",
		1: "ADULT",
		2: "OLD",
	}
	PersonStatus_value = map[string]int32{
		"CHILD": 0,
		"ADULT": 1,
		"OLD":   2,
	}
)

func (x PersonStatus) Enum() *PersonStatus {
	p := new(PersonStatus)
	*p = x
	return p
}

func (x PersonStatus) String() string {
	return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
}

func (PersonStatus) Descriptor() protoreflect.EnumDescriptor {
	return file_person_proto_enumTypes[0].Descriptor()
}

func (PersonStatus) Type() protoreflect.EnumType {
	return &file_person_proto_enumTypes[0]
}

func (x PersonStatus) Number() protoreflect.EnumNumber {
	return protoreflect.EnumNumber(x)
}

// Deprecated: Use PersonStatus.Descriptor instead.
func (PersonStatus) EnumDescriptor() ([]byte, []int) {
	return file_person_proto_rawDescGZIP(), []int{0, 0}
}

type Person struct {
	state         protoimpl.MessageState
	sizeCache     protoimpl.SizeCache
	unknownFields protoimpl.UnknownFields

	Name  string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
	Age   int32  `protobuf:"varint,2,opt,name=age,proto3" json:"age,omitempty"`
	Email string `protobuf:"bytes,3,opt,name=email,proto3" json:"email,omitempty"`
}

func (x *Person) Reset() {
	*x = Person{}
	if protoimpl.UnsafeEnabled {
		mi := &file_person_proto_msgTypes[0]
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		ms.StoreMessageInfo(mi)
	}
}

func (x *Person) String() string {
	return protoimpl.X.MessageStringOf(x)
}

func (*Person) ProtoMessage() {}

func (x *Person) ProtoReflect() protoreflect.Message {
	mi := &file_person_proto_msgTypes[0]
	if protoimpl.UnsafeEnabled && x != nil {
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		if ms.LoadMessageInfo() == nil {
			ms.StoreMessageInfo(mi)
		}
		return ms
	}
	return mi.MessageOf(x)
}

// Deprecated: Use Person.ProtoReflect.Descriptor instead.
func (*Person) Descriptor() ([]byte, []int) {
	return file_person_proto_rawDescGZIP(), []int{0}
}

func (x *Person) GetName() string {
	if x != nil {
		return x.Name
	}
	return ""
}

func (x *Person) GetAge() int32 {
	if x != nil {
		return x.Age
	}
	return 0
}

func (x *Person) GetEmail() string {
	if x != nil {
		return x.Email
	}
	return ""
}

var File_person_proto protoreflect.FileDescriptor

var file_person_proto_rawDesc = []byte{
	0x0a, 0x0c, 0x70, 0x65, 0x72, 0x73, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05,
	0x66, 0x69, 0x72, 0x73, 0x74, 0x22, 0x6d, 0x0a, 0x06, 0x50, 0x65, 0x72, 0x73, 0x6f, 0x6e, 0x12,
	0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e,
	0x61, 0x6d, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05,
	0x52, 0x03, 0x61, 0x67, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x6d, 0x61, 0x69, 0x6c, 0x18, 0x03,
	0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x6d, 0x61, 0x69, 0x6c, 0x22, 0x27, 0x0a, 0x06, 0x73,
	0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x09, 0x0a, 0x05, 0x43, 0x48, 0x49, 0x4c, 0x44, 0x10, 0x00,
	0x12, 0x09, 0x0a, 0x05, 0x41, 0x44, 0x55, 0x4c, 0x54, 0x10, 0x01, 0x12, 0x07, 0x0a, 0x03, 0x4f,
	0x4c, 0x44, 0x10, 0x02, 0x42, 0x0a, 0x5a, 0x08, 0x2e, 0x2f, 0x3b, 0x66, 0x69, 0x72, 0x73, 0x74,
	0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}

var (
	file_person_proto_rawDescOnce sync.Once
	file_person_proto_rawDescData = file_person_proto_rawDesc
)

func file_person_proto_rawDescGZIP() []byte {
	file_person_proto_rawDescOnce.Do(func() {
		file_person_proto_rawDescData = protoimpl.X.CompressGZIP(file_person_proto_rawDescData)
	})
	return file_person_proto_rawDescData
}

var file_person_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
var file_person_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
var file_person_proto_goTypes = []interface{}{
	(PersonStatus)(0), // 0: first.Person.status
	(*Person)(nil),    // 1: first.Person
}
var file_person_proto_depIdxs = []int32{
	0, // [0:0] is the sub-list for method output_type
	0, // [0:0] is the sub-list for method input_type
	0, // [0:0] is the sub-list for extension type_name
	0, // [0:0] is the sub-list for extension extendee
	0, // [0:0] is the sub-list for field type_name
}

func init() { file_person_proto_init() }
func file_person_proto_init() {
	if File_person_proto != nil {
		return
	}
	if !protoimpl.UnsafeEnabled {
		file_person_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
			switch v := v.(*Person); i {
			case 0:
				return &v.state
			case 1:
				return &v.sizeCache
			case 2:
				return &v.unknownFields
			default:
				return nil
			}
		}
	}
	type x struct{}
	out := protoimpl.TypeBuilder{
		File: protoimpl.DescBuilder{
			GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
			RawDescriptor: file_person_proto_rawDesc,
			NumEnums:      1,
			NumMessages:   1,
			NumExtensions: 0,
			NumServices:   0,
		},
		GoTypes:           file_person_proto_goTypes,
		DependencyIndexes: file_person_proto_depIdxs,
		EnumInfos:         file_person_proto_enumTypes,
		MessageInfos:      file_person_proto_msgTypes,
	}.Build()
	File_person_proto = out.File
	file_person_proto_rawDesc = nil
	file_person_proto_goTypes = nil
	file_person_proto_depIdxs = nil
}

序列化和反序列化

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
package main

import (
	"fmt"
	"github.com/golang/protobuf/proto"
)

func main() {
	p := &Person{Name: "phm",Age: 12,Email: "8888888@qq.com"}
	//序列化
    marshal, _ := proto.Marshal(p)
	fmt.Println(marshal)
	newP := &Person{}
	//反序列化
	_ = proto.Unmarshal(marshal, newP)
	fmt.Println(newP)
}

proto和json的相互转换

首先安装一个包

1
go get google.golang.org/protobuf/encoding/protojson
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
package main

import (
	"fmt"
	"google.golang.org/protobuf/encoding/protojson"
)

func main() {
	p := &Person{
		Name:  "phm",
		Age:   13,
		Email: "999999@qq.com",
	}
	format := protojson.Format(p.ProtoReflect().Interface())
	fmt.Println(format)
	message := p.ProtoReflect().Interface()
	_ = protojson.Unmarshal([]byte(format), message)
	fmt.Println(message)
}

在protobuffer中定义服务

如果你想在RPC(远程过程调用)系统中使用你的消息类型,你可以在一个.proto文件中定义一个RPC服务接口,并且Protocol Buffers编译器将以你选择的语言生成服务接口代码和存根。因此,例如,如果你想定义一个RPC服务,它的方法接受你的SeanchRequest并返回一个SeanchResponse,你可以在你的.proto文件中定义它,如下所示:

1
2
3
4
service RpcService{
  rpc Search(Person) returns (Person);
  rpc register(Person) returns (Person);
}

初识rpc

在上述本地过程调用的例子中,我们是在一台计算机上执行了计算机上的程序,完成调用。随着计算机技术的发展和需求场景的变化,有时就需要从一台计算机上执行另外一台计算机上的程序的需求,因此后来又发展出来了RPC技术。特别是目前随着互联网技术的快速迭代和发展,用户和需求几乎都是以指数式的方式在高速增长,这个时候绝大多数情况下程序都是部署在多台机器上,就需要在调用其他物理机器上的程序的情况。

RPC是Remote Procedure Call Protocol单词首字母的缩写,简称为:RPC,翻译成中文叫远程过程调用协议。所谓远程过程调用,通俗的理解就是可以在本地程序中调用运行在另外一台服务器上的程序的功能方法。这种调用的过程跨越了物理服务器的限制,是在网络中完成的,在调用远端服务器上程序的过程中,本地程序等待返回调用结果,直到远端程序执行完毕,将结果进行返回到本地,最终完成一次完整的调用。

需要强调的是:远程过程调用指的是调用远端服务器上的程序的方法整个过程。

rpc的组成

RPC技术在架构设计上有四部分组成,分别是:客户端、客户端存根、服务端、服务端存根。

这里提到了客户端服务端的概念,其属于程序设计架构的一种方式,在现代的计算机软件程序架构设计上,大方向上分为两种方向,分别是:B/S架构C/S架构。B/S架构指的是浏览器到服务器交互的架构方式,另外一种是在计算机上安装一个单独的应用,称之为客户端,与服务器交互的模式。

由于在服务的调用过程中,有一方是发起调用方,另一方是提供服务方。因此,我们把服务发起方称之为客户端,把服务提供方称之为服务端。以下是对RPC的四种角色的解释和说明:

  • **客户端(Client):**服务调用发起方,也称为服务消费者。
  • **客户端存根(Client Stub):**该程序运行在客户端所在的计算机机器上,主要用来存储要调用的服务器的地址,另外,该程序还负责将客户端请求远端服务器程序的数据信息打包成数据包,通过网络发送给服务端Stub程序;其次,还要接收服务端Stub程序发送的调用结果数据包,并解析返回给客户端。
  • **服务端(Server):**远端的计算机机器上运行的程序,其中有客户端要调用的方法。
  • **服务端存根(Server Stub):**接收客户Stub程序通过网络发送的请求消息数据包,并调用服务端中真正的程序功能方法,完成功能调用;其次,将服务端执行调用的结果进行数据处理打包发送给客户端Stub程序。

rpc的原理

了解完了RPC技术的组成结构我们来看一下具体是如何实现客户端到服务端的调用的。实际上,如果我们想要在网络中的任意两台计算机上实现远程调用过程,要解决很多问题,比如:

  • 两台物理机器在网络中要建立稳定可靠的通信连接。
  • 两台服务器的通信协议的定义问题,即两台服务器上的程序如何识别对方的请求和返回结果。也就是说两台计算机必须都能够识别对方发来的信息,并且能够识别出其中的请求含义和返回含义,然后才能进行处理。这其实就是通信协议所要完成的工作。

让我们来看看RPC具体是如何解决这些问题的,RPC具体的调用步骤图如下:

RPC调用步骤图

在上述图中,通过1-10的步骤图解的形式,说明了RPC每一步的调用过程。具体描述为:

  • 1、客户端想要发起一个远程过程调用,首先通过调用本地客户端Stub程序的方式调用想要使用的功能方法名;
  • 2、客户端Stub程序接收到了客户端的功能调用请求,将客户端请求调用的方法名,携带的参数等信息做序列化操作,并打包成数据包。
  • 3、客户端Stub查找到远程服务器程序的IP地址,调用Socket通信协议,通过网络发送给服务端。
  • 4、服务端Stub程序接收到客户端发送的数据包信息,并通过约定好的协议将数据进行反序列化,得到请求的方法名和请求参数等信息。
  • 5、服务端Stub程序准备相关数据,调用本地Server对应的功能方法进行,并传入相应的参数,进行业务处理。
  • 6、服务端程序根据已有业务逻辑执行调用过程,待业务执行结束,将执行结果返回给服务端Stub程序。
  • 7、服务端Stub程序**将程序调用结果按照约定的协议进行序列化,**并通过网络发送回客户端Stub程序。
  • 8、客户端Stub程序接收到服务端Stub发送的返回数据,**对数据进行反序列化操作,**并将调用返回的数据传递给客户端请求发起者。
  • 9、客户端请求发起者得到调用结果,整个RPC调用过程结束。

go实现rpc

在Go语言官方网站的pkg说明中,提供了官方支持的rpc包,具体链接如下:https://golang.org/pkg/net/rpc/。官方提供的rpc包完整的包名是:net/rpc。根据官方的解释,rpc包主要是提供通过网络访问一个对象方法的功能。

本节课,我们就来学习如何使用go语言官方提供的RPC包实现RPC调用编码。

服务定义和暴露

1
func (t *T) MethodName(request T1,response *T2) error

上述代码是go语言官方给出的对外暴露的服务方法的定义标准,其中包含了主要的几条规则,分别是:

  • 1、对外暴露的方法有且只能有两个参数,这个两个参数只能是输出类型或内建类型,两种类型中的一种。
  • 2、方法的第二个参数必须是指针类型。
  • 3、方法的返回类型为error。
  • 4、方法的类型是可输出的。
  • 5、方法本身也是可输出的。

我们举例说明:假设目前我们有一个需求,给出一个float类型变量,作为圆形的半径,要求通过RPC调用,返回对应的圆形面积。具体的编程实现思路如下:

1
2
3
4
5
6
7
type MathUtil struct{
}
//该方法向外暴露:提供计算圆形面积的服务
func (mu *MathUtil) CalculateCircleArea(req float32, resp *float32) error {
    *resp = math.Pi * req * req //圆形的面积 s = π * r * r
    return nil //返回类型
}

注册服务和监听

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
	"math"
	"net"
	"net/http"
	"net/rpc"
)
type MathUtil struct {
}

func (m *MathUtil)mathFunc(req float32,resp *float32)error{
	*resp = math.Pi * req * req
	return nil
}
func main() {
	//初始化指针类型
	m := new(MathUtil)
	//注册服务
	err := rpc.Register(m)
	if err != nil{
		panic(err.Error())
	}
	//3、通过该函数把mathUtil中提供的服务注册到HTTP协议上,方便调用者可以利用http的方式进行数据传递
	rpc.HandleHTTP()

	//4、在特定端口监听
	listen, err := net.Listen("tcp", ":8081")
	if err != nil{
		panic(err.Error())
	}
	go http.Serve(listen,nil)
}

经过服务注册和监听处理,RPC调用过程中的服务端实现就已经完成了。接下来需要实现的是客户端请求代码的实现。

客户端调用

上述的调用方法核心在于client.Call方法的调用,该方法有三个参数,第一个参数表示要调用的远端服务的方法名,第二个参数是调用时要传入的参数,第三个参数是调用要接收的返回值。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main

import (
	"fmt"
	"net/rpc"
)

func main() {
	//连接服务器
	client, err := rpc.DialHTTP("tcp", "localhost:8081")
	if err != nil{
		panic(err.Error())
	}
	var req float32 //请求值
	req = 3

	var resp float32 //返回值
	//调用服务
	err = client.Call("MathUtil.MathFunc", req, &resp)
	if err != nil {
		panic(err.Error())
	}
	fmt.Println(resp)
}
客户端异步调用
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
func main() {
	//连接服务器
	client, err := rpc.DialHTTP("tcp", "localhost:8081")
	if err != nil{
		panic(err.Error())
	}
	var req float32 //请求值
	req = 3

	var resp float32 //返回值
	//调用服务
	call := client.Go("MathUtil.MathFunc", req, &resp, nil)
	if err != nil {
		panic(err.Error())
	}
	replyDone :=  <-call.Done
	fmt.Println(resp)
	fmt.Println(replyDone)
}
多参数传递
1
2
3
4
type AddParma struct {
    Args1 float32 //第一个参数
    Args2 float32 //第二个参数
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func (mu *MathUtil) Add(param param.AddParma, resp *float32) error {
    *resp = param.Args1 + param.Args2 //实现两数相加的功能
    return nil
}
mathUtil := new(MathUtil)

    err := rpc.RegisterName("MathUtil", mathUtil)
    if err != nil {
        panic(err.Error())
    }

    rpc.HandleHTTP()

    listen, err := net.Listen("tcp", ":8082")
    http.Serve(listen, nil)

rpc和protobuffer

首先可以将call函数中的req和resp都用protobuffer生成。其他步骤类似

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
syntax="proto3";

option go_package="./;rpcAndproto";

//订单请求参数
message OrderRequest {
  string orderId = 1;
  int64 timeStamp = 2;
}

//订单信息
message OrderInfo {
  string OrderId = 1;
  string OrderName = 2;
  string OrderStatus = 3;
}

service OrderService{
  rpc GetOrderInfo(OrderRequest) returns (OrderInfo);
}
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// 	protoc-gen-go v1.28.0
// 	protoc        v3.20.0
// source: Order.proto

package rpcAndproto

import (
	protoreflect "google.golang.org/protobuf/reflect/protoreflect"
	protoimpl "google.golang.org/protobuf/runtime/protoimpl"
	reflect "reflect"
	sync "sync"
)

const (
	// Verify that this generated code is sufficiently up-to-date.
	_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
	// Verify that runtime/protoimpl is sufficiently up-to-date.
	_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)

//订单请求参数
type OrderRequest struct {
	state         protoimpl.MessageState
	sizeCache     protoimpl.SizeCache
	unknownFields protoimpl.UnknownFields

	OrderId   string `protobuf:"bytes,1,opt,name=orderId,proto3" json:"orderId,omitempty"`
	TimeStamp int64  `protobuf:"varint,2,opt,name=timeStamp,proto3" json:"timeStamp,omitempty"`
}

func (x *OrderRequest) Reset() {
	*x = OrderRequest{}
	if protoimpl.UnsafeEnabled {
		mi := &file_Order_proto_msgTypes[0]
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		ms.StoreMessageInfo(mi)
	}
}

func (x *OrderRequest) String() string {
	return protoimpl.X.MessageStringOf(x)
}

func (*OrderRequest) ProtoMessage() {}

func (x *OrderRequest) ProtoReflect() protoreflect.Message {
	mi := &file_Order_proto_msgTypes[0]
	if protoimpl.UnsafeEnabled && x != nil {
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		if ms.LoadMessageInfo() == nil {
			ms.StoreMessageInfo(mi)
		}
		return ms
	}
	return mi.MessageOf(x)
}

// Deprecated: Use OrderRequest.ProtoReflect.Descriptor instead.
func (*OrderRequest) Descriptor() ([]byte, []int) {
	return file_Order_proto_rawDescGZIP(), []int{0}
}

func (x *OrderRequest) GetOrderId() string {
	if x != nil {
		return x.OrderId
	}
	return ""
}

func (x *OrderRequest) GetTimeStamp() int64 {
	if x != nil {
		return x.TimeStamp
	}
	return 0
}

//订单信息
type OrderInfo struct {
	state         protoimpl.MessageState
	sizeCache     protoimpl.SizeCache
	unknownFields protoimpl.UnknownFields

	OrderId     string `protobuf:"bytes,1,opt,name=OrderId,proto3" json:"OrderId,omitempty"`
	OrderName   string `protobuf:"bytes,2,opt,name=OrderName,proto3" json:"OrderName,omitempty"`
	OrderStatus string `protobuf:"bytes,3,opt,name=OrderStatus,proto3" json:"OrderStatus,omitempty"`
}

func (x *OrderInfo) Reset() {
	*x = OrderInfo{}
	if protoimpl.UnsafeEnabled {
		mi := &file_Order_proto_msgTypes[1]
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		ms.StoreMessageInfo(mi)
	}
}

func (x *OrderInfo) String() string {
	return protoimpl.X.MessageStringOf(x)
}

func (*OrderInfo) ProtoMessage() {}

func (x *OrderInfo) ProtoReflect() protoreflect.Message {
	mi := &file_Order_proto_msgTypes[1]
	if protoimpl.UnsafeEnabled && x != nil {
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		if ms.LoadMessageInfo() == nil {
			ms.StoreMessageInfo(mi)
		}
		return ms
	}
	return mi.MessageOf(x)
}

// Deprecated: Use OrderInfo.ProtoReflect.Descriptor instead.
func (*OrderInfo) Descriptor() ([]byte, []int) {
	return file_Order_proto_rawDescGZIP(), []int{1}
}

func (x *OrderInfo) GetOrderId() string {
	if x != nil {
		return x.OrderId
	}
	return ""
}

func (x *OrderInfo) GetOrderName() string {
	if x != nil {
		return x.OrderName
	}
	return ""
}

func (x *OrderInfo) GetOrderStatus() string {
	if x != nil {
		return x.OrderStatus
	}
	return ""
}

var File_Order_proto protoreflect.FileDescriptor

var file_Order_proto_rawDesc = []byte{
	0x0a, 0x0b, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x46, 0x0a,
	0x0c, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a,
	0x07, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07,
	0x6f, 0x72, 0x64, 0x65, 0x72, 0x49, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x53,
	0x74, 0x61, 0x6d, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65,
	0x53, 0x74, 0x61, 0x6d, 0x70, 0x22, 0x65, 0x0a, 0x09, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x49, 0x6e,
	0x66, 0x6f, 0x12, 0x18, 0x0a, 0x07, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x49, 0x64, 0x18, 0x01, 0x20,
	0x01, 0x28, 0x09, 0x52, 0x07, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x49, 0x64, 0x12, 0x1c, 0x0a, 0x09,
	0x4f, 0x72, 0x64, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52,
	0x09, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x4f, 0x72,
	0x64, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52,
	0x0b, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x32, 0x39, 0x0a, 0x0c,
	0x4f, 0x72, 0x64, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x29, 0x0a, 0x0c,
	0x47, 0x65, 0x74, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x0d, 0x2e, 0x4f,
	0x72, 0x64, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0a, 0x2e, 0x4f, 0x72,
	0x64, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x42, 0x10, 0x5a, 0x0e, 0x2e, 0x2f, 0x3b, 0x72, 0x70,
	0x63, 0x41, 0x6e, 0x64, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f,
	0x33,
}

var (
	file_Order_proto_rawDescOnce sync.Once
	file_Order_proto_rawDescData = file_Order_proto_rawDesc
)

func file_Order_proto_rawDescGZIP() []byte {
	file_Order_proto_rawDescOnce.Do(func() {
		file_Order_proto_rawDescData = protoimpl.X.CompressGZIP(file_Order_proto_rawDescData)
	})
	return file_Order_proto_rawDescData
}

var file_Order_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_Order_proto_goTypes = []interface{}{
	(*OrderRequest)(nil), // 0: OrderRequest
	(*OrderInfo)(nil),    // 1: OrderInfo
}
var file_Order_proto_depIdxs = []int32{
	0, // 0: OrderService.GetOrderInfo:input_type -> OrderRequest
	1, // 1: OrderService.GetOrderInfo:output_type -> OrderInfo
	1, // [1:2] is the sub-list for method output_type
	0, // [0:1] is the sub-list for method input_type
	0, // [0:0] is the sub-list for extension type_name
	0, // [0:0] is the sub-list for extension extendee
	0, // [0:0] is the sub-list for field type_name
}

func init() { file_Order_proto_init() }
func file_Order_proto_init() {
	if File_Order_proto != nil {
		return
	}
	if !protoimpl.UnsafeEnabled {
		file_Order_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
			switch v := v.(*OrderRequest); i {
			case 0:
				return &v.state
			case 1:
				return &v.sizeCache
			case 2:
				return &v.unknownFields
			default:
				return nil
			}
		}
		file_Order_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
			switch v := v.(*OrderInfo); i {
			case 0:
				return &v.state
			case 1:
				return &v.sizeCache
			case 2:
				return &v.unknownFields
			default:
				return nil
			}
		}
	}
	type x struct{}
	out := protoimpl.TypeBuilder{
		File: protoimpl.DescBuilder{
			GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
			RawDescriptor: file_Order_proto_rawDesc,
			NumEnums:      0,
			NumMessages:   2,
			NumExtensions: 0,
			NumServices:   1,
		},
		GoTypes:           file_Order_proto_goTypes,
		DependencyIndexes: file_Order_proto_depIdxs,
		MessageInfos:      file_Order_proto_msgTypes,
	}.Build()
	File_Order_proto = out.File
	file_Order_proto_rawDesc = nil
	file_Order_proto_goTypes = nil
	file_Order_proto_depIdxs = nil
}

服务器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package main

import (
	"errors"
	"goTrip/rpcAndproto"
	"net"
	"net/http"
	"net/rpc"
	"time"
)

type OrderService struct {

}

func (o *OrderService) GetAccount(request rpcAndproto.OrderRequest,response *rpcAndproto.OrderInfo) error{
	orderMap := map[string]rpcAndproto.OrderInfo{
		"201907300001": rpcAndproto.OrderInfo{OrderId: "201907300001", OrderName: "衣服", OrderStatus: "已付款"},
		"201907310001": rpcAndproto.OrderInfo{OrderId: "201907310001", OrderName: "零食", OrderStatus: "已付款"},
		"201907310002": rpcAndproto.OrderInfo{OrderId: "201907310002", OrderName: "食品", OrderStatus: "未付款"},
	}
	current := time.Now().Unix()
	if (request.TimeStamp > current) {
		*response = rpcAndproto.OrderInfo{OrderId: "0", OrderName: "", OrderStatus: "订单信息异常"}
	} else {
		result := orderMap[request.OrderId]//201907310003
		if result.OrderId != "" {
			*response = orderMap[request.OrderId]
		} else {
			return errors.New("server error")
		}
	}
	return nil
}
func main() {
	o := new(OrderService)
	err := rpc.Register(o)
	if err != nil{
		panic(err)
	}
	rpc.HandleHTTP()
	listen, err := net.Listen("tcp", ":8081")
	if err != nil{
		panic(err)
	}
	http.Serve(listen,nil)
}

客户端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import (
	"fmt"
	"goTrip/rpcAndproto"
	"net/rpc"
	"time"
)

func main() {
	client, err := rpc.DialHTTP("tcp", "localhost:8081")
	if err != nil {
		panic(err.Error())
	}

	timeStamp := time.Now().Unix()
	request := rpcAndproto.OrderRequest{OrderId: "201907310001", TimeStamp: timeStamp}

	var response *rpcAndproto.OrderInfo
	err = client.Call("OrderService.GetAccount", request, &response)
	if err != nil {
		panic(err.Error())
	}

	fmt.Println(*response)
}

初识grpc

得到grpc

1
go get -u google.golang.org/grpc

grpc的使用

我们想要实现的是通过gRPC框架进行远程服务调用,首先第一步应该是要有服务。利用之前所掌握的内容,gRPC框架支持对服务的定义和生成。 gRPC框架默认使用protocol buffers作为接口定义语言,用于描述网络传输消息结构。除此之外,还可以使用protobuf定义服务接口。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
syntax = "proto3";
package message;

//订单请求参数
message OrderRequest {
    string orderId = 1;
    int64 timeStamp = 2;
}

//订单信息
message OrderInfo {
    string OrderId = 1;
    string OrderName = 2;
    string OrderStatus = 3;
}

//订单服务service定义
service OrderService{
    rpc GetOrderInfo(OrderRequest) returns (OrderInfo);
}

gRPC编译支持

如果定义的.proto文件,如本案例中所示,定义中包含了服务接口的定义,而我们想要使用gRPC框架实现RPC调用。开发者可以采用protocol-gen-go库提供的插件编译功能,生成兼容gRPC框架的golang语言代码。只需要在基本编译命令的基础上,指定插件的参数,告知protoc编译器即可。具体的编译生成兼容gRPC框架的服务代码的命令如下:

1
protoc --go_out=plugins=grpc:. *.proto

pb.go生成的客户端方法代码,给了生成客户端的方法,给了调用服务的方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// OrderServiceClient is the client API for OrderService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type OrderServiceClient interface {
	GetOrderInfo(ctx context.Context, in *OrderRequest, opts ...grpc.CallOption) (*OrderInfo, error)
}

type orderServiceClient struct {
	cc grpc.ClientConnInterface
}

func NewOrderServiceClient(cc grpc.ClientConnInterface) OrderServiceClient {
	return &orderServiceClient{cc}
}

func (c *orderServiceClient) GetOrderInfo(ctx context.Context, in *OrderRequest, opts ...grpc.CallOption) (*OrderInfo, error) {
	out := new(OrderInfo)
	err := c.cc.Invoke(ctx, "/OrderService/GetOrderInfo", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

pb.go生成的服务器方法代码,提供了注册已实现OrderService接口的方法,定义了一个没有实现方法的类。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// OrderServiceServer is the server API for OrderService service.
type OrderServiceServer interface {
	GetOrderInfo(context.Context, *OrderRequest) (*OrderInfo, error)
}

// UnimplementedOrderServiceServer can be embedded to have forward compatible implementations.
type UnimplementedOrderServiceServer struct {
}

func (*UnimplementedOrderServiceServer) GetOrderInfo(context.Context, *OrderRequest) (*OrderInfo, error) {
	return nil, status.Errorf(codes.Unimplemented, "method GetOrderInfo not implemented")
}

func RegisterOrderServiceServer(s *grpc.Server, srv OrderServiceServer) {
	s.RegisterService(&_OrderService_serviceDesc, srv)
}

实现rpc服务

在上面的proto文件中定义的方法,我们可以发现并没有方法体,类似于接口,我们要实现这个方法。

对于实现的方法,参数会对应的改变。

1
func (os *OrderServiceImpl)GetOrderInfo(ctx context.Context,request *grpc.OrderRequest)(*grpc.OrderInfo,error)

image-20220408104515526

grpc实现服务器

这里由于我的包名设置成了grpc(方便复习),所以将google的grpc起别名成了Grpc

1
2
3
4
5
6
7
8
9
func main() {
	server := Grpc.NewServer()
	grpc.RegisterOrderServiceServer(server,new(OrderServiceImpl))
	listen, err := net.Listen("tcp", ":8081")
	if err != nil{
		panic(err)
	}
	server.Serve(listen)
}

grpc实现客户端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import (
	"fmt"
	"golang.org/x/net/context"
	Grpc "google.golang.org/grpc"
	"goTrip/grpc"
	"time"
)

func main() {
	conn, err := Grpc.Dial("localhost:8081", Grpc.WithInsecure())
	if err != nil{
		panic(err)
	}
	defer conn.Close()
    //从proto中获得连接服务的客户端
	client := grpc.NewOrderServiceClient(conn)
	req := &grpc.OrderRequest{
		OrderId:   "201907300001",
		TimeStamp: time.Now().Unix(),
	}
	info, err := client.GetOrderInfo(context.Background(), req)
	if info != nil{
		fmt.Println(info.GetOrderId())
		fmt.Println(info.GetOrderName())
		fmt.Println(info.GetOrderStatus())
	}
}

grpc的流模式

在上节课内容中,我们学习了使用gRPC框架实现服务的调用编程。在gRPC框架中,诸如上节课我们学习的在客户端与服务端之间通过消息结构体定义的方式来传递数据,我们称之为“单项RPC”,也称之为简单模式。除此之外,gRPC中还有数据流模式的RPC调用实现,这正是我们本节课要学习的内容。

服务器流形式

定义proto文件

这一次的proto文件和上一个Cs模式的有些许不同,return的带有stream

1
2
3
service OrderService {
    rpc GetOrderInfos (OrderRequest) returns (stream OrderInfo) {}; //服务端流模式
}

通过stream修饰的方式表示该接口调用时,服务端会以数据流的形式将数据返回给客户端。

1
protoc --go_out=plugins=grpc:. ./*.proto

生成的文件也会有些许不同。

对于方法方面,客户端如下,发现相较于上面的非流传输,多了一个recv函数,这个应该时用来接受服务器传过来的OrderInfo

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func NewOrderServiceClient(cc grpc.ClientConnInterface) OrderServiceClient {
	return &orderServiceClient{cc}
}

func (c *orderServiceClient) GetOrderInfo(ctx context.Context, in *OrderRequest, opts ...grpc.CallOption) (OrderService_GetOrderInfoClient, error) {
	stream, err := c.cc.NewStream(ctx, &_OrderService_serviceDesc.Streams[0], "/OrderService/GetOrderInfo", opts...)
	if err != nil {
		return nil, err
	}
	x := &orderServiceGetOrderInfoClient{stream}
	if err := x.ClientStream.SendMsg(in); err != nil {
		return nil, err
	}
	if err := x.ClientStream.CloseSend(); err != nil {
		return nil, err
	}
	return x, nil
}

type OrderService_GetOrderInfoClient interface {
	Recv() (*OrderInfo, error)
	grpc.ClientStream
}

type orderServiceGetOrderInfoClient struct {
	grpc.ClientStream
}

func (x *orderServiceGetOrderInfoClient) Recv() (*OrderInfo, error) {
	m := new(OrderInfo)
	if err := x.ClientStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}

服务器这边也是,多了一个send方法。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// OrderServiceServer is the server API for OrderService service.
type OrderServiceServer interface {
	GetOrderInfo(*OrderRequest, OrderService_GetOrderInfoServer) error
}

// UnimplementedOrderServiceServer can be embedded to have forward compatible implementations.
type UnimplementedOrderServiceServer struct {
}

func (*UnimplementedOrderServiceServer) GetOrderInfo(*OrderRequest, OrderService_GetOrderInfoServer) error {
	return status.Errorf(codes.Unimplemented, "method GetOrderInfo not implemented")
}

func RegisterOrderServiceServer(s *grpc.Server, srv OrderServiceServer) {
	s.RegisterService(&_OrderService_serviceDesc, srv)
}

func _OrderService_GetOrderInfo_Handler(srv interface{}, stream grpc.ServerStream) error {
	m := new(OrderRequest)
	if err := stream.RecvMsg(m); err != nil {
		return err
	}
	return srv.(OrderServiceServer).GetOrderInfo(m, &orderServiceGetOrderInfoServer{stream})
}

type OrderService_GetOrderInfoServer interface {
	Send(*OrderInfo) error
	grpc.ServerStream
}

type orderServiceGetOrderInfoServer struct {
	grpc.ServerStream
}

func (x *orderServiceGetOrderInfoServer) Send(m *OrderInfo) error {
	return x.ServerStream.SendMsg(m)
}
服务器

由于服务器用的流形式回传,所以参数中带有OrderService_GetOrderInfoServer这个含有send方法的接口,用于传输流。

1
2
3
4
type OrderService_GetOrderInfoServer interface {
	Send(*OrderInfo) error
	grpc.ServerStream
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
func (os *OrderServiceImpl) GetOrderInfos(request *streamRpc.OrderRequest, stream streamRpc.OrderService_GetOrderInfoServer) error {
	fmt.Println(" 服务端流 RPC 模式")

	orderMap := map[string]streamRpc.OrderInfo{
		"201907300001": streamRpc.OrderInfo{OrderId: "201907300001", OrderName: "衣服", OrderStatus: "已付款"},
		"201907310001": streamRpc.OrderInfo{OrderId: "201907310001", OrderName: "零食", OrderStatus: "已付款"},
		"201907310002": streamRpc.OrderInfo{OrderId: "201907310002", OrderName: "食品", OrderStatus: "未付款"},
	}
	for id, info := range orderMap {
		if (time.Now().Unix() >= request.TimeStamp) {
			fmt.Println("订单序列号ID:", id)
			fmt.Println("订单详情:", info)
			//通过流模式发送给客户端
			stream.Send(&info)
		}
	}
	return nil
}

服务的监听与处理与前文所学内容没有区别,依然是相同的步骤:

1
2
3
4
5
6
7
8
9
func main() {
	server := grpc.NewServer()
	streamRpc.RegisterOrderServiceServer(server,new(OrderServiceImpl))
	listen, err := net.Listen("tcp", ":8081")
	if err != nil{
		panic(nil)
	}
	server.Serve(listen)
}
客户端

服务端使用Send方法将数据以流的形式进行发送,客户端可以使用Recv()方法接收流数据,因为数据流失源源不断的,因此使用for无限循环实现数据流的读取,当读取到io.EOF时,表示流数据结束。客户端数据读取实现如下

先还是拨号到指定端口,然后就可以创建一个client调用服务,服务会返回一个OrderService_GetOrderInfoClient,该接口实现了Recv,我们直接调用Recv方法接受信息即可。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main

import (
	"context"
	"fmt"
	"goTrip/streamRpc"
	"google.golang.org/grpc"
	"io"
	"time"
)

func main() {
	conn, err := grpc.Dial("localhost:8081", grpc.WithInsecure())
	if err != nil{
		panic(err)
	}
	defer conn.Close()
	client := streamRpc.NewOrderServiceClient(conn)
	req := &streamRpc.OrderRequest{
		OrderId:   "201907300001",
		TimeStamp: time.Now().Unix(),
	}
	stream, err := client.GetOrderInfo(context.Background(), req)
	for{
		recv, err := stream.Recv()
		if err == io.EOF{
			break
		}
		if err != nil{
			panic(err)
		}
		fmt.Println("读取的消息:",recv)
	}
}

客户端流模式

定义proto文件
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
syntax = "proto3";
option go_package="./;streamRpc";

//订单请求参数
message OrderRequest {
  string orderId = 1;
  int64 timeStamp = 2;
}

//订单信息
message OrderInfo {
  string OrderId = 1;
  string OrderName = 2;
  string OrderStatus = 3;
}

service OrderService {
  rpc AddOrderList (stream OrderRequest) returns (OrderInfo) {}; //客户端流模式
}

生成的客户端文件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
type OrderServiceClient interface {
	GetOrderInfo(ctx context.Context, opts ...grpc.CallOption) (OrderService_GetOrderInfoClient, error)
}

type orderServiceClient struct {
	cc grpc.ClientConnInterface
}

func NewOrderServiceClient(cc grpc.ClientConnInterface) OrderServiceClient {
	return &orderServiceClient{cc}
}

func (c *orderServiceClient) GetOrderInfo(ctx context.Context, opts ...grpc.CallOption) (OrderService_GetOrderInfoClient, error) {
	stream, err := c.cc.NewStream(ctx, &_OrderService_serviceDesc.Streams[0], "/OrderService/GetOrderInfo", opts...)
	if err != nil {
		return nil, err
	}
	x := &orderServiceGetOrderInfoClient{stream}
	return x, nil
}

type OrderService_GetOrderInfoClient interface {
	Send(*OrderRequest) error
	CloseAndRecv() (*OrderInfo, error)
	grpc.ClientStream
}

type orderServiceGetOrderInfoClient struct {
	grpc.ClientStream
}

func (x *orderServiceGetOrderInfoClient) Send(m *OrderRequest) error {
	return x.ClientStream.SendMsg(m)
}

func (x *orderServiceGetOrderInfoClient) CloseAndRecv() (*OrderInfo, error) {
	if err := x.ClientStream.CloseSend(); err != nil {
		return nil, err
	}
	m := new(OrderInfo)
	if err := x.ClientStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}

SendAndClose和Recv方法是客户端流模式下的服务端对象所拥有的方法。

服务器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
type OrderServiceServer interface {
	GetOrderInfo(OrderService_GetOrderInfoServer) error
}

// UnimplementedOrderServiceServer can be embedded to have forward compatible implementations.
type UnimplementedOrderServiceServer struct {
}

func (*UnimplementedOrderServiceServer) GetOrderInfo(OrderService_GetOrderInfoServer) error {
	return status.Errorf(codes.Unimplemented, "method GetOrderInfo not implemented")
}

func RegisterOrderServiceServer(s *grpc.Server, srv OrderServiceServer) {
	s.RegisterService(&_OrderService_serviceDesc, srv)
}

func _OrderService_GetOrderInfo_Handler(srv interface{}, stream grpc.ServerStream) error {
	return srv.(OrderServiceServer).GetOrderInfo(&orderServiceGetOrderInfoServer{stream})
}

type OrderService_GetOrderInfoServer interface {
	SendAndClose(*OrderInfo) error
	Recv() (*OrderRequest, error)
	grpc.ServerStream
}

type orderServiceGetOrderInfoServer struct {
	grpc.ServerStream
}

func (x *orderServiceGetOrderInfoServer) SendAndClose(m *OrderInfo) error {
	return x.ServerStream.SendMsg(m)
}

func (x *orderServiceGetOrderInfoServer) Recv() (*OrderRequest, error) {
	m := new(OrderRequest)
	if err := x.ServerStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}

Send和CloseAndRecv是客户端流模式下的客户端对象所拥有的方法。

服务器实现

由于信息全在流中,所以不需要req了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
func (os *OrderServiceImpl) AddOrderList(stream streamRpc.OrderService_AddOrderListServer) error {
	fmt.Println(" 客户端 RPC 模式")

	for  {
		recv, err := stream.Recv()
		if err == io.EOF {
			fmt.Println(" 读取数据结束 ")
			result := streamRpc.OrderInfo{OrderStatus: " 读取数据结束 "}
			return stream.SendAndClose(&result)
		}
		if err != nil {
			fmt.Println(err.Error())
			return err
		}
		fmt.Println(recv)
	}

	return nil
}

监听服务器依然差不多

1
2
3
4
5
6
7
8
9
func main() {
	server := grpc.NewServer()
	streamRpc.RegisterOrderServiceServer(server,new(OrderServiceImpl))
	listen, err := net.Listen("tcp", ":8081")
	if err != nil{
		panic(nil)
	}
	server.Serve(listen)
}
客户端实现
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package main

import (
	"context"
	"fmt"
	"goTrip/streamRpc"
	"google.golang.org/grpc"
	"io"
	"time"
)

func main() {
	orderMap := map[string]streamRpc.OrderRequest{
		"201907300001": streamRpc.OrderRequest{OrderId: "201907300001",TimeStamp: time.Now().Unix()},
		"201907310001": streamRpc.OrderRequest{OrderId: "201907310001", TimeStamp: time.Now().Unix()},
		"201907310002": streamRpc.OrderRequest{OrderId: "201907310002", TimeStamp: time.Now().Unix()},
	}
	conn, err := grpc.Dial("localhost:8081", grpc.WithInsecure())
	if err != nil{
		panic(err)
	}
	defer conn.Close()
	client := streamRpc.NewOrderServiceClient(conn)
	stream, err := client.AddOrderList(context.Background())
	//调用方法发送流数据
	for _, info := range orderMap {
		err = stream.Send(&info)
		if err != nil {
			panic(err.Error())
		}
	}
	for {
		orderInfo, err := stream.CloseAndRecv()
		if err == io.EOF {
			fmt.Println(" 读取数据结束了 ")
			return
		}
		if err != nil {
			fmt.Println(err.Error())
		}
		fmt.Println(orderInfo.GetOrderStatus())
	}
}

这个交互类似于,服务器会通过recv接收然后进行业务处理,然后通过sendAndClose进行回传结果,client通过send发送流信息,然后通过CloseAndRecive接受消息。

双向流模式

定义proto文件
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
syntax = "proto3";
option go_package="./;streamRpc";

//订单请求参数
message OrderRequest {
  string orderId = 1;
  int64 timeStamp = 2;
}

//订单信息
message OrderInfo {
  string OrderId = 1;
  string OrderName = 2;
  string OrderStatus = 3;
}

//订单服务service定义
service OrderService {
  rpc GetOrderInfos (stream OrderRequest) returns (stream OrderInfo) {}; //双向流模式
}

客户端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
type OrderService_GetOrderInfosClient interface {
	Send(*OrderRequest) error
	Recv() (*OrderInfo, error)
	grpc.ClientStream
}

type orderServiceGetOrderInfosClient struct {
	grpc.ClientStream
}

func (x *orderServiceGetOrderInfosClient) Send(m *OrderRequest) error {
	return x.ClientStream.SendMsg(m)
}

func (x *orderServiceGetOrderInfosClient) Recv() (*OrderInfo, error) {
	m := new(OrderInfo)
	if err := x.ClientStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}

服务器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
type OrderService_GetOrderInfosServer interface {
	Send(*OrderInfo) error
	Recv() (*OrderRequest, error)
	grpc.ServerStream
}

type orderServiceGetOrderInfosServer struct {
	grpc.ServerStream
}

func (x *orderServiceGetOrderInfosServer) Send(m *OrderInfo) error {
	return x.ServerStream.SendMsg(m)
}

func (x *orderServiceGetOrderInfosServer) Recv() (*OrderRequest, error) {
	m := new(OrderRequest)
	if err := x.ServerStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}

可以发现无论是服务器还是客户端都增加了recv和send方法。

服务器实现

参数是实现了recv和send的接口,可以发现recv和send都放到了同一个for中,如果读取结束自动break

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package main

import (
	"fmt"
	"goTrip/streamRpc"
	"google.golang.org/grpc"
	"io"
	"net"
)

type OrderServiceImpl struct {
}

//实现grpc双向流模式
func (os *OrderServiceImpl) GetOrderInfos(stream streamRpc.OrderService_GetOrderInfosServer) error {

	for {
		orderRequest, err := stream.Recv()
		if err == io.EOF {
			fmt.Println(" 数据读取结束 ")
			return err
		}
		if err != nil {
			panic(err.Error())
			return err
		}

		fmt.Println(orderRequest.GetOrderId())
		orderMap := map[string]streamRpc.OrderInfo{
			"201907300001": streamRpc.OrderInfo{OrderId: "201907300001", OrderName: "衣服", OrderStatus: "已付款"},
			"201907310001": streamRpc.OrderInfo{OrderId: "201907310001", OrderName: "零食", OrderStatus: "已付款"},
			"201907310002": streamRpc.OrderInfo{OrderId: "201907310002", OrderName: "食品", OrderStatus: "未付款"},
		}

		result := orderMap[orderRequest.GetOrderId()]
		//发送数据
		err = stream.Send(&result)
		if err == io.EOF {
			fmt.Println(err)
			return err
		}
		if err != nil {
			fmt.Println(err.Error())
			return err
		}
	}
	return nil
}

func main() {
	server := grpc.NewServer()
	streamRpc.RegisterOrderServiceServer(server,new(OrderServiceImpl))
	listen, err := net.Listen("tcp", ":8081")
	if err != nil{
		panic(nil)
	}
	server.Serve(listen)
}
客户端实现
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package main

import (
	"context"
	"fmt"
	"goTrip/streamRpc"
	"google.golang.org/grpc"
	"io"
)

func main() {
	//1、Dail连接
	conn, err := grpc.Dial("localhost:8081", grpc.WithInsecure())
	if err != nil {
		panic(err.Error())
	}
	defer conn.Close()

	orderServiceClient := streamRpc.NewOrderServiceClient(conn)

	fmt.Println("客户端请求RPC调用:双向流模式")
	orderIDs := []string{"201907300001", "201907310001", "201907310002"}

	orderInfoClient, err := orderServiceClient.GetOrderInfos(context.Background())
	for _, orderID := range orderIDs {
		orderRequest := streamRpc.OrderRequest{OrderId: orderID}
		err := orderInfoClient.Send(&orderRequest)
		if err != nil {
			panic(err.Error())
		}
	}

	//关闭
	orderInfoClient.CloseSend()

	for {
		orderInfo, err := orderInfoClient.Recv()
		if err == io.EOF {
			fmt.Println("读取结束")
			return
		}
		if err != nil {
			return
		}
		fmt.Println("读取到的信息:", orderInfo)
	}
}

grpc验证

gRPC中默认支持两种授权方式,分别是:SSL/TLS认证方式、基于Token的认证方式。

SSL全称是Secure Sockets Layer,又被称之为安全套接字层,是一种标准安全协议,用于在通信过程中建立客户端与服务器之间的加密链接。 TLS的全称是Transport Layer Security,TLS是SSL的升级版。在使用的过程中,往往习惯于将SSL和TLS组合在一起写作SSL/TLS。 简而言之,SSL/TLS是一种用于网络通信中加密的安全协议。

基于token

在gRPC中,允许开发者自定义自己的认证规则,通过

1
func WithPerRPCCredentials(creds credentials.PerRPCCredentials) 

可以看到我们需要实现一个PerRPCCredentials接口

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
type PerRPCCredentials interface {
	// GetRequestMetadata gets the current request metadata, refreshing
	// tokens if required. This should be called by the transport layer on
	// each request, and the data should be populated in headers or other
	// context. If a status code is returned, it will be used as the status
	// for the RPC. uri is the URI of the entry point for the request.
	// When supported by the underlying implementation, ctx can be used for
	// timeout and cancellation. Additionally, RequestInfo data will be
	// available via ctx to this call.
	// TODO(zhaoq): Define the set of the qualified keys instead of leaving
	// it as an arbitrary string.
	GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error)
	// RequireTransportSecurity indicates whether the credentials requires
	// transport security.
	RequireTransportSecurity() bool
}

我们自己创建一个结构体实现一下吧

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
package token

import "context"

type TokenAuthentication struct {
	ApiKey string
	ApiSecret string
}
//组织token信息
func (t *TokenAuthentication) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error)  {
	return map[string]string{
		"apikey": t.ApiKey,
		"apisecret": t.ApiSecret,
	},nil
}
//是否基于TLS认证进行安全传输
func (t *TokenAuthentication) RequireTransportSecurity()  bool{
	return false
}

以后客户端可以通过token来访问

1
2
3
4
5
6
7
8
auth := TokenAuthentication{
		ApiKey:    "hello",
		ApiSecret: "20190812",
	}
	dial, err := grpc.Dial("localhost:8081", grpc.WithPerRPCCredentials(&auth))
	if err != nil {
		panic(err.Error())
	}

服务器

这里的服务器通过metadata.FromIncomingContext(ctx),获得key,secret,然后进行校验。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package main

import (
	"fmt"
	"goTrip/rpcToken"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
	"net"
)



type MathManager struct {
}

func (mm *MathManager) AddMethod(ctx context.Context, request *rpcToken.RequestArgs) (response *rpcToken.Response, err error) {
	meta, exist := metadata.FromIncomingContext(ctx)
	if !exist{
		return nil,status.Errorf(codes.Unauthenticated,"无token")
	}
	var appKey string
	var appSecret string
	key,ok := meta["apikey"]
	if ok{
		appKey = key[0]
	}
	secret,ok := meta["apisecret"]
	if ok{
		appSecret = secret[0]
	}
	fmt.Println(appKey,appSecret)
	//控制token
	if appKey != "hello" || appSecret != "20190812" {
		return nil, status.Errorf(codes.Unauthenticated, "Token 不合法")
	}
	fmt.Println(" 服务端 Add方法 ")
	result := request.Args1 + request.Args2
	fmt.Println(" 计算结果是:", result)
	response = new(rpcToken.Response)
	response.Code = 1;
	response.Message = "执行成功"
	return response, nil
}

func main() {
	server := grpc.NewServer()
	rpcToken.RegisterMathServiceServer(server,new(MathManager))
	listen, err := net.Listen("tcp", ":8081")
	if err != nil{
		panic(err)
	}
	server.Serve(listen)
}

客户端

要注意 grpc.Dial(“localhost:8081”, grpc.WithPerRPCCredentials(tk),grpc.WithInsecure()),如果只用token需要用grpc.WithInsecure()

否则会报错。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
   "context"
   "fmt"
   "goTrip/rpcToken"
   "goTrip/rpcToken/token"
   "google.golang.org/grpc"
   "log"
)

func main() {
   tk := &token.TokenAuthentication{
      ApiKey:    "hello",
      ApiSecret: "20190812",
   }
   dial, err := grpc.Dial("localhost:8081", grpc.WithPerRPCCredentials(tk),grpc.WithInsecure())
   if err != nil{
      fmt.Println(err.Error())
      log.Fatal(err.Error())
   }
   client := rpcToken.NewMathServiceClient(dial)
   req := &rpcToken.RequestArgs{
      Args1: 1,
      Args2: 2,
   }
   resp, err := client.AddMethod(context.Background(), req)
   if err != nil{
      fmt.Println(err.Error())
      log.Fatal(err.Error())
   }
   fmt.Println(resp.GetCode(),resp.GetMessage())
}

grpc拦截器

在上节课程中,我们学习使用了gRPC框架中的两种认证方式:TLS验证和Token验证。

但是,在服务端的方法中,每个方法都要进行token的判断。程序效率太低,可以优化一下处理逻辑,在调用服务端的具体方法之前,先进行拦截,并进行token验证判断,这种方式称之为拦截器处理。

除了此处的token验证判断处理以外,还可以进行日志处理等。

使用拦截器,首先需要注册。在grpc中编程实现中,可以在NewSever时添加拦截器设置,grpc框架中可以通过UnaryInterceptor方法设置自定义的拦截器,并返回ServerOption。具体代码如下:

1
grpc.UnaryInterceptor()

点进去发现需要一个函数

1
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)

我们按自己的逻辑实现就好了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
func TokenInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error){
	//通过metadata
	md, exist := metadata.FromIncomingContext(ctx)
	if !exist {
		return nil, status.Errorf(codes.Unauthenticated, "无Token认证信息")
	}

	var appKey string
	var appSecret string
	if key, ok := md["apikey"]; ok {
		appKey = key[0]
	}
	if secret, ok := md["apisecret"]; ok {
		appSecret = secret[0]
	}

	if appKey != "hello" || appSecret != "20190812" {
		return nil, status.Errorf(codes.Unauthenticated, "Token 不合法")
	}
    //有点像java的doFilter,向下执行的意思
	return handler(ctx,req)
}

当然光写了拦截方法不行,我们得注册到服务器

1
	server := grpc.NewServer(grpc.UnaryInterceptor(TokenInterceptor))

整体代码:

服务器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package main

import (
   "fmt"
   "goTrip/rpcToken"
   "golang.org/x/net/context"
   "google.golang.org/grpc"
   "google.golang.org/grpc/codes"
   "google.golang.org/grpc/metadata"
   "google.golang.org/grpc/status"
   "net"
)



type MathManager struct {
}

func TokenInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error){
   //通过metadata
   md, exist := metadata.FromIncomingContext(ctx)
   if !exist {
      return nil, status.Errorf(codes.Unauthenticated, "无Token认证信息")
   }

   var appKey string
   var appSecret string
   if key, ok := md["apikey"]; ok {
      appKey = key[0]
   }
   if secret, ok := md["apisecret"]; ok {
      appSecret = secret[0]
   }

   if appKey != "hello" || appSecret != "20190812" {
      return nil, status.Errorf(codes.Unauthenticated, "Token 不合法")
   }
   return handler(ctx,req)
}
func (mm *MathManager) AddMethod(ctx context.Context, request *rpcToken.RequestArgs) (response *rpcToken.Response, err error) {
   //meta, exist := metadata.FromIncomingContext(ctx)
   //if !exist{
   // return nil,status.Errorf(codes.Unauthenticated,"无token")
   //}
   //var appKey string
   //var appSecret string
   //key,ok := meta["apikey"]
   //if ok{
   // appKey = key[0]
   //}
   //secret,ok := meta["apisecret"]
   //if ok{
   // appSecret = secret[0]
   //}
   //fmt.Println(appKey,appSecret)
   ////控制token
   //if appKey != "hello" || appSecret != "20190812" {
   // return nil, status.Errorf(codes.Unauthenticated, "Token 不合法")
   //}
   fmt.Println(" 服务端 Add方法 ")
   result := request.Args1 + request.Args2
   fmt.Println(" 计算结果是:", result)
   response = new(rpcToken.Response)
   response.Code = 1;
   response.Message = "执行成功"
   return response, nil
}

func main() {
   server := grpc.NewServer(grpc.UnaryInterceptor(TokenInterceptor))
   rpcToken.RegisterMathServiceServer(server,new(MathManager))
   listen, err := net.Listen("tcp", ":8081")
   if err != nil{
      panic(err)
   }
   server.Serve(listen)
}

客户端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
	"context"
	"fmt"
	"goTrip/rpcToken"
	"goTrip/rpcToken/token"
	"google.golang.org/grpc"
	"log"
)

func main() {
	tk := &token.TokenAuthentication{
		ApiKey:    "hello",
		ApiSecret: "20190812",
	}
	dial, err := grpc.Dial("localhost:8081", grpc.WithPerRPCCredentials(tk),grpc.WithInsecure())
	if err != nil{
		fmt.Println(err.Error())
		log.Fatal(err.Error())
	}
	client := rpcToken.NewMathServiceClient(dial)
	req := &rpcToken.RequestArgs{
		Args1: 1,
		Args2: 2,
	}
	resp, err := client.AddMethod(context.Background(), req)
	if err != nil{
		fmt.Println(err.Error())
		log.Fatal(err.Error())
	}
	fmt.Println(resp.GetCode(),resp.GetMessage())
}

comments powered by Disqus
welcome to my blog,here we can study together
Built with Hugo
主题 StackJimmy 设计