typeOrderServiceServerinterface{CreateOrder(OrderService_CreateOrderServer)error}typeOrderService_CreateOrderServerinterface{SendAndClose(*Empty)error// 实际上并不会执行任何 close 操作,由客户端在 recv 时 close
Recv()(*Empty,error)grpc.ServerStream}
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
typeOrderServiceClientinterface{BothWayStream(ctxcontext.Context,opts...grpc.CallOption)(OrderService_BothWayStreamClient,error)}typeOrderService_BothWayStreamClientinterface{Send(*Empty)errorRecv()(*Empty,error)grpc.ClientStream}
funcclient(stdinio.Reader,stdoutio.Writer)error{// 定义一个双向数据流
stream,err:=orderpb.NewOrderServiceClient(grpcConn).BothWayStream(ctx)iferr!=nil{return}// read from stdin
gofunc(){buf:=make([]byte,1024)for{n,err:=stdin.Read(buf)iferr!=nil{return}err=stream.Send(&orderpb.Empty{buf})iferr!=nil{return}}}()// write to stdout
for{empty,err:=stream.Recv()iferr!=nil{returnerr}_,err=stdout.Write(empty.Data)iferr!=nil{returnerr}}}
服务端的实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
funcserver(streamorderpb.OrderService_BothWayStreamServer)error{for{req,err:=stream.Recv()iferr!=nil{returnerr}// do something with req
resp:=doSomething(req)err=stream.Send(resp)iferr!=nil{returnerr}}}
// ClientStream defines the client-side behavior of a streaming RPC.
//
// All errors returned from ClientStream methods are compatible with the
// status package.
typeClientStreaminterface{// Header returns the header metadata received from the server if there
// is any. It blocks if the metadata is not ready to read.
Header()(metadata.MD,error)// Trailer returns the trailer metadata from the server, if there is any.
// It must only be called after stream.CloseAndRecv has returned, or
// stream.Recv has returned a non-nil error (including io.EOF).
Trailer()metadata.MD// CloseSend closes the send direction of the stream. It closes the stream
// when non-nil error is met. It is also not safe to call CloseSend
// concurrently with SendMsg.
CloseSend()error// Context returns the context for this stream.
//
// It should not be called until after Header or RecvMsg has returned. Once
// called, subsequent client-side retries are disabled.
Context()context.Context// SendMsg is generally called by generated code. On error, SendMsg aborts
// the stream. If the error was generated by the client, the status is
// returned directly; otherwise, io.EOF is returned and the status of
// the stream may be discovered using RecvMsg.
//
// SendMsg blocks until:
// - There is sufficient flow control to schedule m with the transport, or
// - The stream is done, or
// - The stream breaks.
//
// SendMsg does not wait until the message is received by the server. An
// untimely stream closure may result in lost messages. To ensure delivery,
// users should ensure the RPC completed successfully using RecvMsg.
//
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not safe
// to call SendMsg on the same stream in different goroutines. It is also
// not safe to call CloseSend concurrently with SendMsg.
SendMsg(minterface{})error// RecvMsg blocks until it receives a message into m or the stream is
// done. It returns io.EOF when the stream completes successfully. On
// any other error, the stream is aborted and the error contains the RPC
// status.
//
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not
// safe to call RecvMsg on the same stream in different goroutines.
RecvMsg(minterface{})error}
// parser reads complete gRPC messages from the underlying reader.
typeparserstruct{// r is the underlying reader.
// See the comment on recvMsg for the permissible
// error types.
rio.Reader// The header of a gRPC message. Find more detail at
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
header[5]byte}func(p*parser)recvMsg(maxReceiveMessageSizeint)(pfpayloadFormat,msg[]byte,errerror){// p.r 是 http2 stream 的 reader。
if_,err:=p.r.Read(p.header[:]);err!=nil{return0,nil,err}// 第一位记录消息类型
pf=payloadFormat(p.header[0])// 会四位记录消息长度
length:=binary.BigEndian.Uint32(p.header[1:])iflength==0{returnpf,nil,nil}ifint64(length)>int64(maxInt){return0,nil,status.Errorf(codes.ResourceExhausted,"grpc: received message larger than max length allowed on current machine (%d vs. %d)",length,maxInt)}ifint(length)>maxReceiveMessageSize{return0,nil,status.Errorf(codes.ResourceExhausted,"grpc: received message larger than max (%d vs. %d)",length,maxReceiveMessageSize)}// TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead
// of making it for each message:
msg=make([]byte,int(length))if_,err:=p.r.Read(msg);err!=nil{iferr==io.EOF{err=io.ErrUnexpectedEOF}return0,nil,err}returnpf,msg,nil}
注意
我们在前面的单向流过程中看到了类似 CloseAndRecv 的方法,而这种带有 close 的方法是由 ClientStream 的 CloseSend 方法来实现的。
而这个方法的实现也相对简单,是在向 http2 transport 写入消息的同时带上一个 option 值来实现的,源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func(cs*clientStream)CloseSend()error{ifcs.sentLast{// TODO: return an error and finish the stream instead, due to API misuse?
returnnil}cs.sentLast=trueop:=func(a*csAttempt)error{// 在这里带上一个 Last 标记,表示这是最后一个消息。
a.t.Write(a.s,nil,nil,&transport.Options{Last:true})// Always return nil; io.EOF is the only error that might make sense
// instead, but there is no need to signal the client to call RecvMsg
// as the only use left for the stream after CloseSend is to call
// RecvMsg. This also matches historical behavior.
returnnil}cs.withRetry(op,func(){cs.bufferForRetryLocked(0,op)})// We never returned an error here for reasons.
returnnil}
而 http2 transport 在写入消息时,如果这个标志位是 true,则将这个 stream(这个连接)标记为写入完成的标识,表示不再写新的消息。源码如下:
// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
// should proceed only if Write returns nil.
func(t*http2Client)Write(s*Stream,hdr[]byte,data[]byte,opts*Options)error{ifopts.Last{// If it's the last message, update stream state.
// 这里!!!
// 如果当前状态是 active 则,将其置位 streamWriteDone, 然后下面写操作继续执行。
if!s.compareAndSwapState(streamActive,streamWriteDone){returnerrStreamDone}// 下次有新消息要写时,会报错
}elseifs.getState()!=streamActive{returnerrStreamDone}df:=&dataFrame{streamID:s.id,endStream:opts.Last,h:hdr,d:data,}ifhdr!=nil||data!=nil{// If it's not an empty data frame, check quota.
iferr:=s.wq.get(int32(len(hdr)+len(data)));err!=nil{returnerr}}// 这个就是上面流程提到的 buffer
returnt.controlBuf.put(df)}
// ServerStream defines the server-side behavior of a streaming RPC.
//
// Errors returned from ServerStream methods are compatible with the status
// package. However, the status code will often not match the RPC status as
// seen by the client application, and therefore, should not be relied upon for
// this purpose.
typeServerStreaminterface{// SetHeader sets the header metadata. It may be called multiple times.
// When call multiple times, all the provided metadata will be merged.
// All the metadata will be sent out when one of the following happens:
// - ServerStream.SendHeader() is called;
// - The first response is sent out;
// - An RPC status is sent out (error or success).
SetHeader(metadata.MD)error// SendHeader sends the header metadata.
// The provided md and headers set by SetHeader() will be sent.
// It fails if called multiple times.
SendHeader(metadata.MD)error// SetTrailer sets the trailer metadata which will be sent with the RPC status.
// When called more than once, all the provided metadata will be merged.
SetTrailer(metadata.MD)// Context returns the context for this stream.
Context()context.Context// SendMsg sends a message. On error, SendMsg aborts the stream and the
// error is returned directly.
//
// SendMsg blocks until:
// - There is sufficient flow control to schedule m with the transport, or
// - The stream is done, or
// - The stream breaks.
//
// SendMsg does not wait until the message is received by the client. An
// untimely stream closure may result in lost messages.
//
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not safe
// to call SendMsg on the same stream in different goroutines.
SendMsg(minterface{})error// RecvMsg blocks until it receives a message into m or the stream is
// done. It returns io.EOF when the client has performed a CloseSend. On
// any non-EOF error, the stream is aborted and the error contains the
// RPC status.
//
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not
// safe to call RecvMsg on the same stream in different goroutines.
RecvMsg(minterface{})error}
至于 MaxConcurrentStreams 的配置(一个连接上的并发 stream 数量),很多文章指出默认是 100,会影响性能,其实不对的。从源码层面来看, http2 server 端支持配置这个参数,但是默认是 0,而该值为 0 的时候,server 端 transport 初始化时做了判断的,如果是 0,则会设置为 math.MaxUint32。
1
2
3
4
5
6
7
8
9
10
11
12
13
// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
// permitted in the HTTP2 spec.
maxStreams:=config.MaxStreamsifmaxStreams==0{// 注意看这里!
maxStreams=math.MaxUint32}else{isettings=append(isettings,http2.Setting{// 请记住这个 ID
ID:http2.SettingMaxConcurrentStreams,Val:maxStreams,})}
// http2_client.go:newHTTP2Client()
//
// Start the reader goroutine for incoming message. Each transport has
// a dedicated goroutine which reads HTTP2 frame from network. Then it
// dispatches the frame to the corresponding stream entity.
got.reader()// http2_client.go:http2Client.reader()
t.handleSettings(sf,true/*isFirst*/)