stream.go

   1/*
   2 *
   3 * Copyright 2014 gRPC authors.
   4 *
   5 * Licensed under the Apache License, Version 2.0 (the "License");
   6 * you may not use this file except in compliance with the License.
   7 * You may obtain a copy of the License at
   8 *
   9 *     http://www.apache.org/licenses/LICENSE-2.0
  10 *
  11 * Unless required by applicable law or agreed to in writing, software
  12 * distributed under the License is distributed on an "AS IS" BASIS,
  13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14 * See the License for the specific language governing permissions and
  15 * limitations under the License.
  16 *
  17 */
  18
  19package grpc
  20
  21import (
  22	"context"
  23	"errors"
  24	"io"
  25	"math"
  26	rand "math/rand/v2"
  27	"strconv"
  28	"sync"
  29	"time"
  30
  31	"google.golang.org/grpc/balancer"
  32	"google.golang.org/grpc/codes"
  33	"google.golang.org/grpc/encoding"
  34	"google.golang.org/grpc/internal"
  35	"google.golang.org/grpc/internal/balancerload"
  36	"google.golang.org/grpc/internal/binarylog"
  37	"google.golang.org/grpc/internal/channelz"
  38	"google.golang.org/grpc/internal/grpcutil"
  39	imetadata "google.golang.org/grpc/internal/metadata"
  40	iresolver "google.golang.org/grpc/internal/resolver"
  41	"google.golang.org/grpc/internal/serviceconfig"
  42	istatus "google.golang.org/grpc/internal/status"
  43	"google.golang.org/grpc/internal/transport"
  44	"google.golang.org/grpc/mem"
  45	"google.golang.org/grpc/metadata"
  46	"google.golang.org/grpc/peer"
  47	"google.golang.org/grpc/stats"
  48	"google.golang.org/grpc/status"
  49)
  50
// metadataFromOutgoingContextRaw is the hook stored in
// internal.FromOutgoingContextRaw, asserted to its concrete function type once
// at package initialization. The single-value type assertion panics if the
// hook is unset or has a different signature.
//
// newClientStream uses it to obtain the outgoing metadata and the slices of
// appended key/value pairs attached to a context so both can be validated
// before a stream is created.
var metadataFromOutgoingContextRaw = internal.FromOutgoingContextRaw.(func(context.Context) (metadata.MD, [][]string, bool))
  52
// StreamHandler defines the handler called by gRPC server to complete the
// execution of a streaming RPC.
//
// stream is the server side of the RPC being handled. srv is passed through
// untyped; presumably it is the registered service implementation -- TODO
// confirm against the server registration code.
//
// If a StreamHandler returns an error, it should either be produced by the
// status package, or be one of the context errors. Otherwise, gRPC will use
// codes.Unknown as the status code and err.Error() as the status message of the
// RPC.
type StreamHandler func(srv any, stream ServerStream) error
  61
// StreamDesc represents a streaming RPC service's method specification.  Used
// on the server when registering services and on the client when initiating
// new streams.
type StreamDesc struct {
	// StreamName and Handler are only used when registering handlers on a
	// server.
	StreamName string        // the name of the method excluding the service
	Handler    StreamHandler // the handler called for the method

	// ServerStreams and ClientStreams are used for registering handlers on a
	// server as well as defining RPC behavior when passed to NewClientStream
	// and ClientConn.NewStream.  At least one must be true.
	//
	// On the client, ClientStreams also governs SendMsg: when it is false the
	// first SendMsg marks the send direction as finished.
	ServerStreams bool // indicates the server can perform streaming sends
	ClientStreams bool // indicates the client can perform streaming sends
}
  77
// Stream defines the common interface a client or server stream has to satisfy.
//
// Deprecated: See ClientStream and ServerStream documentation instead.
type Stream interface {
	// Context returns the context associated with the stream.
	//
	// Deprecated: See ClientStream and ServerStream documentation instead.
	Context() context.Context
	// SendMsg sends the message m on the stream.
	//
	// Deprecated: See ClientStream and ServerStream documentation instead.
	SendMsg(m any) error
	// RecvMsg receives the next message from the stream into m.
	//
	// Deprecated: See ClientStream and ServerStream documentation instead.
	RecvMsg(m any) error
}
  89
// ClientStream defines the client-side behavior of a streaming RPC.
//
// All errors returned from ClientStream methods are compatible with the
// status package.
type ClientStream interface {
	// Header returns the header metadata received from the server if there
	// is any. It blocks if the metadata is not ready to read.  If the metadata
	// is nil and the error is also nil, then the stream was terminated without
	// headers, and the status can be discovered by calling RecvMsg.
	Header() (metadata.MD, error)
	// Trailer returns the trailer metadata from the server, if there is any.
	// It must only be called after stream.CloseAndRecv has returned, or
	// stream.Recv has returned a non-nil error (including io.EOF).
	Trailer() metadata.MD
	// CloseSend closes the send direction of the stream. If it encounters a
	// non-nil error, the stream itself is closed. It is not safe to call
	// CloseSend concurrently with SendMsg.
	CloseSend() error
	// Context returns the context for this stream.
	//
	// It should not be called until after Header or RecvMsg has returned. Once
	// called, subsequent client-side retries are disabled.
	Context() context.Context
	// SendMsg is generally called by generated code. On error, SendMsg aborts
	// the stream. If the error was generated by the client, the status is
	// returned directly; otherwise, io.EOF is returned and the status of
	// the stream may be discovered using RecvMsg. For unary or server-streaming
	// RPCs (StreamDesc.ClientStreams is false), a nil error is returned
	// unconditionally.
	//
	// SendMsg blocks until:
	//   - There is sufficient flow control to schedule m with the transport, or
	//   - The stream is done, or
	//   - The stream breaks.
	//
	// SendMsg does not wait until the message is received by the server. An
	// untimely stream closure may result in lost messages. To ensure delivery,
	// users should ensure the RPC completed successfully using RecvMsg.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not safe
	// to call SendMsg on the same stream in different goroutines. It is also
	// not safe to call CloseSend concurrently with SendMsg.
	//
	// It is not safe to modify the message after calling SendMsg. Tracing
	// libraries and stats handlers may use the message lazily.
	SendMsg(m any) error
	// RecvMsg blocks until it receives a message into m or the stream is
	// done. It returns io.EOF when the stream completes successfully. On
	// any other error, the stream is aborted and the error contains the RPC
	// status.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not
	// safe to call RecvMsg on the same stream in different goroutines.
	RecvMsg(m any) error
}
 147
 148// NewStream creates a new Stream for the client side. This is typically
 149// called by generated code. ctx is used for the lifetime of the stream.
 150//
 151// To ensure resources are not leaked due to the stream returned, one of the following
 152// actions must be performed:
 153//
 154//  1. Call Close on the ClientConn.
 155//  2. Cancel the context provided.
 156//  3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
 157//     client-streaming RPC, for instance, might use the helper function
 158//     CloseAndRecv (note that CloseSend does not Recv, therefore is not
 159//     guaranteed to release all resources).
 160//  4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
 161//
 162// If none of the above happen, a goroutine and a context will be leaked, and grpc
 163// will not call the optionally-configured stats handler with a stats.End message.
 164func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
 165	// allow interceptor to see all applicable call options, which means those
 166	// configured as defaults from dial option as well as per-call options
 167	opts = combine(cc.dopts.callOptions, opts)
 168
 169	if cc.dopts.streamInt != nil {
 170		return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
 171	}
 172	return newClientStream(ctx, desc, cc, method, opts...)
 173}
 174
 175// NewClientStream is a wrapper for ClientConn.NewStream.
 176func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
 177	return cc.NewStream(ctx, desc, method, opts...)
 178}
 179
// newClientStream is the un-intercepted entry point for creating a client
// stream for method on cc. In order, it:
//
//  1. registers the call with the idleness manager and prepends an OnFinish
//     call option that reports the end of the call,
//  2. validates the outgoing metadata attached to ctx,
//  3. records the call start with channelz when channelz is on,
//  4. calls cc.waitForResolvedAddrs so the first RPC can see the first
//     service config,
//  5. asks the config selector for the per-RPC config, and
//  6. creates the stream, routed through the selected config's interceptor
//     when it has one.
//
// Invalid metadata is reported with codes.Internal. A status error from the
// config selector is returned as-is unless its code is restricted for
// control-plane use, in which case it is replaced by a codes.Internal status;
// non-status errors are converted with toRPCErr.
//
// NOTE(review): every early return below happens before any stream exists to
// run the OnFinish callback registered here -- confirm that OnCallEnd is still
// balanced against OnCallBegin on those paths.
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
	// Start tracking the RPC for idleness purposes. This is where a stream is
	// created for both streaming and unary RPCs, and hence is a good place to
	// track active RPC count.
	if err := cc.idlenessMgr.OnCallBegin(); err != nil {
		return nil, err
	}
	// Add a calloption, to decrement the active call count, that gets executed
	// when the RPC completes.
	opts = append([]CallOption{OnFinish(func(error) { cc.idlenessMgr.OnCallEnd() })}, opts...)

	if md, added, ok := metadataFromOutgoingContextRaw(ctx); ok {
		// validate md
		if err := imetadata.Validate(md); err != nil {
			return nil, status.Error(codes.Internal, err.Error())
		}
		// validate added
		// Each entry of added is walked as a flat key, value, key, value, ...
		// list, two elements at a time.
		for _, kvs := range added {
			for i := 0; i < len(kvs); i += 2 {
				if err := imetadata.ValidatePair(kvs[i], kvs[i+1]); err != nil {
					return nil, status.Error(codes.Internal, err.Error())
				}
			}
		}
	}
	if channelz.IsOn() {
		cc.incrCallsStarted()
		// The deferred func reads the named result err, so it counts a
		// failure only when an assignment to that outer err (not one of the
		// shadowed errs in the if statements) leaves it non-nil at return.
		defer func() {
			if err != nil {
				cc.incrCallsFailed()
			}
		}()
	}
	// Provide an opportunity for the first RPC to see the first service config
	// provided by the resolver.
	if err := cc.waitForResolvedAddrs(ctx); err != nil {
		return nil, err
	}

	// mc and onCommit are captured by reference in newStream, so the values
	// assigned from rpcConfig below are the ones used when it finally runs.
	var mc serviceconfig.MethodConfig
	var onCommit func()
	newStream := func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
		return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
	}

	rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method}
	rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo)
	if err != nil {
		if st, ok := status.FromError(err); ok {
			// Restrict the code to the list allowed by gRFC A54.
			if istatus.IsRestrictedControlPlaneCode(st) {
				err = status.Errorf(codes.Internal, "config selector returned illegal status: %v", err)
			}
			return nil, err
		}
		return nil, toRPCErr(err)
	}

	if rpcConfig != nil {
		// The config selector may substitute its own context for the RPC.
		if rpcConfig.Context != nil {
			ctx = rpcConfig.Context
		}
		mc = rpcConfig.MethodConfig
		onCommit = rpcConfig.OnCommitted
		if rpcConfig.Interceptor != nil {
			// The interceptor receives the context as an explicit argument,
			// so the copy inside rpcInfo is cleared.
			rpcInfo.Context = nil
			// Wrap the original constructor: the interceptor decides when
			// (and whether) to invoke ns to build the real stream.
			ns := newStream
			newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
				cs, err := rpcConfig.Interceptor.NewStream(ctx, rpcInfo, done, ns)
				if err != nil {
					return nil, toRPCErr(err)
				}
				return cs, nil
			}
		}
	}

	// No interceptor-supplied done callback at this level; pass a no-op.
	return newStream(ctx, func() {})
}
 259
// newClientStreamWithParams builds the clientStream for method on cc using the
// method config mc and performs the first attempt (pick a transport, create
// the transport stream) through the retry machinery.
//
// Parameters:
//   - ctx: parent context; a child with the method-config timeout (or just a
//     cancel func) is derived from it and becomes the stream's context.
//   - desc: streaming shape of the RPC; also decides whether the cleanup
//     goroutine at the end is started.
//   - mc: method config; supplies WaitForReady, Timeout, message size limits
//     and (through the stored pointer) the retry policy.
//   - onCommit: stored on the stream and invoked once when an attempt is
//     committed; may be nil.
//   - doneFunc: placed in the call header's DoneFunc field.
//   - opts: call options; each option's before hook is applied to the call
//     info.
//
// On any error the derived context is canceled by the deferred func and a nil
// stream is returned.
func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
	callInfo := defaultCallInfo()
	// The method config's WaitForReady, when set, is the inverse of failFast.
	// It is applied before the call options, which run below.
	if mc.WaitForReady != nil {
		callInfo.failFast = !*mc.WaitForReady
	}

	// Possible context leak:
	// The cancel function for the child context we create will only be called
	// when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
	// an error is generated by SendMsg.
	// https://github.com/grpc/grpc-go/issues/1818.
	var cancel context.CancelFunc
	if mc.Timeout != nil && *mc.Timeout >= 0 {
		ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
	} else {
		ctx, cancel = context.WithCancel(ctx)
	}
	// Cancel the derived context if stream creation fails. This relies on the
	// named result err, so the failure paths below that return a shadowed err
	// still set it through the return statement.
	defer func() {
		if err != nil {
			cancel()
		}
	}()

	for _, o := range opts {
		if err := o.before(callInfo); err != nil {
			return nil, toRPCErr(err)
		}
	}
	// Reconcile the message size limits from the method config, the call
	// options and the client defaults.
	callInfo.maxSendMessageSize = getMaxSize(mc.MaxReqSize, callInfo.maxSendMessageSize, defaultClientMaxSendMessageSize)
	callInfo.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, callInfo.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
	if err := setCallInfoCodec(callInfo); err != nil {
		return nil, err
	}

	callHdr := &transport.CallHdr{
		Host:           cc.authority,
		Method:         method,
		ContentSubtype: callInfo.contentSubtype,
		DoneFunc:       doneFunc,
	}

	// Set our outgoing compression according to the UseCompressor CallOption, if
	// set.  In that case, also find the compressor from the encoding package.
	// Otherwise, use the compressor configured by the WithCompressor DialOption,
	// if set.
	var compressorV0 Compressor
	var compressorV1 encoding.Compressor
	if ct := callInfo.compressorName; ct != "" {
		callHdr.SendCompress = ct
		// The identity encoding needs no compressor; any other name must be
		// registered with the encoding package.
		if ct != encoding.Identity {
			compressorV1 = encoding.GetCompressor(ct)
			if compressorV1 == nil {
				return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
			}
		}
	} else if cc.dopts.compressorV0 != nil {
		callHdr.SendCompress = cc.dopts.compressorV0.Type()
		compressorV0 = cc.dopts.compressorV0
	}
	if callInfo.creds != nil {
		callHdr.Creds = callInfo.creds
	}

	cs := &clientStream{
		callHdr:      callHdr,
		ctx:          ctx,
		methodConfig: &mc,
		opts:         opts,
		callInfo:     callInfo,
		cc:           cc,
		desc:         desc,
		codec:        callInfo.codec,
		compressorV0: compressorV0,
		compressorV1: compressorV1,
		cancel:       cancel,
		firstAttempt: true,
		onCommit:     onCommit,
	}
	// The throttler is snapshotted at stream creation; it stays nil when
	// retries are disabled.
	if !cc.dopts.disableRetry {
		cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
	}
	// Collect the binary loggers that apply to this method: the package-level
	// one first, then the one configured on the channel, if any.
	if ml := binarylog.GetMethodLogger(method); ml != nil {
		cs.binlogs = append(cs.binlogs, ml)
	}
	if cc.dopts.binaryLogger != nil {
		if ml := cc.dopts.binaryLogger.GetMethodLogger(method); ml != nil {
			cs.binlogs = append(cs.binlogs, ml)
		}
	}

	// Pick the transport to use and create a new stream on the transport.
	// Assign cs.attempt upon success.
	op := func(a *csAttempt) error {
		if err := a.getTransport(); err != nil {
			return err
		}
		if err := a.newStream(); err != nil {
			return err
		}
		// Because this operation is always called either here (while creating
		// the clientStream) or by the retry code while locked when replaying
		// the operation, it is safe to access cs.attempt directly.
		cs.attempt = a
		return nil
	}
	// On success the op is recorded as the first entry of the replay buffer
	// (size 0, no cleanup) so a retry can re-create the stream.
	if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op, nil) }); err != nil {
		return nil, err
	}

	if len(cs.binlogs) != 0 {
		md, _ := metadata.FromOutgoingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			OnClientSide: true,
			Header:       md,
			MethodName:   method,
			Authority:    cs.cc.authority,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			// A deadline already in the past is logged as a zero timeout.
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		for _, binlog := range cs.binlogs {
			binlog.Log(cs.ctx, logEntry)
		}
	}

	if desc != unaryStreamDesc {
		// Listen on cc and stream contexts to cleanup when the user closes the
		// ClientConn or cancels the stream context.  In all other cases, an error
		// should already be injected into the recv buffer by the transport, which
		// the client will eventually receive, and then we will cancel the stream's
		// context in clientStream.finish.
		go func() {
			select {
			case <-cc.ctx.Done():
				cs.finish(ErrClientConnClosing)
			case <-ctx.Done():
				cs.finish(toRPCErr(ctx.Err()))
			}
		}()
	}
	return cs, nil
}
 405
 406// newAttemptLocked creates a new csAttempt without a transport or stream.
 407func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error) {
 408	if err := cs.ctx.Err(); err != nil {
 409		return nil, toRPCErr(err)
 410	}
 411	if err := cs.cc.ctx.Err(); err != nil {
 412		return nil, ErrClientConnClosing
 413	}
 414
 415	ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.compressorV0, cs.compressorV1)
 416	method := cs.callHdr.Method
 417	var beginTime time.Time
 418	shs := cs.cc.dopts.copts.StatsHandlers
 419	for _, sh := range shs {
 420		ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: cs.callInfo.failFast})
 421		beginTime = time.Now()
 422		begin := &stats.Begin{
 423			Client:                    true,
 424			BeginTime:                 beginTime,
 425			FailFast:                  cs.callInfo.failFast,
 426			IsClientStream:            cs.desc.ClientStreams,
 427			IsServerStream:            cs.desc.ServerStreams,
 428			IsTransparentRetryAttempt: isTransparent,
 429		}
 430		sh.HandleRPC(ctx, begin)
 431	}
 432
 433	var trInfo *traceInfo
 434	if EnableTracing {
 435		trInfo = &traceInfo{
 436			tr: newTrace("grpc.Sent."+methodFamily(method), method),
 437			firstLine: firstLine{
 438				client: true,
 439			},
 440		}
 441		if deadline, ok := ctx.Deadline(); ok {
 442			trInfo.firstLine.deadline = time.Until(deadline)
 443		}
 444		trInfo.tr.LazyLog(&trInfo.firstLine, false)
 445		ctx = newTraceContext(ctx, trInfo.tr)
 446	}
 447
 448	if cs.cc.parsedTarget.URL.Scheme == internal.GRPCResolverSchemeExtraMetadata {
 449		// Add extra metadata (metadata that will be added by transport) to context
 450		// so the balancer can see them.
 451		ctx = grpcutil.WithExtraMetadata(ctx, metadata.Pairs(
 452			"content-type", grpcutil.ContentType(cs.callHdr.ContentSubtype),
 453		))
 454	}
 455
 456	return &csAttempt{
 457		ctx:            ctx,
 458		beginTime:      beginTime,
 459		cs:             cs,
 460		decompressorV0: cs.cc.dopts.dc,
 461		statsHandlers:  shs,
 462		trInfo:         trInfo,
 463	}, nil
 464}
 465
 466func (a *csAttempt) getTransport() error {
 467	cs := a.cs
 468
 469	var err error
 470	a.transport, a.pickResult, err = cs.cc.getTransport(a.ctx, cs.callInfo.failFast, cs.callHdr.Method)
 471	if err != nil {
 472		if de, ok := err.(dropError); ok {
 473			err = de.error
 474			a.drop = true
 475		}
 476		return err
 477	}
 478	if a.trInfo != nil {
 479		a.trInfo.firstLine.SetRemoteAddr(a.transport.RemoteAddr())
 480	}
 481	return nil
 482}
 483
 484func (a *csAttempt) newStream() error {
 485	cs := a.cs
 486	cs.callHdr.PreviousAttempts = cs.numRetries
 487
 488	// Merge metadata stored in PickResult, if any, with existing call metadata.
 489	// It is safe to overwrite the csAttempt's context here, since all state
 490	// maintained in it are local to the attempt. When the attempt has to be
 491	// retried, a new instance of csAttempt will be created.
 492	if a.pickResult.Metadata != nil {
 493		// We currently do not have a function it the metadata package which
 494		// merges given metadata with existing metadata in a context. Existing
 495		// function `AppendToOutgoingContext()` takes a variadic argument of key
 496		// value pairs.
 497		//
 498		// TODO: Make it possible to retrieve key value pairs from metadata.MD
 499		// in a form passable to AppendToOutgoingContext(), or create a version
 500		// of AppendToOutgoingContext() that accepts a metadata.MD.
 501		md, _ := metadata.FromOutgoingContext(a.ctx)
 502		md = metadata.Join(md, a.pickResult.Metadata)
 503		a.ctx = metadata.NewOutgoingContext(a.ctx, md)
 504	}
 505
 506	s, err := a.transport.NewStream(a.ctx, cs.callHdr)
 507	if err != nil {
 508		nse, ok := err.(*transport.NewStreamError)
 509		if !ok {
 510			// Unexpected.
 511			return err
 512		}
 513
 514		if nse.AllowTransparentRetry {
 515			a.allowTransparentRetry = true
 516		}
 517
 518		// Unwrap and convert error.
 519		return toRPCErr(nse.Err)
 520	}
 521	a.transportStream = s
 522	a.ctx = s.Context()
 523	a.parser = &parser{r: s, bufferPool: a.cs.cc.dopts.copts.BufferPool}
 524	return nil
 525}
 526
// clientStream implements a client side Stream.
//
// It holds the per-RPC state shared by every attempt (call header, call
// options, codec, compressors, context) together with the retry bookkeeping
// that the *Locked methods access while holding mu.
type clientStream struct {
	// callHdr is the header handed to the transport for every attempt.
	callHdr *transport.CallHdr
	// opts are the call options the stream was created with.
	opts []CallOption
	// callInfo is the call configuration produced by applying opts.
	callInfo *callInfo
	cc       *ClientConn
	desc     *StreamDesc

	codec        baseCodec
	compressorV0 Compressor
	compressorV1 encoding.Compressor

	cancel context.CancelFunc // cancels all attempts

	sentLast bool // sent an end stream

	// methodConfig points at the method config selected for this RPC; its
	// RetryPolicy drives shouldRetry.
	methodConfig *MethodConfig

	// ctx is the RPC-wide context: the context given at stream creation
	// wrapped with the method-config timeout or a plain cancel func. Each
	// attempt derives its own context from it.
	ctx context.Context

	retryThrottler *retryThrottler // The throttler active when the RPC began.

	binlogs []binarylog.MethodLogger
	// serverHeaderBinlogged is a boolean for whether server header has been
	// logged. Server header will be logged when the first time one of those
	// happens: stream.Header(), stream.Recv().
	//
	// It's only read and used by Recv() and Header(), so it doesn't need to be
	// synchronized.
	serverHeaderBinlogged bool

	mu                      sync.Mutex
	firstAttempt            bool // if true, transparent retry is valid
	numRetries              int  // exclusive of transparent retry attempt(s)
	numRetriesSincePushback int  // retries since pushback; to reset backoff
	finished                bool // TODO: replace with atomic cmpxchg or sync.Once?
	// attempt is the active client stream attempt.
	// It is assigned in two places: withRetry stores the result of
	// newAttemptLocked before running the first op, and the stream-creation op
	// built in newClientStreamWithParams stores the attempt once its transport
	// stream exists. When newAttemptLocked fails, withRetry stores nil and
	// immediately calls finish, so finish is the one place that has to cope
	// with a nil attempt.
	attempt *csAttempt
	// TODO(hedging): hedging will have multiple attempts simultaneously.
	committed        bool // active attempt committed for retry?
	onCommit         func()
	replayBuffer     []replayOp // operations to replay on retry
	replayBufferSize int        // current size of replayBuffer
}
 577
// replayOp is one buffered stream operation that can be re-run against a new
// attempt when the RPC is retried.
type replayOp struct {
	// op performs the operation on the given attempt.
	op func(a *csAttempt) error
	// cleanup, when non-nil, is called once the attempt is committed and the
	// buffered operation is discarded.
	cleanup func()
}
 582
// csAttempt implements a single transport stream attempt within a
// clientStream.
type csAttempt struct {
	// ctx is the attempt's context. It starts as a derivative of the
	// clientStream's context and is replaced by the transport stream's
	// context once that stream has been created.
	ctx context.Context
	// cs is the clientStream this attempt belongs to.
	cs *clientStream
	// transport and pickResult are set by getTransport.
	transport transport.ClientTransport
	// transportStream and parser are set by newStream; transportStream stays
	// nil if the stream could not be created.
	transportStream *transport.ClientStream
	parser          *parser
	pickResult      balancer.PickResult

	finished        bool
	decompressorV0  Decompressor
	decompressorV1  encoding.Compressor
	decompressorSet bool

	mu sync.Mutex // guards trInfo.tr
	// trInfo may be nil (if EnableTracing is false).
	// trInfo.tr is set when created (if EnableTracing is true),
	// and cleared when the finish method is called.
	trInfo *traceInfo

	// statsHandlers are the handlers that were sent stats.Begin for this
	// attempt; beginTime is the BeginTime reported to the last of them (zero
	// when there are none).
	statsHandlers []stats.Handler
	beginTime     time.Time

	// set for newStream errors that may be transparently retried
	allowTransparentRetry bool
	// set for pick errors that are returned as a status
	drop bool
}
 612
 613func (cs *clientStream) commitAttemptLocked() {
 614	if !cs.committed && cs.onCommit != nil {
 615		cs.onCommit()
 616	}
 617	cs.committed = true
 618	for _, op := range cs.replayBuffer {
 619		if op.cleanup != nil {
 620			op.cleanup()
 621		}
 622	}
 623	cs.replayBuffer = nil
 624}
 625
 626func (cs *clientStream) commitAttempt() {
 627	cs.mu.Lock()
 628	cs.commitAttemptLocked()
 629	cs.mu.Unlock()
 630}
 631
// shouldRetry returns nil if the RPC should be retried; otherwise it returns
// the error that should be returned by the operation.  If the RPC should be
// retried, the bool indicates whether it is being retried transparently.
//
// err is the error that ended the attempt. The decision proceeds as follows:
//   - No retry if the stream is finished or committed, or the pick was a drop.
//   - Transparent retry if no transport stream was created and the transport
//     allowed it, or if this is the first attempt and the server left the
//     stream unprocessed.
//   - No further retry if retries are disabled, if the server sent anything
//     other than a trailers-only response, or if the server's
//     grpc-retry-pushback-ms trailer is malformed, negative or repeated (the
//     pushback cases also count as a throttling failure).
//   - Otherwise the status code must be listed as retryable in the method
//     config's retry policy, the throttler must allow it, and the attempt
//     limit must not be reached.
//
// When a configured retry is granted, shouldRetry blocks for the server
// pushback or the jittered exponential backoff before returning; if the
// stream's context ends first, the context error is returned as a status.
func (a *csAttempt) shouldRetry(err error) (bool, error) {
	cs := a.cs

	if cs.finished || cs.committed || a.drop {
		// RPC is finished or committed or was dropped by the picker; cannot retry.
		return false, err
	}
	// Stream creation failed in a way the transport flagged as safe to
	// retry transparently.
	if a.transportStream == nil && a.allowTransparentRetry {
		return true, nil
	}
	// Wait for the trailers.
	unprocessed := false
	if a.transportStream != nil {
		<-a.transportStream.Done()
		unprocessed = a.transportStream.Unprocessed()
	}
	if cs.firstAttempt && unprocessed {
		// First attempt, stream unprocessed: transparently retry.
		return true, nil
	}
	// Everything below is a configured (non-transparent) retry.
	if cs.cc.dopts.disableRetry {
		return false, err
	}

	pushback := 0
	hasPushback := false
	if a.transportStream != nil {
		// Only a trailers-only response is eligible for a retry.
		if !a.transportStream.TrailersOnly() {
			return false, err
		}

		// TODO(retry): Move down if the spec changes to not check server pushback
		// before considering this a failure for throttling.
		sps := a.transportStream.Trailer()["grpc-retry-pushback-ms"]
		if len(sps) == 1 {
			var e error
			// An unparsable or negative pushback value means "do not retry".
			if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
				channelz.Infof(logger, cs.cc.channelz, "Server retry pushback specified to abort (%q).", sps[0])
				cs.retryThrottler.throttle() // This counts as a failure for throttling.
				return false, err
			}
			hasPushback = true
		} else if len(sps) > 1 {
			channelz.Warningf(logger, cs.cc.channelz, "Server retry pushback specified multiple values (%q); not retrying.", sps)
			cs.retryThrottler.throttle() // This counts as a failure for throttling.
			return false, err
		}
	}

	// Prefer the status reported by the transport stream; without a stream
	// fall back to the code carried by err.
	var code codes.Code
	if a.transportStream != nil {
		code = a.transportStream.Status().Code()
	} else {
		code = status.Code(err)
	}

	rp := cs.methodConfig.RetryPolicy
	if rp == nil || !rp.RetryableStatusCodes[code] {
		return false, err
	}

	// Note: the ordering here is important; we count this as a failure
	// only if the code matched a retryable code.
	if cs.retryThrottler.throttle() {
		return false, err
	}
	// numRetries counts retries already made, so numRetries+1 is the number
	// of attempts made so far including the original one.
	if cs.numRetries+1 >= rp.MaxAttempts {
		return false, err
	}

	var dur time.Duration
	if hasPushback {
		// Server-directed delay; it also restarts the exponential backoff.
		dur = time.Millisecond * time.Duration(pushback)
		cs.numRetriesSincePushback = 0
	} else {
		// Exponential backoff: InitialBackoff * BackoffMultiplier^n, capped at
		// MaxBackoff, where n is the number of retries since the last pushback.
		fact := math.Pow(rp.BackoffMultiplier, float64(cs.numRetriesSincePushback))
		cur := min(float64(rp.InitialBackoff)*fact, float64(rp.MaxBackoff))
		// Apply jitter by multiplying with a random factor between 0.8 and 1.2
		cur *= 0.8 + 0.4*rand.Float64()
		dur = time.Duration(int64(cur))
		cs.numRetriesSincePushback++
	}

	// TODO(dfawley): we could eagerly fail here if dur puts us past the
	// deadline, but unsure if it is worth doing.
	t := time.NewTimer(dur)
	select {
	case <-t.C:
		// Backoff elapsed: count the retry and report a non-transparent one.
		cs.numRetries++
		return false, nil
	case <-cs.ctx.Done():
		t.Stop()
		return false, status.FromContextError(cs.ctx.Err()).Err()
	}
}
 730
// retryLocked finishes the failed attempt and, as long as shouldRetry allows
// it, creates a new attempt and replays every buffered operation on it.
//
// Returns nil if a retry was performed and succeeded; error otherwise.
//
// attempt is the attempt that just failed with lastErr. When shouldRetry
// refuses, the stream is committed and that error is returned. When a replay
// fails, its error becomes the new lastErr and the loop tries again with the
// attempt that was just created.
func (cs *clientStream) retryLocked(attempt *csAttempt, lastErr error) error {
	for {
		attempt.finish(toRPCErr(lastErr))
		isTransparent, err := attempt.shouldRetry(lastErr)
		if err != nil {
			cs.commitAttemptLocked()
			return err
		}
		// Once any retry has been granted, the stream is no longer on its
		// first attempt.
		cs.firstAttempt = false
		attempt, err = cs.newAttemptLocked(isTransparent)
		if err != nil {
			// Only returns error if the clientconn is closed or the context of
			// the stream is canceled.
			return err
		}
		// Note that the first op in replayBuffer always sets cs.attempt
		// if it is able to pick a transport and create a stream.
		if lastErr = cs.replayBufferLocked(attempt); lastErr == nil {
			return nil
		}
	}
}
 754
 755func (cs *clientStream) Context() context.Context {
 756	cs.commitAttempt()
 757	// No need to lock before using attempt, since we know it is committed and
 758	// cannot change.
 759	if cs.attempt.transportStream != nil {
 760		return cs.attempt.transportStream.Context()
 761	}
 762	return cs.ctx
 763}
 764
// withRetry runs op against the stream's current attempt, retrying on a new
// attempt when op fails and a retry is permitted.
//
// op is called without cs.mu held. onSuccess is called with cs.mu held when op
// succeeds on a not-yet-committed stream, where success means a nil error or
// an io.EOF accompanied by an OK status from the transport stream. Once the
// stream is committed, op is run exactly once on the committed attempt and
// onSuccess is not called.
//
// The returned error is op's result (nil or io.EOF on success), the error
// that prevented a retry, or the error from creating the first attempt (in
// which case the stream is also finished).
func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
	cs.mu.Lock()
	for {
		if cs.committed {
			cs.mu.Unlock()
			// toRPCErr is used in case the error from the attempt comes from
			// NewClientStream, which intentionally doesn't return a status
			// error to allow for further inspection; all other errors should
			// already be status errors.
			return toRPCErr(op(cs.attempt))
		}
		if len(cs.replayBuffer) == 0 {
			// For the first op, which controls creation of the stream and
			// assigns cs.attempt, we need to create a new attempt inline
			// before executing the first op.  On subsequent ops, the attempt
			// is created immediately before replaying the ops.
			var err error
			if cs.attempt, err = cs.newAttemptLocked(false /* isTransparent */); err != nil {
				cs.mu.Unlock()
				cs.finish(err)
				return err
			}
		}
		// Snapshot the attempt, then run op without the lock so a concurrent
		// operation on the stream is not blocked behind it.
		a := cs.attempt
		cs.mu.Unlock()
		err := op(a)
		cs.mu.Lock()
		if a != cs.attempt {
			// We started another attempt already.
			continue
		}
		// io.EOF alone does not tell success from failure; wait for the
		// transport stream to finish so its final status can be inspected.
		if err == io.EOF {
			<-a.transportStream.Done()
		}
		if err == nil || (err == io.EOF && a.transportStream.Status().Code() == codes.OK) {
			onSuccess()
			cs.mu.Unlock()
			return err
		}
		// op failed: retry if allowed, then loop to run op on the new attempt.
		if err := cs.retryLocked(a, err); err != nil {
			cs.mu.Unlock()
			return err
		}
	}
}
 810
 811func (cs *clientStream) Header() (metadata.MD, error) {
 812	var m metadata.MD
 813	err := cs.withRetry(func(a *csAttempt) error {
 814		var err error
 815		m, err = a.transportStream.Header()
 816		return toRPCErr(err)
 817	}, cs.commitAttemptLocked)
 818
 819	if m == nil && err == nil {
 820		// The stream ended with success.  Finish the clientStream.
 821		err = io.EOF
 822	}
 823
 824	if err != nil {
 825		cs.finish(err)
 826		// Do not return the error.  The user should get it by calling Recv().
 827		return nil, nil
 828	}
 829
 830	if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged && m != nil {
 831		// Only log if binary log is on and header has not been logged, and
 832		// there is actually headers to log.
 833		logEntry := &binarylog.ServerHeader{
 834			OnClientSide: true,
 835			Header:       m,
 836			PeerAddr:     nil,
 837		}
 838		if peer, ok := peer.FromContext(cs.Context()); ok {
 839			logEntry.PeerAddr = peer.Addr
 840		}
 841		cs.serverHeaderBinlogged = true
 842		for _, binlog := range cs.binlogs {
 843			binlog.Log(cs.ctx, logEntry)
 844		}
 845	}
 846
 847	return m, nil
 848}
 849
 850func (cs *clientStream) Trailer() metadata.MD {
 851	// On RPC failure, we never need to retry, because usage requires that
 852	// RecvMsg() returned a non-nil error before calling this function is valid.
 853	// We would have retried earlier if necessary.
 854	//
 855	// Commit the attempt anyway, just in case users are not following those
 856	// directions -- it will prevent races and should not meaningfully impact
 857	// performance.
 858	cs.commitAttempt()
 859	if cs.attempt.transportStream == nil {
 860		return nil
 861	}
 862	return cs.attempt.transportStream.Trailer()
 863}
 864
 865func (cs *clientStream) replayBufferLocked(attempt *csAttempt) error {
 866	for _, f := range cs.replayBuffer {
 867		if err := f.op(attempt); err != nil {
 868			return err
 869		}
 870	}
 871	return nil
 872}
 873
 874func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error, cleanup func()) {
 875	// Note: we still will buffer if retry is disabled (for transparent retries).
 876	if cs.committed {
 877		return
 878	}
 879	cs.replayBufferSize += sz
 880	if cs.replayBufferSize > cs.callInfo.maxRetryRPCBufferSize {
 881		cs.commitAttemptLocked()
 882		cleanup()
 883		return
 884	}
 885	cs.replayBuffer = append(cs.replayBuffer, replayOp{op: op, cleanup: cleanup})
 886}
 887
 888func (cs *clientStream) SendMsg(m any) (err error) {
 889	defer func() {
 890		if err != nil && err != io.EOF {
 891			// Call finish on the client stream for errors generated by this SendMsg
 892			// call, as these indicate problems created by this client.  (Transport
 893			// errors are converted to an io.EOF error in csAttempt.sendMsg; the real
 894			// error will be returned from RecvMsg eventually in that case, or be
 895			// retried.)
 896			cs.finish(err)
 897		}
 898	}()
 899	if cs.sentLast {
 900		return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
 901	}
 902	if !cs.desc.ClientStreams {
 903		cs.sentLast = true
 904	}
 905
 906	// load hdr, payload, data
 907	hdr, data, payload, pf, err := prepareMsg(m, cs.codec, cs.compressorV0, cs.compressorV1, cs.cc.dopts.copts.BufferPool)
 908	if err != nil {
 909		return err
 910	}
 911
 912	defer func() {
 913		data.Free()
 914		// only free payload if compression was made, and therefore it is a different set
 915		// of buffers from data.
 916		if pf.isCompressed() {
 917			payload.Free()
 918		}
 919	}()
 920
 921	dataLen := data.Len()
 922	payloadLen := payload.Len()
 923	// TODO(dfawley): should we be checking len(data) instead?
 924	if payloadLen > *cs.callInfo.maxSendMessageSize {
 925		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", payloadLen, *cs.callInfo.maxSendMessageSize)
 926	}
 927
 928	// always take an extra ref in case data == payload (i.e. when the data isn't
 929	// compressed). The original ref will always be freed by the deferred free above.
 930	payload.Ref()
 931	op := func(a *csAttempt) error {
 932		return a.sendMsg(m, hdr, payload, dataLen, payloadLen)
 933	}
 934
 935	// onSuccess is invoked when the op is captured for a subsequent retry. If the
 936	// stream was established by a previous message and therefore retries are
 937	// disabled, onSuccess will not be invoked, and payloadRef can be freed
 938	// immediately.
 939	onSuccessCalled := false
 940	err = cs.withRetry(op, func() {
 941		cs.bufferForRetryLocked(len(hdr)+payloadLen, op, payload.Free)
 942		onSuccessCalled = true
 943	})
 944	if !onSuccessCalled {
 945		payload.Free()
 946	}
 947	if len(cs.binlogs) != 0 && err == nil {
 948		cm := &binarylog.ClientMessage{
 949			OnClientSide: true,
 950			Message:      data.Materialize(),
 951		}
 952		for _, binlog := range cs.binlogs {
 953			binlog.Log(cs.ctx, cm)
 954		}
 955	}
 956	return err
 957}
 958
 959func (cs *clientStream) RecvMsg(m any) error {
 960	if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged {
 961		// Call Header() to binary log header if it's not already logged.
 962		cs.Header()
 963	}
 964	var recvInfo *payloadInfo
 965	if len(cs.binlogs) != 0 {
 966		recvInfo = &payloadInfo{}
 967		defer recvInfo.free()
 968	}
 969	err := cs.withRetry(func(a *csAttempt) error {
 970		return a.recvMsg(m, recvInfo)
 971	}, cs.commitAttemptLocked)
 972	if len(cs.binlogs) != 0 && err == nil {
 973		sm := &binarylog.ServerMessage{
 974			OnClientSide: true,
 975			Message:      recvInfo.uncompressedBytes.Materialize(),
 976		}
 977		for _, binlog := range cs.binlogs {
 978			binlog.Log(cs.ctx, sm)
 979		}
 980	}
 981	if err != nil || !cs.desc.ServerStreams {
 982		// err != nil or non-server-streaming indicates end of stream.
 983		cs.finish(err)
 984	}
 985	return err
 986}
 987
 988func (cs *clientStream) CloseSend() error {
 989	if cs.sentLast {
 990		// TODO: return an error and finish the stream instead, due to API misuse?
 991		return nil
 992	}
 993	cs.sentLast = true
 994	op := func(a *csAttempt) error {
 995		a.transportStream.Write(nil, nil, &transport.WriteOptions{Last: true})
 996		// Always return nil; io.EOF is the only error that might make sense
 997		// instead, but there is no need to signal the client to call RecvMsg
 998		// as the only use left for the stream after CloseSend is to call
 999		// RecvMsg.  This also matches historical behavior.
1000		return nil
1001	}
1002	cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op, nil) })
1003	if len(cs.binlogs) != 0 {
1004		chc := &binarylog.ClientHalfClose{
1005			OnClientSide: true,
1006		}
1007		for _, binlog := range cs.binlogs {
1008			binlog.Log(cs.ctx, chc)
1009		}
1010	}
1011	// We never returned an error here for reasons.
1012	return nil
1013}
1014
1015func (cs *clientStream) finish(err error) {
1016	if err == io.EOF {
1017		// Ending a stream with EOF indicates a success.
1018		err = nil
1019	}
1020	cs.mu.Lock()
1021	if cs.finished {
1022		cs.mu.Unlock()
1023		return
1024	}
1025	cs.finished = true
1026	for _, onFinish := range cs.callInfo.onFinish {
1027		onFinish(err)
1028	}
1029	cs.commitAttemptLocked()
1030	if cs.attempt != nil {
1031		cs.attempt.finish(err)
1032		// after functions all rely upon having a stream.
1033		if cs.attempt.transportStream != nil {
1034			for _, o := range cs.opts {
1035				o.after(cs.callInfo, cs.attempt)
1036			}
1037		}
1038	}
1039
1040	cs.mu.Unlock()
1041	// Only one of cancel or trailer needs to be logged.
1042	if len(cs.binlogs) != 0 {
1043		switch err {
1044		case errContextCanceled, errContextDeadline, ErrClientConnClosing:
1045			c := &binarylog.Cancel{
1046				OnClientSide: true,
1047			}
1048			for _, binlog := range cs.binlogs {
1049				binlog.Log(cs.ctx, c)
1050			}
1051		default:
1052			logEntry := &binarylog.ServerTrailer{
1053				OnClientSide: true,
1054				Trailer:      cs.Trailer(),
1055				Err:          err,
1056			}
1057			if peer, ok := peer.FromContext(cs.Context()); ok {
1058				logEntry.PeerAddr = peer.Addr
1059			}
1060			for _, binlog := range cs.binlogs {
1061				binlog.Log(cs.ctx, logEntry)
1062			}
1063		}
1064	}
1065	if err == nil {
1066		cs.retryThrottler.successfulRPC()
1067	}
1068	if channelz.IsOn() {
1069		if err != nil {
1070			cs.cc.incrCallsFailed()
1071		} else {
1072			cs.cc.incrCallsSucceeded()
1073		}
1074	}
1075	cs.cancel()
1076}
1077
1078func (a *csAttempt) sendMsg(m any, hdr []byte, payld mem.BufferSlice, dataLength, payloadLength int) error {
1079	cs := a.cs
1080	if a.trInfo != nil {
1081		a.mu.Lock()
1082		if a.trInfo.tr != nil {
1083			a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
1084		}
1085		a.mu.Unlock()
1086	}
1087	if err := a.transportStream.Write(hdr, payld, &transport.WriteOptions{Last: !cs.desc.ClientStreams}); err != nil {
1088		if !cs.desc.ClientStreams {
1089			// For non-client-streaming RPCs, we return nil instead of EOF on error
1090			// because the generated code requires it.  finish is not called; RecvMsg()
1091			// will call it with the stream's status independently.
1092			return nil
1093		}
1094		return io.EOF
1095	}
1096	if len(a.statsHandlers) != 0 {
1097		for _, sh := range a.statsHandlers {
1098			sh.HandleRPC(a.ctx, outPayload(true, m, dataLength, payloadLength, time.Now()))
1099		}
1100	}
1101	return nil
1102}
1103
1104func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
1105	cs := a.cs
1106	if len(a.statsHandlers) != 0 && payInfo == nil {
1107		payInfo = &payloadInfo{}
1108		defer payInfo.free()
1109	}
1110
1111	if !a.decompressorSet {
1112		// Block until we receive headers containing received message encoding.
1113		if ct := a.transportStream.RecvCompress(); ct != "" && ct != encoding.Identity {
1114			if a.decompressorV0 == nil || a.decompressorV0.Type() != ct {
1115				// No configured decompressor, or it does not match the incoming
1116				// message encoding; attempt to find a registered compressor that does.
1117				a.decompressorV0 = nil
1118				a.decompressorV1 = encoding.GetCompressor(ct)
1119			}
1120		} else {
1121			// No compression is used; disable our decompressor.
1122			a.decompressorV0 = nil
1123		}
1124		// Only initialize this state once per stream.
1125		a.decompressorSet = true
1126	}
1127	if err := recv(a.parser, cs.codec, a.transportStream, a.decompressorV0, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decompressorV1, false); err != nil {
1128		if err == io.EOF {
1129			if statusErr := a.transportStream.Status().Err(); statusErr != nil {
1130				return statusErr
1131			}
1132			return io.EOF // indicates successful end of stream.
1133		}
1134
1135		return toRPCErr(err)
1136	}
1137	if a.trInfo != nil {
1138		a.mu.Lock()
1139		if a.trInfo.tr != nil {
1140			a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
1141		}
1142		a.mu.Unlock()
1143	}
1144	for _, sh := range a.statsHandlers {
1145		sh.HandleRPC(a.ctx, &stats.InPayload{
1146			Client:           true,
1147			RecvTime:         time.Now(),
1148			Payload:          m,
1149			WireLength:       payInfo.compressedLength + headerLen,
1150			CompressedLength: payInfo.compressedLength,
1151			Length:           payInfo.uncompressedBytes.Len(),
1152		})
1153	}
1154	if cs.desc.ServerStreams {
1155		// Subsequent messages should be received by subsequent RecvMsg calls.
1156		return nil
1157	}
1158	// Special handling for non-server-stream rpcs.
1159	// This recv expects EOF or errors, so we don't collect inPayload.
1160	if err := recv(a.parser, cs.codec, a.transportStream, a.decompressorV0, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decompressorV1, false); err == io.EOF {
1161		return a.transportStream.Status().Err() // non-server streaming Recv returns nil on success
1162	} else if err != nil {
1163		return toRPCErr(err)
1164	}
1165	return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
1166}
1167
1168func (a *csAttempt) finish(err error) {
1169	a.mu.Lock()
1170	if a.finished {
1171		a.mu.Unlock()
1172		return
1173	}
1174	a.finished = true
1175	if err == io.EOF {
1176		// Ending a stream with EOF indicates a success.
1177		err = nil
1178	}
1179	var tr metadata.MD
1180	if a.transportStream != nil {
1181		a.transportStream.Close(err)
1182		tr = a.transportStream.Trailer()
1183	}
1184
1185	if a.pickResult.Done != nil {
1186		br := false
1187		if a.transportStream != nil {
1188			br = a.transportStream.BytesReceived()
1189		}
1190		a.pickResult.Done(balancer.DoneInfo{
1191			Err:           err,
1192			Trailer:       tr,
1193			BytesSent:     a.transportStream != nil,
1194			BytesReceived: br,
1195			ServerLoad:    balancerload.Parse(tr),
1196		})
1197	}
1198	for _, sh := range a.statsHandlers {
1199		end := &stats.End{
1200			Client:    true,
1201			BeginTime: a.beginTime,
1202			EndTime:   time.Now(),
1203			Trailer:   tr,
1204			Error:     err,
1205		}
1206		sh.HandleRPC(a.ctx, end)
1207	}
1208	if a.trInfo != nil && a.trInfo.tr != nil {
1209		if err == nil {
1210			a.trInfo.tr.LazyPrintf("RPC: [OK]")
1211		} else {
1212			a.trInfo.tr.LazyPrintf("RPC: [%v]", err)
1213			a.trInfo.tr.SetError()
1214		}
1215		a.trInfo.tr.Finish()
1216		a.trInfo.tr = nil
1217	}
1218	a.mu.Unlock()
1219}
1220
1221// newNonRetryClientStream creates a ClientStream with the specified transport, on the
1222// given addrConn.
1223//
1224// It's expected that the given transport is either the same one in addrConn, or
1225// is already closed. To avoid race, transport is specified separately, instead
1226// of using ac.transport.
1227//
1228// Main difference between this and ClientConn.NewStream:
1229// - no retry
1230// - no service config (or wait for service config)
1231// - no tracing or stats
1232func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, ac *addrConn, opts ...CallOption) (_ ClientStream, err error) {
1233	if t == nil {
1234		// TODO: return RPC error here?
1235		return nil, errors.New("transport provided is nil")
1236	}
1237	// defaultCallInfo contains unnecessary info(i.e. failfast, maxRetryRPCBufferSize), so we just initialize an empty struct.
1238	c := &callInfo{}
1239
1240	// Possible context leak:
1241	// The cancel function for the child context we create will only be called
1242	// when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
1243	// an error is generated by SendMsg.
1244	// https://github.com/grpc/grpc-go/issues/1818.
1245	ctx, cancel := context.WithCancel(ctx)
1246	defer func() {
1247		if err != nil {
1248			cancel()
1249		}
1250	}()
1251
1252	for _, o := range opts {
1253		if err := o.before(c); err != nil {
1254			return nil, toRPCErr(err)
1255		}
1256	}
1257	c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
1258	c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize)
1259	if err := setCallInfoCodec(c); err != nil {
1260		return nil, err
1261	}
1262
1263	callHdr := &transport.CallHdr{
1264		Host:           ac.cc.authority,
1265		Method:         method,
1266		ContentSubtype: c.contentSubtype,
1267	}
1268
1269	// Set our outgoing compression according to the UseCompressor CallOption, if
1270	// set.  In that case, also find the compressor from the encoding package.
1271	// Otherwise, use the compressor configured by the WithCompressor DialOption,
1272	// if set.
1273	var cp Compressor
1274	var comp encoding.Compressor
1275	if ct := c.compressorName; ct != "" {
1276		callHdr.SendCompress = ct
1277		if ct != encoding.Identity {
1278			comp = encoding.GetCompressor(ct)
1279			if comp == nil {
1280				return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
1281			}
1282		}
1283	} else if ac.cc.dopts.compressorV0 != nil {
1284		callHdr.SendCompress = ac.cc.dopts.compressorV0.Type()
1285		cp = ac.cc.dopts.compressorV0
1286	}
1287	if c.creds != nil {
1288		callHdr.Creds = c.creds
1289	}
1290
1291	// Use a special addrConnStream to avoid retry.
1292	as := &addrConnStream{
1293		callHdr:          callHdr,
1294		ac:               ac,
1295		ctx:              ctx,
1296		cancel:           cancel,
1297		opts:             opts,
1298		callInfo:         c,
1299		desc:             desc,
1300		codec:            c.codec,
1301		sendCompressorV0: cp,
1302		sendCompressorV1: comp,
1303		transport:        t,
1304	}
1305
1306	s, err := as.transport.NewStream(as.ctx, as.callHdr)
1307	if err != nil {
1308		err = toRPCErr(err)
1309		return nil, err
1310	}
1311	as.transportStream = s
1312	as.parser = &parser{r: s, bufferPool: ac.dopts.copts.BufferPool}
1313	ac.incrCallsStarted()
1314	if desc != unaryStreamDesc {
1315		// Listen on stream context to cleanup when the stream context is
1316		// canceled.  Also listen for the addrConn's context in case the
1317		// addrConn is closed or reconnects to a different address.  In all
1318		// other cases, an error should already be injected into the recv
1319		// buffer by the transport, which the client will eventually receive,
1320		// and then we will cancel the stream's context in
1321		// addrConnStream.finish.
1322		go func() {
1323			ac.mu.Lock()
1324			acCtx := ac.ctx
1325			ac.mu.Unlock()
1326			select {
1327			case <-acCtx.Done():
1328				as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing"))
1329			case <-ctx.Done():
1330				as.finish(toRPCErr(ctx.Err()))
1331			}
1332		}()
1333	}
1334	return as, nil
1335}
1336
// addrConnStream is the ClientStream returned by newNonRetryClientStream. It
// is bound to a single transport of one addrConn and performs no retries.
type addrConnStream struct {
	transportStream  *transport.ClientStream // stream obtained from transport.NewStream
	ac               *addrConn               // owning addrConn; receives the call counters
	callHdr          *transport.CallHdr      // header used to create transportStream
	cancel           context.CancelFunc      // cancels ctx; invoked by finish
	opts             []CallOption
	callInfo         *callInfo
	transport        transport.ClientTransport
	ctx              context.Context
	sentLast         bool // set once the final message or half-close has been sent
	desc             *StreamDesc
	codec            baseCodec
	sendCompressorV0 Compressor
	sendCompressorV1 encoding.Compressor
	decompressorSet  bool // set by RecvMsg after the decompressor has been chosen
	decompressorV0   Decompressor
	decompressorV1   encoding.Compressor
	parser           *parser

	// mu guards finished and is held for the entire finish method.
	mu       sync.Mutex
	finished bool
}
1360
1361func (as *addrConnStream) Header() (metadata.MD, error) {
1362	m, err := as.transportStream.Header()
1363	if err != nil {
1364		as.finish(toRPCErr(err))
1365	}
1366	return m, err
1367}
1368
1369func (as *addrConnStream) Trailer() metadata.MD {
1370	return as.transportStream.Trailer()
1371}
1372
1373func (as *addrConnStream) CloseSend() error {
1374	if as.sentLast {
1375		// TODO: return an error and finish the stream instead, due to API misuse?
1376		return nil
1377	}
1378	as.sentLast = true
1379
1380	as.transportStream.Write(nil, nil, &transport.WriteOptions{Last: true})
1381	// Always return nil; io.EOF is the only error that might make sense
1382	// instead, but there is no need to signal the client to call RecvMsg
1383	// as the only use left for the stream after CloseSend is to call
1384	// RecvMsg.  This also matches historical behavior.
1385	return nil
1386}
1387
1388func (as *addrConnStream) Context() context.Context {
1389	return as.transportStream.Context()
1390}
1391
1392func (as *addrConnStream) SendMsg(m any) (err error) {
1393	defer func() {
1394		if err != nil && err != io.EOF {
1395			// Call finish on the client stream for errors generated by this SendMsg
1396			// call, as these indicate problems created by this client.  (Transport
1397			// errors are converted to an io.EOF error in csAttempt.sendMsg; the real
1398			// error will be returned from RecvMsg eventually in that case, or be
1399			// retried.)
1400			as.finish(err)
1401		}
1402	}()
1403	if as.sentLast {
1404		return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
1405	}
1406	if !as.desc.ClientStreams {
1407		as.sentLast = true
1408	}
1409
1410	// load hdr, payload, data
1411	hdr, data, payload, pf, err := prepareMsg(m, as.codec, as.sendCompressorV0, as.sendCompressorV1, as.ac.dopts.copts.BufferPool)
1412	if err != nil {
1413		return err
1414	}
1415
1416	defer func() {
1417		data.Free()
1418		// only free payload if compression was made, and therefore it is a different set
1419		// of buffers from data.
1420		if pf.isCompressed() {
1421			payload.Free()
1422		}
1423	}()
1424
1425	// TODO(dfawley): should we be checking len(data) instead?
1426	if payload.Len() > *as.callInfo.maxSendMessageSize {
1427		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", payload.Len(), *as.callInfo.maxSendMessageSize)
1428	}
1429
1430	if err := as.transportStream.Write(hdr, payload, &transport.WriteOptions{Last: !as.desc.ClientStreams}); err != nil {
1431		if !as.desc.ClientStreams {
1432			// For non-client-streaming RPCs, we return nil instead of EOF on error
1433			// because the generated code requires it.  finish is not called; RecvMsg()
1434			// will call it with the stream's status independently.
1435			return nil
1436		}
1437		return io.EOF
1438	}
1439
1440	return nil
1441}
1442
1443func (as *addrConnStream) RecvMsg(m any) (err error) {
1444	defer func() {
1445		if err != nil || !as.desc.ServerStreams {
1446			// err != nil or non-server-streaming indicates end of stream.
1447			as.finish(err)
1448		}
1449	}()
1450
1451	if !as.decompressorSet {
1452		// Block until we receive headers containing received message encoding.
1453		if ct := as.transportStream.RecvCompress(); ct != "" && ct != encoding.Identity {
1454			if as.decompressorV0 == nil || as.decompressorV0.Type() != ct {
1455				// No configured decompressor, or it does not match the incoming
1456				// message encoding; attempt to find a registered compressor that does.
1457				as.decompressorV0 = nil
1458				as.decompressorV1 = encoding.GetCompressor(ct)
1459			}
1460		} else {
1461			// No compression is used; disable our decompressor.
1462			as.decompressorV0 = nil
1463		}
1464		// Only initialize this state once per stream.
1465		as.decompressorSet = true
1466	}
1467	if err := recv(as.parser, as.codec, as.transportStream, as.decompressorV0, m, *as.callInfo.maxReceiveMessageSize, nil, as.decompressorV1, false); err != nil {
1468		if err == io.EOF {
1469			if statusErr := as.transportStream.Status().Err(); statusErr != nil {
1470				return statusErr
1471			}
1472			return io.EOF // indicates successful end of stream.
1473		}
1474		return toRPCErr(err)
1475	}
1476
1477	if as.desc.ServerStreams {
1478		// Subsequent messages should be received by subsequent RecvMsg calls.
1479		return nil
1480	}
1481
1482	// Special handling for non-server-stream rpcs.
1483	// This recv expects EOF or errors, so we don't collect inPayload.
1484	if err := recv(as.parser, as.codec, as.transportStream, as.decompressorV0, m, *as.callInfo.maxReceiveMessageSize, nil, as.decompressorV1, false); err == io.EOF {
1485		return as.transportStream.Status().Err() // non-server streaming Recv returns nil on success
1486	} else if err != nil {
1487		return toRPCErr(err)
1488	}
1489	return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
1490}
1491
1492func (as *addrConnStream) finish(err error) {
1493	as.mu.Lock()
1494	if as.finished {
1495		as.mu.Unlock()
1496		return
1497	}
1498	as.finished = true
1499	if err == io.EOF {
1500		// Ending a stream with EOF indicates a success.
1501		err = nil
1502	}
1503	if as.transportStream != nil {
1504		as.transportStream.Close(err)
1505	}
1506
1507	if err != nil {
1508		as.ac.incrCallsFailed()
1509	} else {
1510		as.ac.incrCallsSucceeded()
1511	}
1512	as.cancel()
1513	as.mu.Unlock()
1514}
1515
// ServerStream defines the server-side behavior of a streaming RPC.
//
// Errors returned from ServerStream methods are compatible with the status
// package.  However, the status code will often not match the RPC status as
// seen by the client application, and therefore, should not be relied upon for
// this purpose.
type ServerStream interface {
	// SetHeader sets the header metadata. It may be called multiple times.
	// When called multiple times, all the provided metadata will be merged.
	// All the metadata will be sent out when one of the following happens:
	//  - ServerStream.SendHeader() is called;
	//  - The first response is sent out;
	//  - An RPC status is sent out (error or success).
	SetHeader(metadata.MD) error
	// SendHeader sends the header metadata.
	// The provided md and headers set by SetHeader() will be sent.
	// It fails if called multiple times.
	SendHeader(metadata.MD) error
	// SetTrailer sets the trailer metadata which will be sent with the RPC status.
	// When called more than once, all the provided metadata will be merged.
	SetTrailer(metadata.MD)
	// Context returns the context for this stream.
	Context() context.Context
	// SendMsg sends a message. On error, SendMsg aborts the stream and the
	// error is returned directly.
	//
	// SendMsg blocks until:
	//   - There is sufficient flow control to schedule m with the transport, or
	//   - The stream is done, or
	//   - The stream breaks.
	//
	// SendMsg does not wait until the message is received by the client. An
	// untimely stream closure may result in lost messages.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not safe
	// to call SendMsg on the same stream in different goroutines.
	//
	// It is not safe to modify the message after calling SendMsg. Tracing
	// libraries and stats handlers may use the message lazily.
	SendMsg(m any) error
	// RecvMsg blocks until it receives a message into m or the stream is
	// done. It returns io.EOF when the client has performed a CloseSend. On
	// any non-EOF error, the stream is aborted and the error contains the
	// RPC status.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not
	// safe to call RecvMsg on the same stream in different goroutines.
	RecvMsg(m any) error
}
1567
// serverStream implements a server side Stream.
type serverStream struct {
	ctx   context.Context         // returned by Context and passed to the binary loggers
	s     *transport.ServerStream // underlying transport stream
	p     *parser                 // used by RecvMsg; its buffer pool is also used when preparing outgoing messages
	codec baseCodec

	compressorV0   Compressor
	compressorV1   encoding.Compressor
	decompressorV0 Decompressor
	decompressorV1 encoding.Compressor

	// sendCompressorName is the compressor name last observed from
	// s.SendCompress(); SendMsg refreshes compressorV1 when it changes.
	sendCompressorName string

	maxReceiveMessageSize int
	maxSendMessageSize    int
	trInfo                *traceInfo

	statsHandler []stats.Handler

	binlogs []binarylog.MethodLogger
	// serverHeaderBinlogged indicates whether server header has been logged. It
	// will happen when one of the following two happens: stream.SendHeader(),
	// stream.Send().
	//
	// It's only checked in send and sendHeader, doesn't need to be
	// synchronized.
	serverHeaderBinlogged bool

	mu sync.Mutex // protects trInfo.tr after the service handler runs.
}
1599
1600func (ss *serverStream) Context() context.Context {
1601	return ss.ctx
1602}
1603
1604func (ss *serverStream) SetHeader(md metadata.MD) error {
1605	if md.Len() == 0 {
1606		return nil
1607	}
1608	err := imetadata.Validate(md)
1609	if err != nil {
1610		return status.Error(codes.Internal, err.Error())
1611	}
1612	return ss.s.SetHeader(md)
1613}
1614
1615func (ss *serverStream) SendHeader(md metadata.MD) error {
1616	err := imetadata.Validate(md)
1617	if err != nil {
1618		return status.Error(codes.Internal, err.Error())
1619	}
1620
1621	err = ss.s.SendHeader(md)
1622	if len(ss.binlogs) != 0 && !ss.serverHeaderBinlogged {
1623		h, _ := ss.s.Header()
1624		sh := &binarylog.ServerHeader{
1625			Header: h,
1626		}
1627		ss.serverHeaderBinlogged = true
1628		for _, binlog := range ss.binlogs {
1629			binlog.Log(ss.ctx, sh)
1630		}
1631	}
1632	return err
1633}
1634
1635func (ss *serverStream) SetTrailer(md metadata.MD) {
1636	if md.Len() == 0 {
1637		return
1638	}
1639	if err := imetadata.Validate(md); err != nil {
1640		logger.Errorf("stream: failed to validate md when setting trailer, err: %v", err)
1641	}
1642	ss.s.SetTrailer(md)
1643}
1644
1645func (ss *serverStream) SendMsg(m any) (err error) {
1646	defer func() {
1647		if ss.trInfo != nil {
1648			ss.mu.Lock()
1649			if ss.trInfo.tr != nil {
1650				if err == nil {
1651					ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
1652				} else {
1653					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
1654					ss.trInfo.tr.SetError()
1655				}
1656			}
1657			ss.mu.Unlock()
1658		}
1659		if err != nil && err != io.EOF {
1660			st, _ := status.FromError(toRPCErr(err))
1661			ss.s.WriteStatus(st)
1662			// Non-user specified status was sent out. This should be an error
1663			// case (as a server side Cancel maybe).
1664			//
1665			// This is not handled specifically now. User will return a final
1666			// status from the service handler, we will log that error instead.
1667			// This behavior is similar to an interceptor.
1668		}
1669	}()
1670
1671	// Server handler could have set new compressor by calling SetSendCompressor.
1672	// In case it is set, we need to use it for compressing outbound message.
1673	if sendCompressorsName := ss.s.SendCompress(); sendCompressorsName != ss.sendCompressorName {
1674		ss.compressorV1 = encoding.GetCompressor(sendCompressorsName)
1675		ss.sendCompressorName = sendCompressorsName
1676	}
1677
1678	// load hdr, payload, data
1679	hdr, data, payload, pf, err := prepareMsg(m, ss.codec, ss.compressorV0, ss.compressorV1, ss.p.bufferPool)
1680	if err != nil {
1681		return err
1682	}
1683
1684	defer func() {
1685		data.Free()
1686		// only free payload if compression was made, and therefore it is a different set
1687		// of buffers from data.
1688		if pf.isCompressed() {
1689			payload.Free()
1690		}
1691	}()
1692
1693	dataLen := data.Len()
1694	payloadLen := payload.Len()
1695
1696	// TODO(dfawley): should we be checking len(data) instead?
1697	if payloadLen > ss.maxSendMessageSize {
1698		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", payloadLen, ss.maxSendMessageSize)
1699	}
1700	if err := ss.s.Write(hdr, payload, &transport.WriteOptions{Last: false}); err != nil {
1701		return toRPCErr(err)
1702	}
1703
1704	if len(ss.binlogs) != 0 {
1705		if !ss.serverHeaderBinlogged {
1706			h, _ := ss.s.Header()
1707			sh := &binarylog.ServerHeader{
1708				Header: h,
1709			}
1710			ss.serverHeaderBinlogged = true
1711			for _, binlog := range ss.binlogs {
1712				binlog.Log(ss.ctx, sh)
1713			}
1714		}
1715		sm := &binarylog.ServerMessage{
1716			Message: data.Materialize(),
1717		}
1718		for _, binlog := range ss.binlogs {
1719			binlog.Log(ss.ctx, sm)
1720		}
1721	}
1722	if len(ss.statsHandler) != 0 {
1723		for _, sh := range ss.statsHandler {
1724			sh.HandleRPC(ss.s.Context(), outPayload(false, m, dataLen, payloadLen, time.Now()))
1725		}
1726	}
1727	return nil
1728}
1729
1730func (ss *serverStream) RecvMsg(m any) (err error) {
1731	defer func() {
1732		if ss.trInfo != nil {
1733			ss.mu.Lock()
1734			if ss.trInfo.tr != nil {
1735				if err == nil {
1736					ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
1737				} else if err != io.EOF {
1738					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
1739					ss.trInfo.tr.SetError()
1740				}
1741			}
1742			ss.mu.Unlock()
1743		}
1744		if err != nil && err != io.EOF {
1745			st, _ := status.FromError(toRPCErr(err))
1746			ss.s.WriteStatus(st)
1747			// Non-user specified status was sent out. This should be an error
1748			// case (as a server side Cancel maybe).
1749			//
1750			// This is not handled specifically now. User will return a final
1751			// status from the service handler, we will log that error instead.
1752			// This behavior is similar to an interceptor.
1753		}
1754	}()
1755	var payInfo *payloadInfo
1756	if len(ss.statsHandler) != 0 || len(ss.binlogs) != 0 {
1757		payInfo = &payloadInfo{}
1758		defer payInfo.free()
1759	}
1760	if err := recv(ss.p, ss.codec, ss.s, ss.decompressorV0, m, ss.maxReceiveMessageSize, payInfo, ss.decompressorV1, true); err != nil {
1761		if err == io.EOF {
1762			if len(ss.binlogs) != 0 {
1763				chc := &binarylog.ClientHalfClose{}
1764				for _, binlog := range ss.binlogs {
1765					binlog.Log(ss.ctx, chc)
1766				}
1767			}
1768			return err
1769		}
1770		if err == io.ErrUnexpectedEOF {
1771			err = status.Error(codes.Internal, io.ErrUnexpectedEOF.Error())
1772		}
1773		return toRPCErr(err)
1774	}
1775	if len(ss.statsHandler) != 0 {
1776		for _, sh := range ss.statsHandler {
1777			sh.HandleRPC(ss.s.Context(), &stats.InPayload{
1778				RecvTime:         time.Now(),
1779				Payload:          m,
1780				Length:           payInfo.uncompressedBytes.Len(),
1781				WireLength:       payInfo.compressedLength + headerLen,
1782				CompressedLength: payInfo.compressedLength,
1783			})
1784		}
1785	}
1786	if len(ss.binlogs) != 0 {
1787		cm := &binarylog.ClientMessage{
1788			Message: payInfo.uncompressedBytes.Materialize(),
1789		}
1790		for _, binlog := range ss.binlogs {
1791			binlog.Log(ss.ctx, cm)
1792		}
1793	}
1794	return nil
1795}
1796
1797// MethodFromServerStream returns the method string for the input stream.
1798// The returned string is in the format of "/service/method".
1799func MethodFromServerStream(stream ServerStream) (string, bool) {
1800	return Method(stream.Context())
1801}
1802
1803// prepareMsg returns the hdr, payload and data using the compressors passed or
1804// using the passed preparedmsg. The returned boolean indicates whether
1805// compression was made and therefore whether the payload needs to be freed in
1806// addition to the returned data. Freeing the payload if the returned boolean is
1807// false can lead to undefined behavior.
1808func prepareMsg(m any, codec baseCodec, cp Compressor, comp encoding.Compressor, pool mem.BufferPool) (hdr []byte, data, payload mem.BufferSlice, pf payloadFormat, err error) {
1809	if preparedMsg, ok := m.(*PreparedMsg); ok {
1810		return preparedMsg.hdr, preparedMsg.encodedData, preparedMsg.payload, preparedMsg.pf, nil
1811	}
1812	// The input interface is not a prepared msg.
1813	// Marshal and Compress the data at this point
1814	data, err = encode(codec, m)
1815	if err != nil {
1816		return nil, nil, nil, 0, err
1817	}
1818	compData, pf, err := compress(data, cp, comp, pool)
1819	if err != nil {
1820		data.Free()
1821		return nil, nil, nil, 0, err
1822	}
1823	hdr, payload = msgHeader(data, compData, pf)
1824	return hdr, data, payload, pf, nil
1825}