1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
// Package transport defines and implements a message-oriented communication
// channel used to complete various transactions (e.g., an RPC). It is meant
// for grpc-internal usage and is not intended to be imported directly by users.
22package transport
23
24import (
25 "context"
26 "errors"
27 "fmt"
28 "io"
29 "net"
30 "sync"
31 "sync/atomic"
32 "time"
33
34 "google.golang.org/grpc/codes"
35 "google.golang.org/grpc/credentials"
36 "google.golang.org/grpc/internal/channelz"
37 "google.golang.org/grpc/keepalive"
38 "google.golang.org/grpc/mem"
39 "google.golang.org/grpc/metadata"
40 "google.golang.org/grpc/peer"
41 "google.golang.org/grpc/stats"
42 "google.golang.org/grpc/status"
43 "google.golang.org/grpc/tap"
44)
45
// logLevel is a package-wide verbosity constant.
// NOTE(review): its consumers are outside this view; presumably it gates
// verbose transport logging — confirm against callers.
const (
	logLevel = 2
)
47
// recvMsg represents the received msg from the transport. All transport
// protocol specific info has been removed.
//
// A message carries either payload data (buffer set, err nil) or a terminal
// condition (err non-nil). Error messages are normally built without a buffer,
// although the readers in this file still free a buffer attached to an error.
type recvMsg struct {
	// buffer holds the received payload bytes when err is nil.
	buffer mem.Buffer
	// err describes what this message signals:
	//   nil: received some data (in buffer)
	//   io.EOF: stream is completed. data is nil.
	//   other non-nil error: transport failure. data is nil.
	err error
}
57
// recvBuffer is an unbounded channel of recvMsg structs.
//
// Note: recvBuffer differs from buffer.Unbounded only in the fact that it
// holds a channel of recvMsg structs instead of objects implementing the
// "item" interface. recvBuffer is written to much more often, and using strict
// recvMsg structs helps avoid allocation in "recvBuffer.put".
type recvBuffer struct {
	// c is the delivery channel handed out by get. Messages that cannot be
	// sent on it immediately wait in backlog.
	c  chan recvMsg
	mu sync.Mutex // guards backlog and err
	// backlog holds queued messages, oldest first; load moves the head onto c.
	backlog []recvMsg
	// err is the err of the most recently accepted message. Once it is
	// non-nil, put drops every later message.
	err error
}
70
71func newRecvBuffer() *recvBuffer {
72 b := &recvBuffer{
73 c: make(chan recvMsg, 1),
74 }
75 return b
76}
77
78func (b *recvBuffer) put(r recvMsg) {
79 b.mu.Lock()
80 if b.err != nil {
81 // drop the buffer on the floor. Since b.err is not nil, any subsequent reads
82 // will always return an error, making this buffer inaccessible.
83 r.buffer.Free()
84 b.mu.Unlock()
85 // An error had occurred earlier, don't accept more
86 // data or errors.
87 return
88 }
89 b.err = r.err
90 if len(b.backlog) == 0 {
91 select {
92 case b.c <- r:
93 b.mu.Unlock()
94 return
95 default:
96 }
97 }
98 b.backlog = append(b.backlog, r)
99 b.mu.Unlock()
100}
101
102func (b *recvBuffer) load() {
103 b.mu.Lock()
104 if len(b.backlog) > 0 {
105 select {
106 case b.c <- b.backlog[0]:
107 b.backlog[0] = recvMsg{}
108 b.backlog = b.backlog[1:]
109 default:
110 }
111 }
112 b.mu.Unlock()
113}
114
115// get returns the channel that receives a recvMsg in the buffer.
116//
117// Upon receipt of a recvMsg, the caller should call load to send another
118// recvMsg onto the channel if there is any.
119func (b *recvBuffer) get() <-chan recvMsg {
120 return b.c
121}
122
// recvBufferReader reads message data out of a recvBuffer.
//
// Despite its Read method it does not implement io.Reader: Read takes a byte
// count and returns a mem.Buffer. Data left over from a previous message is
// served from last before anything new is taken from recv, and the first error
// encountered is remembered in err and returned by every later call.
type recvBufferReader struct {
	// Closes the client transport stream with the given error and nil trailer
	// metadata. When non-nil, reads take the client-side paths
	// (readMessageHeaderClient / readClient).
	closeStream func(error)
	ctx         context.Context
	ctxDone     <-chan struct{} // cache of ctx.Done() (for performance).
	recv        *recvBuffer
	last        mem.Buffer // Stores the remaining data in the previous calls.
	err         error      // sticky error returned by all subsequent reads
}
133
134func (r *recvBufferReader) ReadMessageHeader(header []byte) (n int, err error) {
135 if r.err != nil {
136 return 0, r.err
137 }
138 if r.last != nil {
139 n, r.last = mem.ReadUnsafe(header, r.last)
140 return n, nil
141 }
142 if r.closeStream != nil {
143 n, r.err = r.readMessageHeaderClient(header)
144 } else {
145 n, r.err = r.readMessageHeader(header)
146 }
147 return n, r.err
148}
149
150// Read reads the next n bytes from last. If last is drained, it tries to read
151// additional data from recv. It blocks if there no additional data available in
152// recv. If Read returns any non-nil error, it will continue to return that
153// error.
154func (r *recvBufferReader) Read(n int) (buf mem.Buffer, err error) {
155 if r.err != nil {
156 return nil, r.err
157 }
158 if r.last != nil {
159 buf = r.last
160 if r.last.Len() > n {
161 buf, r.last = mem.SplitUnsafe(buf, n)
162 } else {
163 r.last = nil
164 }
165 return buf, nil
166 }
167 if r.closeStream != nil {
168 buf, r.err = r.readClient(n)
169 } else {
170 buf, r.err = r.read(n)
171 }
172 return buf, r.err
173}
174
175func (r *recvBufferReader) readMessageHeader(header []byte) (n int, err error) {
176 select {
177 case <-r.ctxDone:
178 return 0, ContextErr(r.ctx.Err())
179 case m := <-r.recv.get():
180 return r.readMessageHeaderAdditional(m, header)
181 }
182}
183
184func (r *recvBufferReader) read(n int) (buf mem.Buffer, err error) {
185 select {
186 case <-r.ctxDone:
187 return nil, ContextErr(r.ctx.Err())
188 case m := <-r.recv.get():
189 return r.readAdditional(m, n)
190 }
191}
192
193func (r *recvBufferReader) readMessageHeaderClient(header []byte) (n int, err error) {
194 // If the context is canceled, then closes the stream with nil metadata.
195 // closeStream writes its error parameter to r.recv as a recvMsg.
196 // r.readAdditional acts on that message and returns the necessary error.
197 select {
198 case <-r.ctxDone:
199 // Note that this adds the ctx error to the end of recv buffer, and
200 // reads from the head. This will delay the error until recv buffer is
201 // empty, thus will delay ctx cancellation in Recv().
202 //
203 // It's done this way to fix a race between ctx cancel and trailer. The
204 // race was, stream.Recv() may return ctx error if ctxDone wins the
205 // race, but stream.Trailer() may return a non-nil md because the stream
206 // was not marked as done when trailer is received. This closeStream
207 // call will mark stream as done, thus fix the race.
208 //
209 // TODO: delaying ctx error seems like a unnecessary side effect. What
210 // we really want is to mark the stream as done, and return ctx error
211 // faster.
212 r.closeStream(ContextErr(r.ctx.Err()))
213 m := <-r.recv.get()
214 return r.readMessageHeaderAdditional(m, header)
215 case m := <-r.recv.get():
216 return r.readMessageHeaderAdditional(m, header)
217 }
218}
219
220func (r *recvBufferReader) readClient(n int) (buf mem.Buffer, err error) {
221 // If the context is canceled, then closes the stream with nil metadata.
222 // closeStream writes its error parameter to r.recv as a recvMsg.
223 // r.readAdditional acts on that message and returns the necessary error.
224 select {
225 case <-r.ctxDone:
226 // Note that this adds the ctx error to the end of recv buffer, and
227 // reads from the head. This will delay the error until recv buffer is
228 // empty, thus will delay ctx cancellation in Recv().
229 //
230 // It's done this way to fix a race between ctx cancel and trailer. The
231 // race was, stream.Recv() may return ctx error if ctxDone wins the
232 // race, but stream.Trailer() may return a non-nil md because the stream
233 // was not marked as done when trailer is received. This closeStream
234 // call will mark stream as done, thus fix the race.
235 //
236 // TODO: delaying ctx error seems like a unnecessary side effect. What
237 // we really want is to mark the stream as done, and return ctx error
238 // faster.
239 r.closeStream(ContextErr(r.ctx.Err()))
240 m := <-r.recv.get()
241 return r.readAdditional(m, n)
242 case m := <-r.recv.get():
243 return r.readAdditional(m, n)
244 }
245}
246
247func (r *recvBufferReader) readMessageHeaderAdditional(m recvMsg, header []byte) (n int, err error) {
248 r.recv.load()
249 if m.err != nil {
250 if m.buffer != nil {
251 m.buffer.Free()
252 }
253 return 0, m.err
254 }
255
256 n, r.last = mem.ReadUnsafe(header, m.buffer)
257
258 return n, nil
259}
260
261func (r *recvBufferReader) readAdditional(m recvMsg, n int) (b mem.Buffer, err error) {
262 r.recv.load()
263 if m.err != nil {
264 if m.buffer != nil {
265 m.buffer.Free()
266 }
267 return nil, m.err
268 }
269
270 if m.buffer.Len() > n {
271 m.buffer, r.last = mem.SplitUnsafe(m.buffer, n)
272 }
273
274 return m.buffer, nil
275}
276
// streamState records how far a Stream has progressed. Its underlying type is
// uint32 so the Stream helpers can read and update it with sync/atomic.
type streamState uint32

const (
	streamActive    streamState = 0 // initial state
	streamWriteDone streamState = 1 // EndStream sent
	streamReadDone  streamState = 2 // EndStream received
	streamDone      streamState = 3 // the entire stream is finished.
)
285
// Stream represents an RPC in the transport layer.
//
// Received messages enter the stream through write (into buf) and are
// consumed through trReader by ReadMessageHeader and read. The state field is
// accessed atomically through swapState, compareAndSwapState and getState.
type Stream struct {
	id           uint32
	ctx          context.Context // the associated context of the stream
	method       string          // the associated RPC method of the stream
	recvCompress string          // NOTE(review): presumably the compressor name of inbound messages — confirm
	sendCompress string          // NOTE(review): presumably the compressor name for outbound messages — confirm
	buf          *recvBuffer     // receive buffer that write() puts messages into
	trReader     *transportReader // reader used by ReadMessageHeader and read
	fc           *inFlow          // NOTE(review): presumably inbound flow-control accounting — confirm
	wq           *writeQuota      // NOTE(review): presumably the per-stream write quota — confirm

	// Callback to state application's intentions to read data. This
	// is used to adjust flow control, if needed. ReadMessageHeader and read
	// call it with the number of bytes they are about to read.
	requestRead func(int)

	// state is the stream's streamState; it is read and written atomically.
	state streamState

	// contentSubtype is the content-subtype for requests.
	// this must be lowercase or the behavior is undefined.
	contentSubtype string

	trailer metadata.MD // the key-value map of trailer metadata; Trailer returns a copy.
}
310
311func (s *Stream) swapState(st streamState) streamState {
312 return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st)))
313}
314
315func (s *Stream) compareAndSwapState(oldState, newState streamState) bool {
316 return atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(oldState), uint32(newState))
317}
318
319func (s *Stream) getState() streamState {
320 return streamState(atomic.LoadUint32((*uint32)(&s.state)))
321}
322
323// Trailer returns the cached trailer metadata. Note that if it is not called
324// after the entire stream is done, it could return an empty MD.
325// It can be safely read only after stream has ended that is either read
326// or write have returned io.EOF.
327func (s *Stream) Trailer() metadata.MD {
328 return s.trailer.Copy()
329}
330
331// Context returns the context of the stream.
332func (s *Stream) Context() context.Context {
333 return s.ctx
334}
335
336// Method returns the method for the stream.
337func (s *Stream) Method() string {
338 return s.method
339}
340
341func (s *Stream) write(m recvMsg) {
342 s.buf.put(m)
343}
344
345// ReadMessageHeader reads data into the provided header slice from the stream.
346// It first checks if there was an error during a previous read operation and
347// returns it if present. It then requests a read operation for the length of
348// the header. It continues to read from the stream until the entire header
349// slice is filled or an error occurs. If an `io.EOF` error is encountered with
350// partially read data, it is converted to `io.ErrUnexpectedEOF` to indicate an
351// unexpected end of the stream. The method returns any error encountered during
352// the read process or nil if the header was successfully read.
353func (s *Stream) ReadMessageHeader(header []byte) (err error) {
354 // Don't request a read if there was an error earlier
355 if er := s.trReader.er; er != nil {
356 return er
357 }
358 s.requestRead(len(header))
359 for len(header) != 0 {
360 n, err := s.trReader.ReadMessageHeader(header)
361 header = header[n:]
362 if len(header) == 0 {
363 err = nil
364 }
365 if err != nil {
366 if n > 0 && err == io.EOF {
367 err = io.ErrUnexpectedEOF
368 }
369 return err
370 }
371 }
372 return nil
373}
374
375// Read reads n bytes from the wire for this stream.
376func (s *Stream) read(n int) (data mem.BufferSlice, err error) {
377 // Don't request a read if there was an error earlier
378 if er := s.trReader.er; er != nil {
379 return nil, er
380 }
381 s.requestRead(n)
382 for n != 0 {
383 buf, err := s.trReader.Read(n)
384 var bufLen int
385 if buf != nil {
386 bufLen = buf.Len()
387 }
388 n -= bufLen
389 if n == 0 {
390 err = nil
391 }
392 if err != nil {
393 if bufLen > 0 && err == io.EOF {
394 err = io.ErrUnexpectedEOF
395 }
396 data.Free()
397 return nil, err
398 }
399 data = append(data, buf)
400 }
401 return data, nil
402}
403
// transportReader reads the data available for a Stream through a
// recvBufferReader. It reports every successful read to windowHandler and
// remembers the most recent error in er. The error is io.EOF when the stream
// is done, or another non-nil error if the stream broke.
type transportReader struct {
	reader *recvBufferReader
	// The handler to control the window update procedure for both this
	// particular stream and the associated transport. It is called with the
	// number of bytes returned by each successful read.
	windowHandler func(int)
	// er is the most recent read error. Stream.ReadMessageHeader and
	// Stream.read return it without requesting a new read.
	er error
}
415
416func (t *transportReader) ReadMessageHeader(header []byte) (int, error) {
417 n, err := t.reader.ReadMessageHeader(header)
418 if err != nil {
419 t.er = err
420 return 0, err
421 }
422 t.windowHandler(n)
423 return n, nil
424}
425
426func (t *transportReader) Read(n int) (mem.Buffer, error) {
427 buf, err := t.reader.Read(n)
428 if err != nil {
429 t.er = err
430 return buf, err
431 }
432 t.windowHandler(buf.Len())
433 return buf, nil
434}
435
436// GoString is implemented by Stream so context.String() won't
437// race when printing %#v.
438func (s *Stream) GoString() string {
439 return fmt.Sprintf("<stream: %p, %v>", s, s.method)
440}
441
// transportState describes the lifecycle state of a transport.
type transportState int

const (
	reachable transportState = 0
	closing   transportState = 1
	draining  transportState = 2
)
450
// ServerConfig consists of all the configurations to establish a server transport.
//
// NOTE(review): these fields are consumed by the server transport constructor,
// which is outside this view. Fields sharing a name with ConnectOptions (the
// window and buffer sizes, SharedWriteBuffer, MaxHeaderListSize, StatsHandlers,
// BufferPool) presumably carry the same meaning on the server side — confirm.
type ServerConfig struct {
	MaxStreams            uint32 // NOTE(review): presumably the limit on concurrent streams per connection — confirm
	ConnectionTimeout     time.Duration
	Credentials           credentials.TransportCredentials
	InTapHandle           tap.ServerInHandle
	StatsHandlers         []stats.Handler
	KeepaliveParams       keepalive.ServerParameters
	KeepalivePolicy       keepalive.EnforcementPolicy
	InitialWindowSize     int32
	InitialConnWindowSize int32
	WriteBufferSize       int
	ReadBufferSize        int
	SharedWriteBuffer     bool
	ChannelzParent        *channelz.Server // channelz server entry that owns this transport — NOTE(review): confirm
	MaxHeaderListSize     *uint32
	HeaderTableSize       *uint32 // NOTE(review): presumably the HPACK dynamic table size; nil meaning default — confirm
	BufferPool            mem.BufferPool
}
470
// ConnectOptions covers all relevant options for communicating with the server.
type ConnectOptions struct {
	// UserAgent is the application user agent.
	UserAgent string
	// Dialer specifies how to dial a network address; the string argument is
	// the address to dial.
	Dialer func(context.Context, string) (net.Conn, error)
	// FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
	FailOnNonTempDialError bool
	// PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
	PerRPCCredentials []credentials.PerRPCCredentials
	// TransportCredentials stores the Authenticator required to set up a client
	// connection. Only one of TransportCredentials and CredsBundle is non-nil.
	TransportCredentials credentials.TransportCredentials
	// CredsBundle is the credentials bundle to be used. Only one of
	// TransportCredentials and CredsBundle is non-nil.
	CredsBundle credentials.Bundle
	// KeepaliveParams stores the keepalive parameters.
	KeepaliveParams keepalive.ClientParameters
	// StatsHandlers stores the handlers for stats.
	StatsHandlers []stats.Handler
	// InitialWindowSize sets the initial window size for a stream.
	InitialWindowSize int32
	// InitialConnWindowSize sets the initial window size for a connection.
	InitialConnWindowSize int32
	// WriteBufferSize sets the size of the write buffer, which in turn
	// determines how much data can be batched before it's written on the wire.
	WriteBufferSize int
	// ReadBufferSize sets the size of the read buffer, which in turn determines
	// how much data can be read at most for one read syscall.
	ReadBufferSize int
	// SharedWriteBuffer indicates whether connections should reuse the write
	// buffer.
	SharedWriteBuffer bool
	// ChannelzParent identifies, through its channelz SubChannel, the addrConn
	// that initiated the creation of this client transport.
	ChannelzParent *channelz.SubChannel
	// MaxHeaderListSize sets the max (uncompressed) size of header list that is
	// prepared to be received.
	MaxHeaderListSize *uint32
	// BufferPool is the mem.BufferPool to use when reading/writing to the wire.
	BufferPool mem.BufferPool
}
508
// WriteOptions provides additional hints and information for message
// transmission.
type WriteOptions struct {
	// Last indicates whether this write is the last piece for
	// this stream, i.e. no further data follows it.
	Last bool
}
516
// CallHdr carries the information of a particular RPC.
type CallHdr struct {
	// Host specifies the peer's host.
	Host string

	// Method specifies the operation to perform.
	Method string

	// SendCompress specifies the compression algorithm applied on
	// outbound message.
	SendCompress string

	// Creds specifies credentials.PerRPCCredentials for a call.
	Creds credentials.PerRPCCredentials

	// ContentSubtype specifies the content-subtype for a request. For example, a
	// content-subtype of "proto" will result in a content-type of
	// "application/grpc+proto". The value of ContentSubtype must be all
	// lowercase, otherwise the behavior is undefined. See
	// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
	// for more details.
	ContentSubtype string

	// PreviousAttempts is the value of the grpc-previous-rpc-attempts header
	// to set.
	PreviousAttempts int

	// DoneFunc is called when the stream is finished.
	DoneFunc func()
}
544
// ClientTransport is the common interface for all gRPC client-side transport
// implementations.
type ClientTransport interface {
	// Close tears down this transport, using err as the reason. Once it
	// returns, the transport should not be accessed any more. The caller must
	// make sure this is called only once.
	Close(err error)

	// GracefulClose starts to tear down the transport: the transport will stop
	// accepting new RPCs and NewStream will return an error. Once all streams
	// are finished, the transport will close.
	//
	// It does not block.
	GracefulClose()

	// NewStream creates a ClientStream for an RPC described by callHdr.
	NewStream(ctx context.Context, callHdr *CallHdr) (*ClientStream, error)

	// Error returns a channel that is closed when some I/O error
	// happens. Typically the caller should have a goroutine to monitor
	// this in order to take action (e.g., close the current transport
	// and create a new one) in error case. It should not return nil
	// once the transport is initiated.
	Error() <-chan struct{}

	// GoAway returns a channel that is closed when ClientTransport
	// receives the draining signal from the server (e.g., GOAWAY frame in
	// HTTP/2).
	GoAway() <-chan struct{}

	// GetGoAwayReason returns the reason why the GoAway frame was received,
	// along with a human-readable string with debug info. GoAwayInvalid
	// indicates that no GoAway frame has been received.
	GetGoAwayReason() (GoAwayReason, string)

	// RemoteAddr returns the remote network address.
	RemoteAddr() net.Addr
}
582
// ServerTransport is the common interface for all gRPC server-side transport
// implementations.
//
// Methods may be called concurrently from multiple goroutines, but
// Write methods for a given Stream will be called serially.
type ServerTransport interface {
	// HandleStreams receives incoming streams using the given handler, which
	// is invoked with a *ServerStream.
	HandleStreams(context.Context, func(*ServerStream))

	// Close tears down the transport, using err as the reason. Once it is
	// called, the transport should not be accessed any more. All the pending
	// streams and their handlers will be terminated asynchronously.
	Close(err error)

	// Peer returns the peer of the server transport.
	Peer() *peer.Peer

	// Drain notifies the client that this ServerTransport stops accepting new
	// RPCs. NOTE(review): debugData is presumably sent to the client as
	// GOAWAY debug data — confirm against implementations.
	Drain(debugData string)
}
603
// internalServerTransport extends ServerTransport with package-private
// per-stream operations used by ServerStream.
// NOTE(review): the per-method notes below are inferred from the signatures;
// confirm against the implementations.
type internalServerTransport interface {
	ServerTransport
	// writeHeader presumably sends header metadata md for stream s.
	writeHeader(s *ServerStream, md metadata.MD) error
	// write presumably sends a message (hdr followed by data) on stream s.
	write(s *ServerStream, hdr []byte, data mem.BufferSlice, opts *WriteOptions) error
	// writeStatus presumably sends the final status st for stream s.
	writeStatus(s *ServerStream, st *status.Status) error
	// incrMsgRecv presumably counts a received message (e.g. for channelz).
	incrMsgRecv()
}
611
// connectionErrorf creates a ConnectionError whose description is format
// applied to a (as with fmt.Sprintf). temp records whether the error is
// temporary, and e is the underlying cause, which may be nil.
func connectionErrorf(temp bool, e error, format string, a ...any) ConnectionError {
	var ce ConnectionError
	ce.Desc = fmt.Sprintf(format, a...)
	ce.temp = temp
	ce.err = e
	return ce
}

// ConnectionError is an error that results in the termination of the
// entire connection and the retry of all the active streams.
type ConnectionError struct {
	Desc string // human-readable description, quoted by Error
	temp bool   // reported by Temporary
	err  error  // underlying cause; may be nil
}

// Error implements the error interface, quoting the description.
func (e ConnectionError) Error() string {
	const layout = "connection error: desc = %q"
	return fmt.Sprintf(layout, e.Desc)
}

// Temporary reports whether this connection error is temporary (true) or
// fatal (false).
func (e ConnectionError) Temporary() bool {
	temporary := e.temp
	return temporary
}

// Origin returns the error that caused this connection error. It never returns
// nil: when there is no underlying cause, the ConnectionError itself is
// returned.
func (e ConnectionError) Origin() error {
	if cause := e.err; cause != nil {
		return cause
	}
	return e
}

// Unwrap returns the underlying cause of this connection error, or nil when
// there is none, so errors.Is and errors.As can see through it.
func (e ConnectionError) Unwrap() error {
	cause := e.err
	return cause
}
653
var (
	// ErrConnClosing indicates that the transport is closing. It is a
	// temporary ConnectionError with no underlying cause.
	ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
	// errStreamDrain indicates that the stream is rejected because the
	// connection is draining. This could be caused by goaway or balancer
	// removing the address.
	errStreamDrain = status.Error(codes.Unavailable, "the connection is draining")
	// errStreamDone is returned from write on the client side to signal an
	// error to the application layer: the stream is already done.
	errStreamDone = errors.New("the stream is done")
	// statusGoAway indicates that the server sent a GOAWAY that included this
	// stream's ID in unprocessed RPCs. Unlike the entries above, it is a
	// status (built with status.New) rather than an error value.
	statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection")
)
668
// GoAwayReason describes why a GoAway frame was received.
type GoAwayReason uint8

const (
	// GoAwayInvalid indicates that no GoAway frame has been received.
	GoAwayInvalid GoAwayReason = iota
	// GoAwayNoReason is the default value when a GoAway frame is received.
	GoAwayNoReason
	// GoAwayTooManyPings indicates that a GoAway frame with
	// ErrCodeEnhanceYourCalm was received and that the debug data said
	// "too_many_pings".
	GoAwayTooManyPings
)
682
683// ContextErr converts the error from context package into a status error.
684func ContextErr(err error) error {
685 switch err {
686 case context.DeadlineExceeded:
687 return status.Error(codes.DeadlineExceeded, err.Error())
688 case context.Canceled:
689 return status.Error(codes.Canceled, err.Error())
690 }
691 return status.Errorf(codes.Internal, "Unexpected error from context packet: %v", err)
692}