transport.go

  1/*
  2 *
  3 * Copyright 2014 gRPC authors.
  4 *
  5 * Licensed under the Apache License, Version 2.0 (the "License");
  6 * you may not use this file except in compliance with the License.
  7 * You may obtain a copy of the License at
  8 *
  9 *     http://www.apache.org/licenses/LICENSE-2.0
 10 *
 11 * Unless required by applicable law or agreed to in writing, software
 12 * distributed under the License is distributed on an "AS IS" BASIS,
 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14 * See the License for the specific language governing permissions and
 15 * limitations under the License.
 16 *
 17 */
 18
 19// Package transport defines and implements message oriented communication
 20// channel to complete various transactions (e.g., an RPC).  It is meant for
 21// grpc-internal usage and is not intended to be imported directly by users.
 22package transport
 23
 24import (
 25	"context"
 26	"errors"
 27	"fmt"
 28	"io"
 29	"net"
 30	"sync"
 31	"sync/atomic"
 32	"time"
 33
 34	"google.golang.org/grpc/codes"
 35	"google.golang.org/grpc/credentials"
 36	"google.golang.org/grpc/internal/channelz"
 37	"google.golang.org/grpc/keepalive"
 38	"google.golang.org/grpc/mem"
 39	"google.golang.org/grpc/metadata"
 40	"google.golang.org/grpc/peer"
 41	"google.golang.org/grpc/stats"
 42	"google.golang.org/grpc/status"
 43	"google.golang.org/grpc/tap"
 44)
 45
// logLevel is a package-level constant; NOTE(review): presumably the verbosity
// threshold used for this package's log statements — its uses are not visible
// in this file.
const logLevel = 2
 47
// recvMsg represents a message received from the transport. All
// transport-protocol-specific information has been removed.
type recvMsg struct {
	// buffer holds the received payload, if any.
	buffer mem.Buffer
	// err describes what this message carries:
	//   - nil: received some data.
	//   - io.EOF: stream is completed; buffer is expected to be nil.
	//   - other non-nil error: transport failure; buffer is expected to be nil.
	err error
}
 57
// recvBuffer is an unbounded channel of recvMsg structs.
//
// Note: recvBuffer differs from buffer.Unbounded only in the fact that it
// holds a channel of recvMsg structs instead of objects implementing the
// "item" interface. recvBuffer is written to much more often, and using strict
// recvMsg structs helps avoid allocation in "recvBuffer.put".
//
// Fields:
//   - c delivers the message at the head of the queue to the reader; it is
//     created with a capacity of one by newRecvBuffer.
//   - mu is held by put and load while they touch backlog and err.
//   - backlog holds messages that could not be sent on c immediately; load
//     moves its head onto c.
//   - err is the error of the first message put with a non-nil err; once it
//     is set, put discards every later message.
type recvBuffer struct {
	c       chan recvMsg
	mu      sync.Mutex
	backlog []recvMsg
	err     error
}
 70
 71func newRecvBuffer() *recvBuffer {
 72	b := &recvBuffer{
 73		c: make(chan recvMsg, 1),
 74	}
 75	return b
 76}
 77
 78func (b *recvBuffer) put(r recvMsg) {
 79	b.mu.Lock()
 80	if b.err != nil {
 81		// drop the buffer on the floor. Since b.err is not nil, any subsequent reads
 82		// will always return an error, making this buffer inaccessible.
 83		r.buffer.Free()
 84		b.mu.Unlock()
 85		// An error had occurred earlier, don't accept more
 86		// data or errors.
 87		return
 88	}
 89	b.err = r.err
 90	if len(b.backlog) == 0 {
 91		select {
 92		case b.c <- r:
 93			b.mu.Unlock()
 94			return
 95		default:
 96		}
 97	}
 98	b.backlog = append(b.backlog, r)
 99	b.mu.Unlock()
100}
101
102func (b *recvBuffer) load() {
103	b.mu.Lock()
104	if len(b.backlog) > 0 {
105		select {
106		case b.c <- b.backlog[0]:
107			b.backlog[0] = recvMsg{}
108			b.backlog = b.backlog[1:]
109		default:
110		}
111	}
112	b.mu.Unlock()
113}
114
115// get returns the channel that receives a recvMsg in the buffer.
116//
117// Upon receipt of a recvMsg, the caller should call load to send another
118// recvMsg onto the channel if there is any.
119func (b *recvBuffer) get() <-chan recvMsg {
120	return b.c
121}
122
// recvBufferReader reads the data delivered through a recvBuffer.
//
// When closeStream is non-nil the client-side read paths (readClient and
// readMessageHeaderClient) are used; otherwise the plain read paths are used.
// recv is the buffer messages are taken from. err holds the error returned by
// a previous read; once it is set, every later read returns it.
type recvBufferReader struct {
	closeStream func(error) // Closes the client transport stream with the given error and nil trailer metadata.
	ctx         context.Context
	ctxDone     <-chan struct{} // cache of ctx.Done() (for performance).
	recv        *recvBuffer
	last        mem.Buffer // Stores the remaining data in the previous calls.
	err         error
}
133
134func (r *recvBufferReader) ReadMessageHeader(header []byte) (n int, err error) {
135	if r.err != nil {
136		return 0, r.err
137	}
138	if r.last != nil {
139		n, r.last = mem.ReadUnsafe(header, r.last)
140		return n, nil
141	}
142	if r.closeStream != nil {
143		n, r.err = r.readMessageHeaderClient(header)
144	} else {
145		n, r.err = r.readMessageHeader(header)
146	}
147	return n, r.err
148}
149
150// Read reads the next n bytes from last. If last is drained, it tries to read
151// additional data from recv. It blocks if there no additional data available in
152// recv. If Read returns any non-nil error, it will continue to return that
153// error.
154func (r *recvBufferReader) Read(n int) (buf mem.Buffer, err error) {
155	if r.err != nil {
156		return nil, r.err
157	}
158	if r.last != nil {
159		buf = r.last
160		if r.last.Len() > n {
161			buf, r.last = mem.SplitUnsafe(buf, n)
162		} else {
163			r.last = nil
164		}
165		return buf, nil
166	}
167	if r.closeStream != nil {
168		buf, r.err = r.readClient(n)
169	} else {
170		buf, r.err = r.read(n)
171	}
172	return buf, r.err
173}
174
175func (r *recvBufferReader) readMessageHeader(header []byte) (n int, err error) {
176	select {
177	case <-r.ctxDone:
178		return 0, ContextErr(r.ctx.Err())
179	case m := <-r.recv.get():
180		return r.readMessageHeaderAdditional(m, header)
181	}
182}
183
184func (r *recvBufferReader) read(n int) (buf mem.Buffer, err error) {
185	select {
186	case <-r.ctxDone:
187		return nil, ContextErr(r.ctx.Err())
188	case m := <-r.recv.get():
189		return r.readAdditional(m, n)
190	}
191}
192
193func (r *recvBufferReader) readMessageHeaderClient(header []byte) (n int, err error) {
194	// If the context is canceled, then closes the stream with nil metadata.
195	// closeStream writes its error parameter to r.recv as a recvMsg.
196	// r.readAdditional acts on that message and returns the necessary error.
197	select {
198	case <-r.ctxDone:
199		// Note that this adds the ctx error to the end of recv buffer, and
200		// reads from the head. This will delay the error until recv buffer is
201		// empty, thus will delay ctx cancellation in Recv().
202		//
203		// It's done this way to fix a race between ctx cancel and trailer. The
204		// race was, stream.Recv() may return ctx error if ctxDone wins the
205		// race, but stream.Trailer() may return a non-nil md because the stream
206		// was not marked as done when trailer is received. This closeStream
207		// call will mark stream as done, thus fix the race.
208		//
209		// TODO: delaying ctx error seems like a unnecessary side effect. What
210		// we really want is to mark the stream as done, and return ctx error
211		// faster.
212		r.closeStream(ContextErr(r.ctx.Err()))
213		m := <-r.recv.get()
214		return r.readMessageHeaderAdditional(m, header)
215	case m := <-r.recv.get():
216		return r.readMessageHeaderAdditional(m, header)
217	}
218}
219
220func (r *recvBufferReader) readClient(n int) (buf mem.Buffer, err error) {
221	// If the context is canceled, then closes the stream with nil metadata.
222	// closeStream writes its error parameter to r.recv as a recvMsg.
223	// r.readAdditional acts on that message and returns the necessary error.
224	select {
225	case <-r.ctxDone:
226		// Note that this adds the ctx error to the end of recv buffer, and
227		// reads from the head. This will delay the error until recv buffer is
228		// empty, thus will delay ctx cancellation in Recv().
229		//
230		// It's done this way to fix a race between ctx cancel and trailer. The
231		// race was, stream.Recv() may return ctx error if ctxDone wins the
232		// race, but stream.Trailer() may return a non-nil md because the stream
233		// was not marked as done when trailer is received. This closeStream
234		// call will mark stream as done, thus fix the race.
235		//
236		// TODO: delaying ctx error seems like a unnecessary side effect. What
237		// we really want is to mark the stream as done, and return ctx error
238		// faster.
239		r.closeStream(ContextErr(r.ctx.Err()))
240		m := <-r.recv.get()
241		return r.readAdditional(m, n)
242	case m := <-r.recv.get():
243		return r.readAdditional(m, n)
244	}
245}
246
247func (r *recvBufferReader) readMessageHeaderAdditional(m recvMsg, header []byte) (n int, err error) {
248	r.recv.load()
249	if m.err != nil {
250		if m.buffer != nil {
251			m.buffer.Free()
252		}
253		return 0, m.err
254	}
255
256	n, r.last = mem.ReadUnsafe(header, m.buffer)
257
258	return n, nil
259}
260
261func (r *recvBufferReader) readAdditional(m recvMsg, n int) (b mem.Buffer, err error) {
262	r.recv.load()
263	if m.err != nil {
264		if m.buffer != nil {
265			m.buffer.Free()
266		}
267		return nil, m.err
268	}
269
270	if m.buffer.Len() > n {
271		m.buffer, r.last = mem.SplitUnsafe(m.buffer, n)
272	}
273
274	return m.buffer, nil
275}
276
// streamState is the state of a Stream. It is stored in Stream.state and is
// read and written atomically through Stream.getState, Stream.swapState and
// Stream.compareAndSwapState.
type streamState uint32

const (
	streamActive    streamState = iota
	streamWriteDone             // EndStream sent
	streamReadDone              // EndStream received
	streamDone                  // the entire stream is finished.
)
285
// Stream represents an RPC in the transport layer.
//
// Messages handed to write are queued in buf; ReadMessageHeader and read
// consume data through trReader. state must only be accessed through
// getState, swapState and compareAndSwapState, which use atomic operations.
type Stream struct {
	id           uint32
	ctx          context.Context // the associated context of the stream
	method       string          // the associated RPC method of the stream
	recvCompress string
	sendCompress string
	buf          *recvBuffer
	trReader     *transportReader
	fc           *inFlow
	wq           *writeQuota

	// Callback to state application's intentions to read data. This
	// is used to adjust flow control, if needed. It is invoked with the
	// number of bytes about to be read.
	requestRead func(int)

	state streamState

	// contentSubtype is the content-subtype for requests.
	// This must be lowercase or the behavior is undefined.
	contentSubtype string

	trailer metadata.MD // the key-value map of trailer metadata.
}
310
311func (s *Stream) swapState(st streamState) streamState {
312	return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st)))
313}
314
315func (s *Stream) compareAndSwapState(oldState, newState streamState) bool {
316	return atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(oldState), uint32(newState))
317}
318
319func (s *Stream) getState() streamState {
320	return streamState(atomic.LoadUint32((*uint32)(&s.state)))
321}
322
323// Trailer returns the cached trailer metadata. Note that if it is not called
324// after the entire stream is done, it could return an empty MD.
325// It can be safely read only after stream has ended that is either read
326// or write have returned io.EOF.
327func (s *Stream) Trailer() metadata.MD {
328	return s.trailer.Copy()
329}
330
331// Context returns the context of the stream.
332func (s *Stream) Context() context.Context {
333	return s.ctx
334}
335
336// Method returns the method for the stream.
337func (s *Stream) Method() string {
338	return s.method
339}
340
341func (s *Stream) write(m recvMsg) {
342	s.buf.put(m)
343}
344
345// ReadMessageHeader reads data into the provided header slice from the stream.
346// It first checks if there was an error during a previous read operation and
347// returns it if present. It then requests a read operation for the length of
348// the header. It continues to read from the stream until the entire header
349// slice is filled or an error occurs. If an `io.EOF` error is encountered with
350// partially read data, it is converted to `io.ErrUnexpectedEOF` to indicate an
351// unexpected end of the stream. The method returns any error encountered during
352// the read process or nil if the header was successfully read.
353func (s *Stream) ReadMessageHeader(header []byte) (err error) {
354	// Don't request a read if there was an error earlier
355	if er := s.trReader.er; er != nil {
356		return er
357	}
358	s.requestRead(len(header))
359	for len(header) != 0 {
360		n, err := s.trReader.ReadMessageHeader(header)
361		header = header[n:]
362		if len(header) == 0 {
363			err = nil
364		}
365		if err != nil {
366			if n > 0 && err == io.EOF {
367				err = io.ErrUnexpectedEOF
368			}
369			return err
370		}
371	}
372	return nil
373}
374
375// Read reads n bytes from the wire for this stream.
376func (s *Stream) read(n int) (data mem.BufferSlice, err error) {
377	// Don't request a read if there was an error earlier
378	if er := s.trReader.er; er != nil {
379		return nil, er
380	}
381	s.requestRead(n)
382	for n != 0 {
383		buf, err := s.trReader.Read(n)
384		var bufLen int
385		if buf != nil {
386			bufLen = buf.Len()
387		}
388		n -= bufLen
389		if n == 0 {
390			err = nil
391		}
392		if err != nil {
393			if bufLen > 0 && err == io.EOF {
394				err = io.ErrUnexpectedEOF
395			}
396			data.Free()
397			return nil, err
398		}
399		data = append(data, buf)
400	}
401	return data, nil
402}
403
// transportReader reads all the data available for this Stream from the
// transport and passes them into the decoder, which converts them into a gRPC
// message stream. The error is io.EOF when the stream is done or another
// non-nil error if the stream broke.
//
// Every successful read is reported to windowHandler with the number of bytes
// read. er holds the most recent error returned by reader; Stream.read and
// Stream.ReadMessageHeader return it without attempting another read.
type transportReader struct {
	reader *recvBufferReader
	// The handler to control the window update procedure for both this
	// particular stream and the associated transport.
	windowHandler func(int)
	er            error
}
415
416func (t *transportReader) ReadMessageHeader(header []byte) (int, error) {
417	n, err := t.reader.ReadMessageHeader(header)
418	if err != nil {
419		t.er = err
420		return 0, err
421	}
422	t.windowHandler(n)
423	return n, nil
424}
425
426func (t *transportReader) Read(n int) (mem.Buffer, error) {
427	buf, err := t.reader.Read(n)
428	if err != nil {
429		t.er = err
430		return buf, err
431	}
432	t.windowHandler(buf.Len())
433	return buf, nil
434}
435
436// GoString is implemented by Stream so context.String() won't
437// race when printing %#v.
438func (s *Stream) GoString() string {
439	return fmt.Sprintf("<stream: %p, %v>", s, s.method)
440}
441
// transportState is the state of a transport.
//
// NOTE(review): the code that moves a transport between these states is not
// visible in this file; the constant names are the only description available
// here.
type transportState int

const (
	reachable transportState = iota
	closing
	draining
)
450
// ServerConfig consists of all the configurations to establish a server
// transport.
//
// NOTE(review): the fields are consumed by the server transport
// implementation, which is not visible in this file. MaxHeaderListSize and
// HeaderTableSize are pointers, so presumably nil means "not configured" —
// confirm against the consumer.
type ServerConfig struct {
	MaxStreams            uint32
	ConnectionTimeout     time.Duration
	Credentials           credentials.TransportCredentials
	InTapHandle           tap.ServerInHandle
	StatsHandlers         []stats.Handler
	KeepaliveParams       keepalive.ServerParameters
	KeepalivePolicy       keepalive.EnforcementPolicy
	InitialWindowSize     int32
	InitialConnWindowSize int32
	WriteBufferSize       int
	ReadBufferSize        int
	SharedWriteBuffer     bool
	ChannelzParent        *channelz.Server
	MaxHeaderListSize     *uint32
	HeaderTableSize       *uint32
	BufferPool            mem.BufferPool
}
470
// ConnectOptions covers all relevant options for communicating with the server.
type ConnectOptions struct {
	// UserAgent is the application user agent.
	UserAgent string
	// Dialer specifies how to dial a network address.
	Dialer func(context.Context, string) (net.Conn, error)
	// FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
	FailOnNonTempDialError bool
	// PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
	PerRPCCredentials []credentials.PerRPCCredentials
	// TransportCredentials stores the Authenticator required to set up a client
	// connection. Only one of TransportCredentials and CredsBundle is non-nil.
	TransportCredentials credentials.TransportCredentials
	// CredsBundle is the credentials bundle to be used. Only one of
	// TransportCredentials and CredsBundle is non-nil.
	CredsBundle credentials.Bundle
	// KeepaliveParams stores the keepalive parameters.
	KeepaliveParams keepalive.ClientParameters
	// StatsHandlers stores the handlers for stats.
	StatsHandlers []stats.Handler
	// InitialWindowSize sets the initial window size for a stream.
	InitialWindowSize int32
	// InitialConnWindowSize sets the initial window size for a connection.
	InitialConnWindowSize int32
	// WriteBufferSize sets the size of the write buffer, which in turn
	// determines how much data can be batched before it's written on the wire.
	WriteBufferSize int
	// ReadBufferSize sets the size of the read buffer, which in turn determines
	// how much data can be read at most for one read syscall.
	ReadBufferSize int
	// SharedWriteBuffer indicates whether connections should reuse the write
	// buffer.
	SharedWriteBuffer bool
	// ChannelzParent sets the addrConn id which initiated the creation of this
	// client transport.
	ChannelzParent *channelz.SubChannel
	// MaxHeaderListSize sets the max (uncompressed) size of the header list
	// that the transport is prepared to receive.
	MaxHeaderListSize *uint32
	// BufferPool is the mem.BufferPool to use when reading/writing to the wire.
	BufferPool mem.BufferPool
}
508
// WriteOptions provides additional hints and information for message
// transmission. It is passed by pointer to internalServerTransport.write.
type WriteOptions struct {
	// Last indicates whether this write is the last piece for
	// this stream.
	Last bool
}
516
// CallHdr carries the information of a particular RPC. A pointer to it is
// passed to ClientTransport.NewStream when a stream is created.
type CallHdr struct {
	// Host specifies the peer's host.
	Host string

	// Method specifies the operation to perform.
	Method string

	// SendCompress specifies the compression algorithm applied on
	// outbound messages.
	SendCompress string

	// Creds specifies credentials.PerRPCCredentials for a call.
	Creds credentials.PerRPCCredentials

	// ContentSubtype specifies the content-subtype for a request. For example, a
	// content-subtype of "proto" will result in a content-type of
	// "application/grpc+proto". The value of ContentSubtype must be all
	// lowercase, otherwise the behavior is undefined. See
	// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
	// for more details.
	ContentSubtype string

	PreviousAttempts int // value of grpc-previous-rpc-attempts header to set

	DoneFunc func() // called when the stream is finished
}
544
// ClientTransport is the common interface for all gRPC client-side transport
// implementations.
type ClientTransport interface {
	// Close tears down this transport. Once it returns, the transport
	// should not be accessed any more. The caller must make sure this
	// is called only once.
	Close(err error)

	// GracefulClose starts to tear down the transport: the transport will stop
	// accepting new RPCs and NewStream will return an error. Once all streams
	// are finished, the transport will close.
	//
	// It does not block.
	GracefulClose()

	// NewStream creates a ClientStream for the RPC described by callHdr.
	NewStream(ctx context.Context, callHdr *CallHdr) (*ClientStream, error)

	// Error returns a channel that is closed when an I/O error occurs.
	// Typically the caller should have a goroutine to monitor this in order
	// to take action (e.g., close the current transport and create a new
	// one) in the error case. It should not return nil once the transport is
	// initiated.
	Error() <-chan struct{}

	// GoAway returns a channel that is closed when ClientTransport
	// receives the draining signal from the server (e.g., GOAWAY frame in
	// HTTP/2).
	GoAway() <-chan struct{}

	// GetGoAwayReason returns the reason why the GoAway frame was received,
	// along with a human-readable string with debug info.
	GetGoAwayReason() (GoAwayReason, string)

	// RemoteAddr returns the remote network address.
	RemoteAddr() net.Addr
}
582
// ServerTransport is the common interface for all gRPC server-side transport
// implementations.
//
// Methods may be called concurrently from multiple goroutines, but
// Write methods for a given Stream will be called serially.
type ServerTransport interface {
	// HandleStreams receives incoming streams using the given handler, which
	// is passed each incoming ServerStream.
	HandleStreams(context.Context, func(*ServerStream))

	// Close tears down the transport. Once it is called, the transport
	// should not be accessed any more. All the pending streams and their
	// handlers will be terminated asynchronously.
	Close(err error)

	// Peer returns the peer of the server transport.
	Peer() *peer.Peer

	// Drain notifies the client that this ServerTransport stops accepting new
	// RPCs. NOTE(review): debugData presumably accompanies that notification
	// as debug information — confirm against the implementation.
	Drain(debugData string)
}
603
// internalServerTransport extends ServerTransport with unexported operations
// that act on a ServerStream.
//
// NOTE(review): the implementations are not visible in this file; judging by
// the signatures, writeHeader, write and writeStatus send header metadata,
// message data and the final status for s respectively, and incrMsgRecv
// counts a received message — confirm against the implementing type.
type internalServerTransport interface {
	ServerTransport
	writeHeader(s *ServerStream, md metadata.MD) error
	write(s *ServerStream, hdr []byte, data mem.BufferSlice, opts *WriteOptions) error
	writeStatus(s *ServerStream, st *status.Status) error
	incrMsgRecv()
}
611
// connectionErrorf builds a ConnectionError whose description is format
// expanded with a. temp and e are stored as the error's temporary flag and
// original error.
func connectionErrorf(temp bool, e error, format string, a ...any) ConnectionError {
	desc := fmt.Sprintf(format, a...)
	return ConnectionError{Desc: desc, temp: temp, err: e}
}

// ConnectionError is an error that results in the termination of the
// entire connection and the retry of all the active streams.
type ConnectionError struct {
	Desc string
	temp bool
	err  error
}

// Error implements the error interface; the description is rendered quoted.
func (e ConnectionError) Error() string {
	const layout = "connection error: desc = %q"
	return fmt.Sprintf(layout, e.Desc)
}

// Temporary indicates if this connection error is temporary or fatal.
func (e ConnectionError) Temporary() bool {
	return e.temp
}

// Origin returns the original error of this connection error. It never
// returns nil: when there is no original error, the ConnectionError itself is
// returned.
func (e ConnectionError) Origin() error {
	if origin := e.err; origin != nil {
		return origin
	}
	return e
}

// Unwrap returns the original error of this connection error, or nil when
// there is none.
func (e ConnectionError) Unwrap() error {
	return e.err
}
653
var (
	// ErrConnClosing indicates that the transport is closing. It is a
	// temporary ConnectionError with no original error.
	ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
	// errStreamDrain indicates that the stream is rejected because the
	// connection is draining. This could be caused by goaway or balancer
	// removing the address.
	errStreamDrain = status.Error(codes.Unavailable, "the connection is draining")
	// errStreamDone is returned from write at the client side to indicate an
	// error to the application layer.
	errStreamDone = errors.New("the stream is done")
	// statusGoAway indicates that the server sent a GOAWAY that included this
	// stream's ID in unprocessed RPCs.
	statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection")
)
668
// GoAwayReason contains the reason for the GoAway frame received. It is
// returned by ClientTransport.GetGoAwayReason.
type GoAwayReason uint8

const (
	// GoAwayInvalid indicates that no GoAway frame is received. It is the
	// zero value of GoAwayReason.
	GoAwayInvalid GoAwayReason = 0
	// GoAwayNoReason is the default value when GoAway frame is received.
	GoAwayNoReason GoAwayReason = 1
	// GoAwayTooManyPings indicates that a GoAway frame with
	// ErrCodeEnhanceYourCalm was received and that the debug data said
	// "too_many_pings".
	GoAwayTooManyPings GoAwayReason = 2
)
682
683// ContextErr converts the error from context package into a status error.
684func ContextErr(err error) error {
685	switch err {
686	case context.DeadlineExceeded:
687		return status.Error(codes.DeadlineExceeded, err.Error())
688	case context.Canceled:
689		return status.Error(codes.Canceled, err.Error())
690	}
691	return status.Errorf(codes.Internal, "Unexpected error from context packet: %v", err)
692}