thrift客户端
(1) 设计
TClient ,
TStandardClient、WrappedTClient 实现了TClient
接口的Call
方法
(2) demo
https://github.com/weikeqin/thrift-tutorial-go-demo
以client调用add方法为例
调用下游Add()方法代码
func main() {
// 创建thrift client
thriftClient := getThriftClient()
// 创建 calculatorClientProxy 其实是一个代理类(静态代理) 代理了idl定义的所有方法
// tutorial是由idl生成的
calculatorClientProxy := tutorial.NewCalculatorClient(thriftClient)
// 调用Add方法
sum, _ := calculatorClientProxy.Add(defaultCtx, 1, 2)
fmt.Print("1+2=", sum, "\n")
}
(2.1) Add()方法定义
//
(2.2) idl里代理方法
CalculatorClient::Add 实现了 idl里定义的 Add方法, CalculatorClient 可以看做Calculator接口的代理
详细的根据gen命令生成的go代码如下
// gen-go/tutorial/tutorial.go
package tutorial
// Parameters:
// - Num1
// - Num2
func (p *CalculatorClient) Add(ctx context.Context, num1 int32, num2 int32) (_r int32, _err error) {
// 业务方法入参包装 `CalculatorAddArgs`实现了`TStruct`接口
var _args3 CalculatorAddArgs
_args3.Num1 = num1
_args3.Num2 = num2
// 业务返回结果
// call的第4个入参类型是`TStruct`,`CalculatorAddResult`实现了`TStruct`接口,可以直接传递
var _result5 CalculatorAddResult
// 通用返回结果里的headers
var _meta4 thrift.ResponseMeta
// 调用 TStandardClient::Call 方法
_meta4, _err = p.Client_().Call(ctx, "add", &_args3, &_result5)
p.SetLastResponseMeta_(_meta4)
if _err != nil {
return
}
// 返回业务处理结果
return _result5.GetSuccess(), nil
}
(3) 源码解读-go
TClient
(3.1) TClient
TClient定义了客户端
package thrift
// thrift.client.go
type TClient interface {
Call(ctx context.Context, method string, args, result TStruct) (ResponseMeta, error)
}
(3.2) TStandardClient
TStandardClient有3个方法 Send
Recv
Call
package thrift
// thrift.client.go
//
type TStandardClient struct {
seqId int32 // 序列Id
iprot, oprot TProtocol // 输入传输协议 输出传输协议
}
(3.2.1) 创建NewTStandardClient方法 NewTStandardClient
// TStandardClient 实现了 TClient 接口,使用 Thrift 的标准消息格式。
// 并发使用是不安全的
//
// TStandardClient implements TClient, and uses the standard message format for Thrift.
// It is not safe for concurrent use.
func NewTStandardClient(inputProtocol, outputProtocol TProtocol) *TStandardClient {
return &TStandardClient{
iprot: inputProtocol,
oprot: outputProtocol,
}
}
(3.2.2) Call
// param method 方法名
// param args 业务入参
// param result 返回(业务)结果
func (p *TStandardClient) Call(ctx context.Context, method string, args, result TStruct) (ResponseMeta, error) {
// 序列Id+1 注意:这儿不是并发安全的
p.seqId++
seqId := p.seqId
// 调用Send方法
if err := p.Send(ctx, p.oprot, seqId, method, args); err != nil {
return ResponseMeta{}, err
}
// method is oneway
if result == nil {
return ResponseMeta{}, nil
}
// 调用Recv方法,把返回结果写入result
err := p.Recv(ctx, p.iprot, seqId, method, result)
// 处理返回headers
var headers THeaderMap
if hp, ok := p.iprot.(*THeaderProtocol); ok {
headers = hp.transport.readHeaders
}
return ResponseMeta{
Headers: headers,
}, err
}
从以上的代码可以看到
1、TStandardClient
并发使用是不安全的,所以使用的时候得每次新建一个
2、TStandardClient
是同步调用
(3.2.3) Send
// client给Server发请求的方法
func (p *TStandardClient) Send(ctx context.Context, oprot TProtocol, seqId int32, method string, args TStruct) error {
// 设置headers
// Set headers from context object on THeaderProtocol
if headerProt, ok := oprot.(*THeaderProtocol); ok {
headerProt.ClearWriteHeaders()
for _, key := range GetWriteHeaderList(ctx) {
if value, ok := GetHeader(ctx, key); ok {
headerProt.SetWriteHeader(key, value)
}
}
}
// 使用配置的传输协议写入数据
// method是方法名 CALL是TMessageType对应的枚举 seqId是序列Id
// 调用server接口 其实就是给server发一个请求,按照约定的传输协议(TProtocol)和传输方式(TTransport)传输对应的二进制数据
// 这个例子里传输协议是 TBinaryProtocol 传输方式是 TSocket
if err := oprot.WriteMessageBegin(ctx, method, CALL, seqId); err != nil {
return err
}
if err := args.Write(ctx, oprot); err != nil {
return err
}
if err := oprot.WriteMessageEnd(ctx); err != nil {
return err
}
// 传输协议 调 传输方式 flush
return oprot.Flush(ctx)
}
可以看到,client 的send方法主要作用有2个
1、(在传输协议是THeaderProtocol的情况下)设置headers
2、按照约定的传输一些写数据,并flush
(3.2.4) Recv
// 接收请求方法
func (p *TStandardClient) Recv(ctx context.Context, iprot TProtocol, seqId int32, method string, result TStruct) error {
//
rMethod, rTypeId, rSeqId, err := iprot.ReadMessageBegin(ctx)
if err != nil {
return err
}
// 消息校验 校验方法名、序列Id、消息类型
if method != rMethod { // 方法和消息里的方法不一致,抛异常
return NewTApplicationException(WRONG_METHOD_NAME, fmt.Sprintf("%s: wrong method name", method))
} else if seqId != rSeqId { // 序列Id不一致,抛异常
return NewTApplicationException(BAD_SEQUENCE_ID, fmt.Sprintf("%s: out of order sequence response", method))
} else if rTypeId == EXCEPTION { // 异常类消息,抛异常
var exception tApplicationException
if err := exception.Read(ctx, iprot); err != nil {
return err
}
if err := iprot.ReadMessageEnd(ctx); err != nil {
return err
}
return &exception
} else if rTypeId != REPLY { // 返回结果里消息类型不是回复消息,抛异常
return NewTApplicationException(INVALID_MESSAGE_TYPE_EXCEPTION, fmt.Sprintf("%s: invalid message type", method))
}
// 读取消息
if err := result.Read(ctx, iprot); err != nil {
return err
}
//
return iprot.ReadMessageEnd(ctx)
}
(3.3) WrappedTClient
Wrapper提供了一种包装机制,使得在执行某方法前先执行Wrapper
因此可以在客户端和服务器做很多功能:接口监控、熔断限流、鉴权、Filter等。
// file thrift/middleware.go
package thrift
// WrappedTClient is a convenience struct that implements the TClient interface
// using inner Wrapped function.
//
// This is provided to aid in developing ClientMiddleware.
type WrappedTClient struct {
Wrapped func(ctx context.Context, method string, args, result TStruct) (ResponseMeta, error)
}
(3.3.1) 新建WrappedTClient
// file thrift/middleware.go
package thrift
// 验证 WrappedTClient 实现了 TClient
// verify that WrappedTClient implements TClient
var (
_ TClient = WrappedTClient{}
_ TClient = (*WrappedTClient)(nil)
)
// WrappedTClient 是一个使用内部 Wrapped 函数实现 TClient 接口的便利结构。
// 这是为了帮助开发 ClientMiddleware。
//
// WrapClient wraps the given TClient in the given middlewares.
//
// Middlewares will be called in the order that they are defined:
//
// 1. Middlewares[0]
// 2. Middlewares[1]
// ...
// N. Middlewares[n]
func WrapClient(client TClient, middlewares ...ClientMiddleware) TClient {
// 反向添加中间件,因此列表中的第一个是最外面的。
// Add middlewares in reverse so the first in the list is the outermost.
for i := len(middlewares) - 1; i >= 0; i-- {
// middlewares[i]是ClientMiddleware,是一个函数,client是对应的入参
client = middlewares[i](client)
}
return client
}
(3.3.2) Call
// file thrift/middleware.go
package thrift
// Call 通过调用 c.Wrapped的TClient接口实现 来方法调用 并返回结果 。
// Call implements the TClient interface by calling and returning c.Wrapped.
func (c WrappedTClient) Call(ctx context.Context, method string, args, result TStruct) (ResponseMeta, error) {
//
return c.Wrapped(ctx, method, args, result)
}
(3.4) WrappedTClient Demo
func main() {
var thriftClient thrift.TClient
thriftClient = getThriftClient()
// 扩展点:日志中间件
clientMiddleware := clientLoggingMiddleware()
// 包装后的client
wrapClient := thrift.WrapClient(thriftClient, clientMiddleware)
// 创建 calculatorClient tutorial是由idl生成的
calculatorClient := tutorial.NewCalculatorClient(wrapClient)
// 调用Add方法
sum, _ := calculatorClient.Add(defaultCtx, 1, 2)
fmt.Print("1+2=", sum, "\n")
}
// 生成一个日志中间件 Demo用
func clientLoggingMiddleware() thrift.ClientMiddleware {
clientMiddleware := func(next thrift.TClient) thrift.TClient {
wrappedTClient := thrift.WrappedTClient{
Wrapped: func(ctx context.Context, method string, args, result thrift.TStruct) (thrift.ResponseMeta, error) {
// 调用前打印 方法名及参数
log.Printf("Before: %q Args: %#v ", method, args)
// 方法调用
headers, err := next.Call(ctx, method, args, result)
// 调用后打印
log.Printf("After: %q Result: %#v", method, result)
if err != nil {
log.Printf("Error: %v", err)
}
return headers, err
},
}
return wrappedTClient
}
return clientMiddleware
}
问题
(1) clinet write tcp broken pipe
17:26:38.879 errmsg=write tcp 127.0.0.1:57899->127.0.0.1:8081: write: broken pipe
17:26:38.884 Error while flushing write buffer of size 287 to transport, only wrote 0 bytes: write tcp 127.0.0.1:59781->127.0.0.1:8081: write: broken pipe
17:26:43.220 errmsg=write tcp 127.0.0.1:59781->127.0.0.1:8081: write: broken pipe
client建立连接后,server读数据超时断开连接,client再写数据提示 broken pipe
server write tcp broken pipe
errmsg=write tcp 10.157.121.37:8300->10.159.184.191:57066: write: broken pipe||timeout=0s
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:58842: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:50344: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:38252: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:22356: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:50002: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:32572: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:59450: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:59320: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:41668: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:36668: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:60856: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:49126: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:30506: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:55618: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:22438: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:43892: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:26776: write: broken pipe
client调用超时,server处理请求后写入数据发送,但是这个时候client已经关闭连接,所以报错 write: broken pipe