/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"context"
	"errors"
	"io"
	"math"
	rand "math/rand/v2"
	"strconv"
	"sync"
	"time"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/encoding"
	"google.golang.org/grpc/internal"
	"google.golang.org/grpc/internal/balancerload"
	"google.golang.org/grpc/internal/binarylog"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcutil"
	imetadata "google.golang.org/grpc/internal/metadata"
	iresolver "google.golang.org/grpc/internal/resolver"
	"google.golang.org/grpc/internal/serviceconfig"
	istatus "google.golang.org/grpc/internal/status"
	"google.golang.org/grpc/internal/transport"
	"google.golang.org/grpc/mem"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
)

var metadataFromOutgoingContextRaw = internal.FromOutgoingContextRaw.(func(context.Context) (metadata.MD, [][]string, bool))

// StreamHandler defines the handler called by the gRPC server to complete the
// execution of a streaming RPC.
//
// If a StreamHandler returns an error, it should either be produced by the
// status package, or be one of the context errors. Otherwise, gRPC will use
// codes.Unknown as the status code and err.Error() as the status message of the
// RPC.
type StreamHandler func(srv any, stream ServerStream) error

// StreamDesc represents a streaming RPC service's method specification. Used
// on the server when registering services and on the client when initiating
// new streams.
type StreamDesc struct {
	// StreamName and Handler are only used when registering handlers on a
	// server.
	StreamName string        // the name of the method excluding the service
	Handler    StreamHandler // the handler called for the method

	// ServerStreams and ClientStreams are used for registering handlers on a
	// server as well as defining RPC behavior when passed to NewClientStream
	// and ClientConn.NewStream. At least one must be true.
	ServerStreams bool // indicates the server can perform streaming sends
	ClientStreams bool // indicates the client can perform streaming sends
}
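
// Illustrative sketch (not part of this package's API): a bidirectional
// streaming method, with hypothetical names, would be described as:
//
//	var chatStreamDesc = StreamDesc{
//		StreamName:    "Chat", // method name without the service prefix
//		Handler:       chatHandler,
//		ServerStreams: true, // the server may send a stream of messages
//		ClientStreams: true, // the client may send a stream of messages
//	}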

// Stream defines the common interface a client or server stream has to satisfy.
//
// Deprecated: See ClientStream and ServerStream documentation instead.
type Stream interface {
	// Deprecated: See ClientStream and ServerStream documentation instead.
	Context() context.Context
	// Deprecated: See ClientStream and ServerStream documentation instead.
	SendMsg(m any) error
	// Deprecated: See ClientStream and ServerStream documentation instead.
	RecvMsg(m any) error
}

// ClientStream defines the client-side behavior of a streaming RPC.
//
// All errors returned from ClientStream methods are compatible with the
// status package.
type ClientStream interface {
	// Header returns the header metadata received from the server if there
	// is any. It blocks if the metadata is not ready to read. If the metadata
	// is nil and the error is also nil, then the stream was terminated without
	// headers, and the status can be discovered by calling RecvMsg.
	Header() (metadata.MD, error)
	// Trailer returns the trailer metadata from the server, if there is any.
	// It must only be called after stream.CloseAndRecv has returned, or
	// stream.Recv has returned a non-nil error (including io.EOF).
	Trailer() metadata.MD
	// CloseSend closes the send direction of the stream. It closes the stream
	// when a non-nil error is encountered. It is also not safe to call
	// CloseSend concurrently with SendMsg.
	CloseSend() error
	// Context returns the context for this stream.
	//
	// It should not be called until after Header or RecvMsg has returned. Once
	// called, subsequent client-side retries are disabled.
	Context() context.Context
	// SendMsg is generally called by generated code. On error, SendMsg aborts
	// the stream. If the error was generated by the client, the status is
	// returned directly; otherwise, io.EOF is returned and the status of
	// the stream may be discovered using RecvMsg. For unary or server-streaming
	// RPCs (StreamDesc.ClientStreams is false), a nil error is returned
	// unconditionally.
	//
	// SendMsg blocks until:
	//   - There is sufficient flow control to schedule m with the transport, or
	//   - The stream is done, or
	//   - The stream breaks.
	//
	// SendMsg does not wait until the message is received by the server. An
	// untimely stream closure may result in lost messages. To ensure delivery,
	// users should ensure the RPC completed successfully using RecvMsg.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not safe
	// to call SendMsg on the same stream in different goroutines. It is also
	// not safe to call CloseSend concurrently with SendMsg.
	//
	// It is not safe to modify the message after calling SendMsg. Tracing
	// libraries and stats handlers may use the message lazily.
	SendMsg(m any) error
	// RecvMsg blocks until it receives a message into m or the stream is
	// done. It returns io.EOF when the stream completes successfully. On
	// any other error, the stream is aborted and the error contains the RPC
	// status.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not
	// safe to call RecvMsg on the same stream in different goroutines.
	RecvMsg(m any) error
}
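
// A minimal client-side usage sketch (illustrative; pb.Msg and msgs are
// hypothetical): one goroutine sends while another receives, which is safe
// per the SendMsg and RecvMsg documentation above.
//
//	go func() {
//		for _, m := range msgs {
//			if err := stream.SendMsg(m); err != nil {
//				return // the RPC status will be surfaced by RecvMsg
//			}
//		}
//		stream.CloseSend()
//	}()
//	for {
//		var in pb.Msg
//		if err := stream.RecvMsg(&in); err != nil {
//			break // io.EOF on success; otherwise the RPC status
//		}
//	}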

// NewStream creates a new Stream for the client side. This is typically
// called by generated code. ctx is used for the lifetime of the stream.
//
// To ensure resources are not leaked due to the returned stream, one of the
// following actions must be performed:
//
//  1. Call Close on the ClientConn.
//  2. Cancel the context provided.
//  3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
//     client-streaming RPC, for instance, might use the helper function
//     CloseAndRecv (note that CloseSend does not Recv, and is therefore not
//     guaranteed to release all resources).
//  4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
//
// If none of the above happen, a goroutine and a context will be leaked, and
// grpc will not call the optionally-configured stats handler with a stats.End
// message. A usage sketch illustrating these rules follows this function.
func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
	// Allow the interceptor to see all applicable call options, which means
	// those configured as defaults via dial options as well as per-call
	// options.
	opts = combine(cc.dopts.callOptions, opts)

	if cc.dopts.streamInt != nil {
		return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
	}
	return newClientStream(ctx, desc, cc, method, opts...)
}

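// A usage sketch for the cleanup rules listed above (illustrative; the method
// name and desc are hypothetical). Canceling the derived context satisfies
// rule 2 on every return path:
//
//	ctx, cancel := context.WithCancel(ctx)
//	defer cancel()
//	stream, err := cc.NewStream(ctx, desc, "/svc.Service/Method")
//	if err != nil {
//		return err
//	}
//	// ... SendMsg/RecvMsg until RecvMsg returns a non-nil error (rule 3).
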
// NewClientStream is a wrapper for ClientConn.NewStream.
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
	return cc.NewStream(ctx, desc, method, opts...)
}

func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
	// Start tracking the RPC for idleness purposes. This is where a stream is
	// created for both streaming and unary RPCs, and hence is a good place to
	// track active RPC count.
	if err := cc.idlenessMgr.OnCallBegin(); err != nil {
		return nil, err
	}
	// Add a call option, executed when the RPC completes, that decrements the
	// active call count.
	opts = append([]CallOption{OnFinish(func(error) { cc.idlenessMgr.OnCallEnd() })}, opts...)

	if md, added, ok := metadataFromOutgoingContextRaw(ctx); ok {
		// validate md
		if err := imetadata.Validate(md); err != nil {
			return nil, status.Error(codes.Internal, err.Error())
		}
		// validate added
		for _, kvs := range added {
			for i := 0; i < len(kvs); i += 2 {
				if err := imetadata.ValidatePair(kvs[i], kvs[i+1]); err != nil {
					return nil, status.Error(codes.Internal, err.Error())
				}
			}
		}
	}
	if channelz.IsOn() {
		cc.incrCallsStarted()
		defer func() {
			if err != nil {
				cc.incrCallsFailed()
			}
		}()
	}
	// Provide an opportunity for the first RPC to see the first service config
	// provided by the resolver.
	if err := cc.waitForResolvedAddrs(ctx); err != nil {
		return nil, err
	}

	var mc serviceconfig.MethodConfig
	var onCommit func()
	newStream := func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
		return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
	}

	rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method}
	rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo)
	if err != nil {
		if st, ok := status.FromError(err); ok {
			// Restrict the code to the list allowed by gRFC A54.
			if istatus.IsRestrictedControlPlaneCode(st) {
				err = status.Errorf(codes.Internal, "config selector returned illegal status: %v", err)
			}
			return nil, err
		}
		return nil, toRPCErr(err)
	}

	if rpcConfig != nil {
		if rpcConfig.Context != nil {
			ctx = rpcConfig.Context
		}
		mc = rpcConfig.MethodConfig
		onCommit = rpcConfig.OnCommitted
		if rpcConfig.Interceptor != nil {
			rpcInfo.Context = nil
			ns := newStream
			newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
				cs, err := rpcConfig.Interceptor.NewStream(ctx, rpcInfo, done, ns)
				if err != nil {
					return nil, toRPCErr(err)
				}
				return cs, nil
			}
		}
	}

	return newStream(ctx, func() {})
}

func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
	callInfo := defaultCallInfo()
	if mc.WaitForReady != nil {
		callInfo.failFast = !*mc.WaitForReady
	}

	// Possible context leak:
	// The cancel function for the child context we create will only be called
	// when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
	// an error is generated by SendMsg.
	// https://github.com/grpc/grpc-go/issues/1818.
	var cancel context.CancelFunc
	if mc.Timeout != nil && *mc.Timeout >= 0 {
		ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
	} else {
		ctx, cancel = context.WithCancel(ctx)
	}
	defer func() {
		if err != nil {
			cancel()
		}
	}()

	for _, o := range opts {
		if err := o.before(callInfo); err != nil {
			return nil, toRPCErr(err)
		}
	}
	callInfo.maxSendMessageSize = getMaxSize(mc.MaxReqSize, callInfo.maxSendMessageSize, defaultClientMaxSendMessageSize)
	callInfo.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, callInfo.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
	if err := setCallInfoCodec(callInfo); err != nil {
		return nil, err
	}

	callHdr := &transport.CallHdr{
		Host:           cc.authority,
		Method:         method,
		ContentSubtype: callInfo.contentSubtype,
		DoneFunc:       doneFunc,
	}

	// Set our outgoing compression according to the UseCompressor CallOption, if
	// set. In that case, also find the compressor from the encoding package.
	// Otherwise, use the compressor configured by the WithCompressor DialOption,
	// if set.
	var compressorV0 Compressor
	var compressorV1 encoding.Compressor
	if ct := callInfo.compressorName; ct != "" {
		callHdr.SendCompress = ct
		if ct != encoding.Identity {
			compressorV1 = encoding.GetCompressor(ct)
			if compressorV1 == nil {
				return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
			}
		}
	} else if cc.dopts.compressorV0 != nil {
		callHdr.SendCompress = cc.dopts.compressorV0.Type()
		compressorV0 = cc.dopts.compressorV0
	}
	if callInfo.creds != nil {
		callHdr.Creds = callInfo.creds
	}

	cs := &clientStream{
		callHdr:      callHdr,
		ctx:          ctx,
		methodConfig: &mc,
		opts:         opts,
		callInfo:     callInfo,
		cc:           cc,
		desc:         desc,
		codec:        callInfo.codec,
		compressorV0: compressorV0,
		compressorV1: compressorV1,
		cancel:       cancel,
		firstAttempt: true,
		onCommit:     onCommit,
	}
	if !cc.dopts.disableRetry {
		cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
	}
	if ml := binarylog.GetMethodLogger(method); ml != nil {
		cs.binlogs = append(cs.binlogs, ml)
	}
	if cc.dopts.binaryLogger != nil {
		if ml := cc.dopts.binaryLogger.GetMethodLogger(method); ml != nil {
			cs.binlogs = append(cs.binlogs, ml)
		}
	}

	// Pick the transport to use and create a new stream on the transport.
	// Assign cs.attempt upon success.
	op := func(a *csAttempt) error {
		if err := a.getTransport(); err != nil {
			return err
		}
		if err := a.newStream(); err != nil {
			return err
		}
		// Because this operation is always called either here (while creating
		// the clientStream) or by the retry code while locked when replaying
		// the operation, it is safe to access cs.attempt directly.
		cs.attempt = a
		return nil
	}
	if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op, nil) }); err != nil {
		return nil, err
	}

	if len(cs.binlogs) != 0 {
		md, _ := metadata.FromOutgoingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			OnClientSide: true,
			Header:       md,
			MethodName:   method,
			Authority:    cs.cc.authority,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		for _, binlog := range cs.binlogs {
			binlog.Log(cs.ctx, logEntry)
		}
	}

	if desc != unaryStreamDesc {
		// Listen on cc and stream contexts to clean up when the user closes
		// the ClientConn or cancels the stream context. In all other cases,
		// an error should already be injected into the recv buffer by the
		// transport, which the client will eventually receive, and then we
		// will cancel the stream's context in clientStream.finish.
		go func() {
			select {
			case <-cc.ctx.Done():
				cs.finish(ErrClientConnClosing)
			case <-ctx.Done():
				cs.finish(toRPCErr(ctx.Err()))
			}
		}()
	}
	return cs, nil
}

// newAttemptLocked creates a new csAttempt without a transport or stream.
func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error) {
	if err := cs.ctx.Err(); err != nil {
		return nil, toRPCErr(err)
	}
	if err := cs.cc.ctx.Err(); err != nil {
		return nil, ErrClientConnClosing
	}

	ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.compressorV0, cs.compressorV1)
	method := cs.callHdr.Method
	var beginTime time.Time
	shs := cs.cc.dopts.copts.StatsHandlers
	for _, sh := range shs {
		ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: cs.callInfo.failFast})
		beginTime = time.Now()
		begin := &stats.Begin{
			Client:                    true,
			BeginTime:                 beginTime,
			FailFast:                  cs.callInfo.failFast,
			IsClientStream:            cs.desc.ClientStreams,
			IsServerStream:            cs.desc.ServerStreams,
			IsTransparentRetryAttempt: isTransparent,
		}
		sh.HandleRPC(ctx, begin)
	}

	var trInfo *traceInfo
	if EnableTracing {
		trInfo = &traceInfo{
			tr: newTrace("grpc.Sent."+methodFamily(method), method),
			firstLine: firstLine{
				client: true,
			},
		}
		if deadline, ok := ctx.Deadline(); ok {
			trInfo.firstLine.deadline = time.Until(deadline)
		}
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
		ctx = newTraceContext(ctx, trInfo.tr)
	}

	if cs.cc.parsedTarget.URL.Scheme == internal.GRPCResolverSchemeExtraMetadata {
		// Add extra metadata (metadata that will be added by the transport)
		// to the context so the balancer can see it.
		ctx = grpcutil.WithExtraMetadata(ctx, metadata.Pairs(
			"content-type", grpcutil.ContentType(cs.callHdr.ContentSubtype),
		))
	}

	return &csAttempt{
		ctx:            ctx,
		beginTime:      beginTime,
		cs:             cs,
		decompressorV0: cs.cc.dopts.dc,
		statsHandlers:  shs,
		trInfo:         trInfo,
	}, nil
}

func (a *csAttempt) getTransport() error {
	cs := a.cs

	var err error
	a.transport, a.pickResult, err = cs.cc.getTransport(a.ctx, cs.callInfo.failFast, cs.callHdr.Method)
	if err != nil {
		if de, ok := err.(dropError); ok {
			err = de.error
			a.drop = true
		}
		return err
	}
	if a.trInfo != nil {
		a.trInfo.firstLine.SetRemoteAddr(a.transport.RemoteAddr())
	}
	return nil
}

func (a *csAttempt) newStream() error {
	cs := a.cs
	cs.callHdr.PreviousAttempts = cs.numRetries

	// Merge metadata stored in PickResult, if any, with existing call metadata.
	// It is safe to overwrite the csAttempt's context here, since all state
	// maintained in it is local to the attempt. When the attempt has to be
	// retried, a new instance of csAttempt will be created.
	if a.pickResult.Metadata != nil {
		// We currently do not have a function in the metadata package which
		// merges given metadata with existing metadata in a context. The
		// existing function `AppendToOutgoingContext()` takes a variadic
		// argument of key value pairs.
		//
		// TODO: Make it possible to retrieve key value pairs from metadata.MD
		// in a form passable to AppendToOutgoingContext(), or create a version
		// of AppendToOutgoingContext() that accepts a metadata.MD.
		md, _ := metadata.FromOutgoingContext(a.ctx)
		md = metadata.Join(md, a.pickResult.Metadata)
		a.ctx = metadata.NewOutgoingContext(a.ctx, md)
	}

	s, err := a.transport.NewStream(a.ctx, cs.callHdr)
	if err != nil {
		nse, ok := err.(*transport.NewStreamError)
		if !ok {
			// Unexpected.
			return err
		}

		if nse.AllowTransparentRetry {
			a.allowTransparentRetry = true
		}

		// Unwrap and convert error.
		return toRPCErr(nse.Err)
	}
	a.transportStream = s
	a.ctx = s.Context()
	a.parser = &parser{r: s, bufferPool: a.cs.cc.dopts.copts.BufferPool}
	return nil
}

// clientStream implements a client side Stream.
type clientStream struct {
	callHdr  *transport.CallHdr
	opts     []CallOption
	callInfo *callInfo
	cc       *ClientConn
	desc     *StreamDesc

	codec        baseCodec
	compressorV0 Compressor
	compressorV1 encoding.Compressor

	cancel context.CancelFunc // cancels all attempts

	sentLast bool // sent an end stream

	methodConfig *MethodConfig

	ctx context.Context // the application's context, wrapped by stats/tracing

	retryThrottler *retryThrottler // The throttler active when the RPC began.

	binlogs []binarylog.MethodLogger
	// serverHeaderBinlogged indicates whether the server header has been
	// logged. The server header is logged the first time one of the following
	// happens: stream.Header() or stream.Recv() is called.
	//
	// It's only read and used by Recv() and Header(), so it doesn't need to be
	// synchronized.
	serverHeaderBinlogged bool

	mu                      sync.Mutex
	firstAttempt            bool // if true, transparent retry is valid
	numRetries              int  // exclusive of transparent retry attempt(s)
	numRetriesSincePushback int  // retries since pushback; to reset backoff
	finished                bool // TODO: replace with atomic cmpxchg or sync.Once?
	// attempt is the active client stream attempt. The only place where it is
	// written is the newAttemptLocked method, and that method never writes
	// nil. So attempt can be nil only inside the newClientStream function,
	// when the clientStream is first created. One of the first things done
	// after the clientStream's creation is to call newAttemptLocked, which
	// either assigns a non-nil value to attempt or returns an error. If an
	// error is returned from newAttemptLocked, newClientStream calls finish
	// on the clientStream and returns. So the finish method is the only place
	// where we need to check whether attempt is nil.
	attempt *csAttempt
	// TODO(hedging): hedging will have multiple attempts simultaneously.
	committed        bool // active attempt committed for retry?
	onCommit         func()
	replayBuffer     []replayOp // operations to replay on retry
	replayBufferSize int        // current size of replayBuffer
}

type replayOp struct {
	op      func(a *csAttempt) error
	cleanup func()
}

// csAttempt implements a single transport stream attempt within a
// clientStream.
type csAttempt struct {
	ctx             context.Context
	cs              *clientStream
	transport       transport.ClientTransport
	transportStream *transport.ClientStream
	parser          *parser
	pickResult      balancer.PickResult

	finished        bool
	decompressorV0  Decompressor
	decompressorV1  encoding.Compressor
	decompressorSet bool

	mu sync.Mutex // guards trInfo.tr
	// trInfo may be nil (if EnableTracing is false).
	// trInfo.tr is set when created (if EnableTracing is true),
	// and cleared when the finish method is called.
	trInfo *traceInfo

	statsHandlers []stats.Handler
	beginTime     time.Time

	// set for newStream errors that may be transparently retried
	allowTransparentRetry bool
	// set for pick errors that are returned as a status
	drop bool
}

func (cs *clientStream) commitAttemptLocked() {
	if !cs.committed && cs.onCommit != nil {
		cs.onCommit()
	}
	cs.committed = true
	for _, op := range cs.replayBuffer {
		if op.cleanup != nil {
			op.cleanup()
		}
	}
	cs.replayBuffer = nil
}

func (cs *clientStream) commitAttempt() {
	cs.mu.Lock()
	cs.commitAttemptLocked()
	cs.mu.Unlock()
}

// shouldRetry returns a nil error if the RPC should be retried; otherwise it
// returns the error that should be returned by the operation. If the RPC
// should be retried, the bool indicates whether it is being retried
// transparently.
func (a *csAttempt) shouldRetry(err error) (bool, error) {
	cs := a.cs

	if cs.finished || cs.committed || a.drop {
		// RPC is finished or committed or was dropped by the picker; cannot retry.
		return false, err
	}
	if a.transportStream == nil && a.allowTransparentRetry {
		return true, nil
	}
	// Wait for the trailers.
	unprocessed := false
	if a.transportStream != nil {
		<-a.transportStream.Done()
		unprocessed = a.transportStream.Unprocessed()
	}
	if cs.firstAttempt && unprocessed {
		// First attempt, stream unprocessed: transparently retry.
		return true, nil
	}
	if cs.cc.dopts.disableRetry {
		return false, err
	}

	pushback := 0
	hasPushback := false
	if a.transportStream != nil {
		if !a.transportStream.TrailersOnly() {
			return false, err
		}

		// TODO(retry): Move down if the spec changes to not check server pushback
		// before considering this a failure for throttling.
		sps := a.transportStream.Trailer()["grpc-retry-pushback-ms"]
		if len(sps) == 1 {
			var e error
			if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
				channelz.Infof(logger, cs.cc.channelz, "Server retry pushback specified to abort (%q).", sps[0])
				cs.retryThrottler.throttle() // This counts as a failure for throttling.
				return false, err
			}
			hasPushback = true
		} else if len(sps) > 1 {
			channelz.Warningf(logger, cs.cc.channelz, "Server retry pushback specified multiple values (%q); not retrying.", sps)
			cs.retryThrottler.throttle() // This counts as a failure for throttling.
			return false, err
		}
	}

	var code codes.Code
	if a.transportStream != nil {
		code = a.transportStream.Status().Code()
	} else {
		code = status.Code(err)
	}

	rp := cs.methodConfig.RetryPolicy
	if rp == nil || !rp.RetryableStatusCodes[code] {
		return false, err
	}

	// Note: the ordering here is important; we count this as a failure
	// only if the code matched a retryable code.
	if cs.retryThrottler.throttle() {
		return false, err
	}
	if cs.numRetries+1 >= rp.MaxAttempts {
		return false, err
	}

	var dur time.Duration
	if hasPushback {
		dur = time.Millisecond * time.Duration(pushback)
		cs.numRetriesSincePushback = 0
	} else {
		fact := math.Pow(rp.BackoffMultiplier, float64(cs.numRetriesSincePushback))
		cur := min(float64(rp.InitialBackoff)*fact, float64(rp.MaxBackoff))
		// Apply jitter by multiplying with a random factor between 0.8 and 1.2.
		cur *= 0.8 + 0.4*rand.Float64()
		dur = time.Duration(int64(cur))
		cs.numRetriesSincePushback++
	}

	// TODO(dfawley): we could eagerly fail here if dur puts us past the
	// deadline, but unsure if it is worth doing.
	t := time.NewTimer(dur)
	select {
	case <-t.C:
		cs.numRetries++
		return false, nil
	case <-cs.ctx.Done():
		t.Stop()
		return false, status.FromContextError(cs.ctx.Err()).Err()
	}
}
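
// Worked example of the backoff computed above (illustrative numbers): with
// InitialBackoff = 1s, BackoffMultiplier = 2, MaxBackoff = 10s, and two prior
// retries since the last pushback, the pre-jitter delay is
// min(1s*2^2, 10s) = 4s, and jitter scales it to a value in [3.2s, 4.8s):
//
//	fact := math.Pow(2, 2)                                         // 4
//	cur := min(float64(time.Second)*fact, float64(10*time.Second)) // 4s
//	cur *= 0.8 + 0.4*rand.Float64()                                // [3.2s, 4.8s)
//	dur := time.Duration(int64(cur))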

// Returns nil if a retry was performed and succeeded; error otherwise.
func (cs *clientStream) retryLocked(attempt *csAttempt, lastErr error) error {
	for {
		attempt.finish(toRPCErr(lastErr))
		isTransparent, err := attempt.shouldRetry(lastErr)
		if err != nil {
			cs.commitAttemptLocked()
			return err
		}
		cs.firstAttempt = false
		attempt, err = cs.newAttemptLocked(isTransparent)
		if err != nil {
			// newAttemptLocked only returns an error if the ClientConn is
			// closed or the stream's context is canceled.
			return err
		}
		// Note that the first op in replayBuffer always sets cs.attempt
		// if it is able to pick a transport and create a stream.
		if lastErr = cs.replayBufferLocked(attempt); lastErr == nil {
			return nil
		}
	}
}

func (cs *clientStream) Context() context.Context {
	cs.commitAttempt()
	// No need to lock before using attempt, since we know it is committed and
	// cannot change.
	if cs.attempt.transportStream != nil {
		return cs.attempt.transportStream.Context()
	}
	return cs.ctx
}

func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
	cs.mu.Lock()
	for {
		if cs.committed {
			cs.mu.Unlock()
			// toRPCErr is used in case the error from the attempt comes from
			// NewClientStream, which intentionally doesn't return a status
			// error to allow for further inspection; all other errors should
			// already be status errors.
			return toRPCErr(op(cs.attempt))
		}
		if len(cs.replayBuffer) == 0 {
			// For the first op, which controls creation of the stream and
			// assigns cs.attempt, we need to create a new attempt inline
			// before executing the first op. On subsequent ops, the attempt
			// is created immediately before replaying the ops.
			var err error
			if cs.attempt, err = cs.newAttemptLocked(false /* isTransparent */); err != nil {
				cs.mu.Unlock()
				cs.finish(err)
				return err
			}
		}
		a := cs.attempt
		cs.mu.Unlock()
		err := op(a)
		cs.mu.Lock()
		if a != cs.attempt {
			// We started another attempt already.
			continue
		}
		if err == io.EOF {
			<-a.transportStream.Done()
		}
		if err == nil || (err == io.EOF && a.transportStream.Status().Code() == codes.OK) {
			onSuccess()
			cs.mu.Unlock()
			return err
		}
		if err := cs.retryLocked(a, err); err != nil {
			cs.mu.Unlock()
			return err
		}
	}
}

func (cs *clientStream) Header() (metadata.MD, error) {
	var m metadata.MD
	err := cs.withRetry(func(a *csAttempt) error {
		var err error
		m, err = a.transportStream.Header()
		return toRPCErr(err)
	}, cs.commitAttemptLocked)

	if m == nil && err == nil {
		// The stream ended with success. Finish the clientStream.
		err = io.EOF
	}

	if err != nil {
		cs.finish(err)
		// Do not return the error. The user should get it by calling Recv().
		return nil, nil
	}

	if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged && m != nil {
		// Only log if binary logging is on, the header has not been logged
		// yet, and there are actually headers to log.
		logEntry := &binarylog.ServerHeader{
			OnClientSide: true,
			Header:       m,
			PeerAddr:     nil,
		}
		if peer, ok := peer.FromContext(cs.Context()); ok {
			logEntry.PeerAddr = peer.Addr
		}
		cs.serverHeaderBinlogged = true
		for _, binlog := range cs.binlogs {
			binlog.Log(cs.ctx, logEntry)
		}
	}

	return m, nil
}

func (cs *clientStream) Trailer() metadata.MD {
	// On RPC failure, we never need to retry, because valid usage requires
	// that RecvMsg() has returned a non-nil error before this function is
	// called. We would have retried earlier if necessary.
	//
	// Commit the attempt anyway, just in case users are not following those
	// directions -- it will prevent races and should not meaningfully impact
	// performance.
	cs.commitAttempt()
	if cs.attempt.transportStream == nil {
		return nil
	}
	return cs.attempt.transportStream.Trailer()
}

func (cs *clientStream) replayBufferLocked(attempt *csAttempt) error {
	for _, f := range cs.replayBuffer {
		if err := f.op(attempt); err != nil {
			return err
		}
	}
	return nil
}

func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error, cleanup func()) {
	// Note: we will still buffer if retry is disabled (for transparent retries).
	if cs.committed {
		return
	}
	cs.replayBufferSize += sz
	if cs.replayBufferSize > cs.callInfo.maxRetryRPCBufferSize {
		cs.commitAttemptLocked()
		cleanup()
		return
	}
	cs.replayBuffer = append(cs.replayBuffer, replayOp{op: op, cleanup: cleanup})
}

func (cs *clientStream) SendMsg(m any) (err error) {
	defer func() {
		if err != nil && err != io.EOF {
			// Call finish on the client stream for errors generated by this SendMsg
			// call, as these indicate problems created by this client. (Transport
			// errors are converted to an io.EOF error in csAttempt.sendMsg; the real
			// error will be returned from RecvMsg eventually in that case, or be
			// retried.)
			cs.finish(err)
		}
	}()
	if cs.sentLast {
		return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
	}
	if !cs.desc.ClientStreams {
		cs.sentLast = true
	}

	// load hdr, payload, data
	hdr, data, payload, pf, err := prepareMsg(m, cs.codec, cs.compressorV0, cs.compressorV1, cs.cc.dopts.copts.BufferPool)
	if err != nil {
		return err
	}

	defer func() {
		data.Free()
		// Only free payload if compression was performed, and therefore it is
		// a different set of buffers from data.
		if pf.isCompressed() {
			payload.Free()
		}
	}()

	dataLen := data.Len()
	payloadLen := payload.Len()
	// TODO(dfawley): should we be checking len(data) instead?
	if payloadLen > *cs.callInfo.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", payloadLen, *cs.callInfo.maxSendMessageSize)
	}

	// Always take an extra ref in case data == payload (i.e. when the data isn't
	// compressed). The original ref will always be freed by the deferred free above.
	payload.Ref()
	op := func(a *csAttempt) error {
		return a.sendMsg(m, hdr, payload, dataLen, payloadLen)
	}

	// onSuccess is invoked when the op is captured for a subsequent retry. If
	// the stream was established by a previous message and therefore retries
	// are disabled, onSuccess will not be invoked, and the extra payload ref
	// can be freed immediately.
	onSuccessCalled := false
	err = cs.withRetry(op, func() {
		cs.bufferForRetryLocked(len(hdr)+payloadLen, op, payload.Free)
		onSuccessCalled = true
	})
	if !onSuccessCalled {
		payload.Free()
	}
	if len(cs.binlogs) != 0 && err == nil {
		cm := &binarylog.ClientMessage{
			OnClientSide: true,
			Message:      data.Materialize(),
		}
		for _, binlog := range cs.binlogs {
			binlog.Log(cs.ctx, cm)
		}
	}
	return err
}

func (cs *clientStream) RecvMsg(m any) error {
	if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged {
		// Call Header() to binary-log the header if it's not already logged.
		cs.Header()
	}
	var recvInfo *payloadInfo
	if len(cs.binlogs) != 0 {
		recvInfo = &payloadInfo{}
		defer recvInfo.free()
	}
	err := cs.withRetry(func(a *csAttempt) error {
		return a.recvMsg(m, recvInfo)
	}, cs.commitAttemptLocked)
	if len(cs.binlogs) != 0 && err == nil {
		sm := &binarylog.ServerMessage{
			OnClientSide: true,
			Message:      recvInfo.uncompressedBytes.Materialize(),
		}
		for _, binlog := range cs.binlogs {
			binlog.Log(cs.ctx, sm)
		}
	}
	if err != nil || !cs.desc.ServerStreams {
		// err != nil or non-server-streaming indicates end of stream.
		cs.finish(err)
	}
	return err
}

func (cs *clientStream) CloseSend() error {
	if cs.sentLast {
		// TODO: return an error and finish the stream instead, due to API misuse?
		return nil
	}
	cs.sentLast = true
	op := func(a *csAttempt) error {
		a.transportStream.Write(nil, nil, &transport.WriteOptions{Last: true})
		// Always return nil; io.EOF is the only error that might make sense
		// instead, but there is no need to signal the client to call RecvMsg
		// as the only use left for the stream after CloseSend is to call
		// RecvMsg. This also matches historical behavior.
		return nil
	}
	cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op, nil) })
	if len(cs.binlogs) != 0 {
		chc := &binarylog.ClientHalfClose{
			OnClientSide: true,
		}
		for _, binlog := range cs.binlogs {
			binlog.Log(cs.ctx, chc)
		}
	}
	// We never return an error here; the rationale is in the comment inside
	// op above.
	return nil
}

func (cs *clientStream) finish(err error) {
	if err == io.EOF {
		// Ending a stream with EOF indicates success.
		err = nil
	}
	cs.mu.Lock()
	if cs.finished {
		cs.mu.Unlock()
		return
	}
	cs.finished = true
	for _, onFinish := range cs.callInfo.onFinish {
		onFinish(err)
	}
	cs.commitAttemptLocked()
	if cs.attempt != nil {
		cs.attempt.finish(err)
		// after functions all rely upon having a stream.
		if cs.attempt.transportStream != nil {
			for _, o := range cs.opts {
				o.after(cs.callInfo, cs.attempt)
			}
		}
	}

	cs.mu.Unlock()
	// Only one of cancel or trailer needs to be logged.
	if len(cs.binlogs) != 0 {
		switch err {
		case errContextCanceled, errContextDeadline, ErrClientConnClosing:
			c := &binarylog.Cancel{
				OnClientSide: true,
			}
			for _, binlog := range cs.binlogs {
				binlog.Log(cs.ctx, c)
			}
		default:
			logEntry := &binarylog.ServerTrailer{
				OnClientSide: true,
				Trailer:      cs.Trailer(),
				Err:          err,
			}
			if peer, ok := peer.FromContext(cs.Context()); ok {
				logEntry.PeerAddr = peer.Addr
			}
			for _, binlog := range cs.binlogs {
				binlog.Log(cs.ctx, logEntry)
			}
		}
	}
	if err == nil {
		cs.retryThrottler.successfulRPC()
	}
	if channelz.IsOn() {
		if err != nil {
			cs.cc.incrCallsFailed()
		} else {
			cs.cc.incrCallsSucceeded()
		}
	}
	cs.cancel()
}

func (a *csAttempt) sendMsg(m any, hdr []byte, payld mem.BufferSlice, dataLength, payloadLength int) error {
	cs := a.cs
	if a.trInfo != nil {
		a.mu.Lock()
		if a.trInfo.tr != nil {
			a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
		}
		a.mu.Unlock()
	}
	if err := a.transportStream.Write(hdr, payld, &transport.WriteOptions{Last: !cs.desc.ClientStreams}); err != nil {
		if !cs.desc.ClientStreams {
			// For non-client-streaming RPCs, we return nil instead of EOF on error
			// because the generated code requires it. finish is not called; RecvMsg()
			// will call it with the stream's status independently.
			return nil
		}
		return io.EOF
	}
	if len(a.statsHandlers) != 0 {
		for _, sh := range a.statsHandlers {
			sh.HandleRPC(a.ctx, outPayload(true, m, dataLength, payloadLength, time.Now()))
		}
	}
	return nil
}

func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
	cs := a.cs
	if len(a.statsHandlers) != 0 && payInfo == nil {
		payInfo = &payloadInfo{}
		defer payInfo.free()
	}

	if !a.decompressorSet {
		// Block until we receive headers containing the received-message
		// encoding.
		if ct := a.transportStream.RecvCompress(); ct != "" && ct != encoding.Identity {
			if a.decompressorV0 == nil || a.decompressorV0.Type() != ct {
				// No configured decompressor, or it does not match the incoming
				// message encoding; attempt to find a registered compressor that does.
				a.decompressorV0 = nil
				a.decompressorV1 = encoding.GetCompressor(ct)
			}
		} else {
			// No compression is used; disable our decompressor.
			a.decompressorV0 = nil
		}
		// Only initialize this state once per stream.
		a.decompressorSet = true
	}
	if err := recv(a.parser, cs.codec, a.transportStream, a.decompressorV0, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decompressorV1, false); err != nil {
		if err == io.EOF {
			if statusErr := a.transportStream.Status().Err(); statusErr != nil {
				return statusErr
			}
			return io.EOF // indicates successful end of stream.
		}

		return toRPCErr(err)
	}
	if a.trInfo != nil {
		a.mu.Lock()
		if a.trInfo.tr != nil {
			a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
		}
		a.mu.Unlock()
	}
	for _, sh := range a.statsHandlers {
		sh.HandleRPC(a.ctx, &stats.InPayload{
			Client:           true,
			RecvTime:         time.Now(),
			Payload:          m,
			WireLength:       payInfo.compressedLength + headerLen,
			CompressedLength: payInfo.compressedLength,
			Length:           payInfo.uncompressedBytes.Len(),
		})
	}
	if cs.desc.ServerStreams {
		// Subsequent messages should be received by subsequent RecvMsg calls.
		return nil
	}
	// Special handling for non-server-streaming RPCs.
	// This recv expects EOF or errors, so we don't collect inPayload.
	if err := recv(a.parser, cs.codec, a.transportStream, a.decompressorV0, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decompressorV1, false); err == io.EOF {
		return a.transportStream.Status().Err() // non-server streaming Recv returns nil on success
	} else if err != nil {
		return toRPCErr(err)
	}
	return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
}

func (a *csAttempt) finish(err error) {
	a.mu.Lock()
	if a.finished {
		a.mu.Unlock()
		return
	}
	a.finished = true
	if err == io.EOF {
		// Ending a stream with EOF indicates success.
		err = nil
	}
	var tr metadata.MD
	if a.transportStream != nil {
		a.transportStream.Close(err)
		tr = a.transportStream.Trailer()
	}

	if a.pickResult.Done != nil {
		br := false
		if a.transportStream != nil {
			br = a.transportStream.BytesReceived()
		}
		a.pickResult.Done(balancer.DoneInfo{
			Err:           err,
			Trailer:       tr,
			BytesSent:     a.transportStream != nil,
			BytesReceived: br,
			ServerLoad:    balancerload.Parse(tr),
		})
	}
	for _, sh := range a.statsHandlers {
		end := &stats.End{
			Client:    true,
			BeginTime: a.beginTime,
			EndTime:   time.Now(),
			Trailer:   tr,
			Error:     err,
		}
		sh.HandleRPC(a.ctx, end)
	}
	if a.trInfo != nil && a.trInfo.tr != nil {
		if err == nil {
			a.trInfo.tr.LazyPrintf("RPC: [OK]")
		} else {
			a.trInfo.tr.LazyPrintf("RPC: [%v]", err)
			a.trInfo.tr.SetError()
		}
		a.trInfo.tr.Finish()
		a.trInfo.tr = nil
	}
	a.mu.Unlock()
}

// newNonRetryClientStream creates a ClientStream with the specified transport,
// on the given addrConn.
//
// It's expected that the given transport is either the same one in addrConn,
// or is already closed. To avoid a race, the transport is specified
// separately instead of using ac.transport.
//
// Main differences between this and ClientConn.NewStream:
//   - no retry
//   - no service config (or waiting for service config)
//   - no tracing or stats
func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, ac *addrConn, opts ...CallOption) (_ ClientStream, err error) {
	if t == nil {
		// TODO: return RPC error here?
		return nil, errors.New("transport provided is nil")
	}
	// defaultCallInfo contains unnecessary info (e.g. failFast and
	// maxRetryRPCBufferSize), so we just initialize an empty struct.
	c := &callInfo{}

	// Possible context leak:
	// The cancel function for the child context we create will only be called
	// when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
	// an error is generated by SendMsg.
	// https://github.com/grpc/grpc-go/issues/1818.
	ctx, cancel := context.WithCancel(ctx)
	defer func() {
		if err != nil {
			cancel()
		}
	}()

	for _, o := range opts {
		if err := o.before(c); err != nil {
			return nil, toRPCErr(err)
		}
	}
	c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
	c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize)
	if err := setCallInfoCodec(c); err != nil {
		return nil, err
	}

	callHdr := &transport.CallHdr{
		Host:           ac.cc.authority,
		Method:         method,
		ContentSubtype: c.contentSubtype,
	}

	// Set our outgoing compression according to the UseCompressor CallOption, if
	// set. In that case, also find the compressor from the encoding package.
	// Otherwise, use the compressor configured by the WithCompressor DialOption,
	// if set.
	var cp Compressor
	var comp encoding.Compressor
	if ct := c.compressorName; ct != "" {
		callHdr.SendCompress = ct
		if ct != encoding.Identity {
			comp = encoding.GetCompressor(ct)
			if comp == nil {
				return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
			}
		}
	} else if ac.cc.dopts.compressorV0 != nil {
		callHdr.SendCompress = ac.cc.dopts.compressorV0.Type()
		cp = ac.cc.dopts.compressorV0
	}
	if c.creds != nil {
		callHdr.Creds = c.creds
	}

	// Use a special addrConnStream to avoid retry.
	as := &addrConnStream{
		callHdr:          callHdr,
		ac:               ac,
		ctx:              ctx,
		cancel:           cancel,
		opts:             opts,
		callInfo:         c,
		desc:             desc,
		codec:            c.codec,
		sendCompressorV0: cp,
		sendCompressorV1: comp,
		transport:        t,
	}

	s, err := as.transport.NewStream(as.ctx, as.callHdr)
	if err != nil {
		err = toRPCErr(err)
		return nil, err
	}
	as.transportStream = s
	as.parser = &parser{r: s, bufferPool: ac.dopts.copts.BufferPool}
	ac.incrCallsStarted()
	if desc != unaryStreamDesc {
		// Listen on the stream context to clean up when the stream context
		// is canceled. Also listen on the addrConn's context in case the
		// addrConn is closed or reconnects to a different address. In all
		// other cases, an error should already be injected into the recv
		// buffer by the transport, which the client will eventually receive,
		// and then we will cancel the stream's context in
		// addrConnStream.finish.
		go func() {
			ac.mu.Lock()
			acCtx := ac.ctx
			ac.mu.Unlock()
			select {
			case <-acCtx.Done():
				as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing"))
			case <-ctx.Done():
				as.finish(toRPCErr(ctx.Err()))
			}
		}()
	}
	return as, nil
}

type addrConnStream struct {
	transportStream  *transport.ClientStream
	ac               *addrConn
	callHdr          *transport.CallHdr
	cancel           context.CancelFunc
	opts             []CallOption
	callInfo         *callInfo
	transport        transport.ClientTransport
	ctx              context.Context
	sentLast         bool
	desc             *StreamDesc
	codec            baseCodec
	sendCompressorV0 Compressor
	sendCompressorV1 encoding.Compressor
	decompressorSet  bool
	decompressorV0   Decompressor
	decompressorV1   encoding.Compressor
	parser           *parser

	// mu guards finished and is held for the entire finish method.
	mu       sync.Mutex
	finished bool
}

func (as *addrConnStream) Header() (metadata.MD, error) {
	m, err := as.transportStream.Header()
	if err != nil {
		as.finish(toRPCErr(err))
	}
	return m, err
}

func (as *addrConnStream) Trailer() metadata.MD {
	return as.transportStream.Trailer()
}

func (as *addrConnStream) CloseSend() error {
	if as.sentLast {
		// TODO: return an error and finish the stream instead, due to API misuse?
		return nil
	}
	as.sentLast = true

	as.transportStream.Write(nil, nil, &transport.WriteOptions{Last: true})
	// Always return nil; io.EOF is the only error that might make sense
	// instead, but there is no need to signal the client to call RecvMsg
	// as the only use left for the stream after CloseSend is to call
	// RecvMsg. This also matches historical behavior.
	return nil
}

func (as *addrConnStream) Context() context.Context {
	return as.transportStream.Context()
}

func (as *addrConnStream) SendMsg(m any) (err error) {
	defer func() {
		if err != nil && err != io.EOF {
			// Call finish on the client stream for errors generated by this SendMsg
			// call, as these indicate problems created by this client. (Transport
			// errors are converted to an io.EOF error in csAttempt.sendMsg; the real
			// error will be returned from RecvMsg eventually in that case, or be
			// retried.)
			as.finish(err)
		}
	}()
	if as.sentLast {
		return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
	}
	if !as.desc.ClientStreams {
		as.sentLast = true
	}

	// load hdr, payload, data
	hdr, data, payload, pf, err := prepareMsg(m, as.codec, as.sendCompressorV0, as.sendCompressorV1, as.ac.dopts.copts.BufferPool)
	if err != nil {
		return err
	}

	defer func() {
		data.Free()
		// Only free payload if compression was performed, and therefore it is
		// a different set of buffers from data.
		if pf.isCompressed() {
			payload.Free()
		}
	}()

	// TODO(dfawley): should we be checking len(data) instead?
	if payload.Len() > *as.callInfo.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", payload.Len(), *as.callInfo.maxSendMessageSize)
	}

	if err := as.transportStream.Write(hdr, payload, &transport.WriteOptions{Last: !as.desc.ClientStreams}); err != nil {
		if !as.desc.ClientStreams {
			// For non-client-streaming RPCs, we return nil instead of EOF on error
			// because the generated code requires it. finish is not called; RecvMsg()
			// will call it with the stream's status independently.
			return nil
		}
		return io.EOF
	}

	return nil
}

func (as *addrConnStream) RecvMsg(m any) (err error) {
	defer func() {
		if err != nil || !as.desc.ServerStreams {
			// err != nil or non-server-streaming indicates end of stream.
			as.finish(err)
		}
	}()

	if !as.decompressorSet {
		// Block until we receive headers containing the received-message
		// encoding.
		if ct := as.transportStream.RecvCompress(); ct != "" && ct != encoding.Identity {
			if as.decompressorV0 == nil || as.decompressorV0.Type() != ct {
				// No configured decompressor, or it does not match the incoming
				// message encoding; attempt to find a registered compressor that does.
				as.decompressorV0 = nil
				as.decompressorV1 = encoding.GetCompressor(ct)
			}
		} else {
			// No compression is used; disable our decompressor.
			as.decompressorV0 = nil
		}
		// Only initialize this state once per stream.
		as.decompressorSet = true
	}
	if err := recv(as.parser, as.codec, as.transportStream, as.decompressorV0, m, *as.callInfo.maxReceiveMessageSize, nil, as.decompressorV1, false); err != nil {
		if err == io.EOF {
			if statusErr := as.transportStream.Status().Err(); statusErr != nil {
				return statusErr
			}
			return io.EOF // indicates successful end of stream.
		}
		return toRPCErr(err)
	}

	if as.desc.ServerStreams {
		// Subsequent messages should be received by subsequent RecvMsg calls.
		return nil
	}

	// Special handling for non-server-streaming RPCs.
	// This recv expects EOF or errors, so we don't collect inPayload.
	if err := recv(as.parser, as.codec, as.transportStream, as.decompressorV0, m, *as.callInfo.maxReceiveMessageSize, nil, as.decompressorV1, false); err == io.EOF {
		return as.transportStream.Status().Err() // non-server streaming Recv returns nil on success
	} else if err != nil {
		return toRPCErr(err)
	}
	return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
}

func (as *addrConnStream) finish(err error) {
	as.mu.Lock()
	if as.finished {
		as.mu.Unlock()
		return
	}
	as.finished = true
	if err == io.EOF {
		// Ending a stream with EOF indicates success.
		err = nil
	}
	if as.transportStream != nil {
		as.transportStream.Close(err)
	}

	if err != nil {
		as.ac.incrCallsFailed()
	} else {
		as.ac.incrCallsSucceeded()
	}
	as.cancel()
	as.mu.Unlock()
}

// ServerStream defines the server-side behavior of a streaming RPC.
//
// Errors returned from ServerStream methods are compatible with the status
// package. However, the status code will often not match the RPC status as
// seen by the client application, and therefore, should not be relied upon for
// this purpose.
type ServerStream interface {
	// SetHeader sets the header metadata. It may be called multiple times;
	// when called multiple times, all the provided metadata will be merged.
	// All the metadata will be sent out when one of the following happens:
	//  - ServerStream.SendHeader() is called;
	//  - The first response is sent out;
	//  - An RPC status is sent out (error or success).
	SetHeader(metadata.MD) error
	// SendHeader sends the header metadata.
	// The provided md and headers set by SetHeader() will be sent.
	// It fails if called multiple times.
	SendHeader(metadata.MD) error
	// SetTrailer sets the trailer metadata which will be sent with the RPC status.
	// When called more than once, all the provided metadata will be merged.
	SetTrailer(metadata.MD)
	// Context returns the context for this stream.
	Context() context.Context
	// SendMsg sends a message. On error, SendMsg aborts the stream and the
	// error is returned directly.
	//
	// SendMsg blocks until:
	//   - There is sufficient flow control to schedule m with the transport, or
	//   - The stream is done, or
	//   - The stream breaks.
	//
	// SendMsg does not wait until the message is received by the client. An
	// untimely stream closure may result in lost messages.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not safe
	// to call SendMsg on the same stream in different goroutines.
	//
	// It is not safe to modify the message after calling SendMsg. Tracing
	// libraries and stats handlers may use the message lazily.
	SendMsg(m any) error
	// RecvMsg blocks until it receives a message into m or the stream is
	// done. It returns io.EOF when the client has performed a CloseSend. On
	// any non-EOF error, the stream is aborted and the error contains the
	// RPC status.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not
	// safe to call RecvMsg on the same stream in different goroutines.
	RecvMsg(m any) error
}
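
// A minimal server-side handler sketch (illustrative; pb.Msg and the Chat
// method are hypothetical): echo each received message back until the client
// half-closes, then return nil so gRPC sends an OK status.
//
//	func chatHandler(srv any, stream ServerStream) error {
//		for {
//			var in pb.Msg
//			if err := stream.RecvMsg(&in); err == io.EOF {
//				return nil // client called CloseSend
//			} else if err != nil {
//				return err
//			}
//			if err := stream.SendMsg(&in); err != nil {
//				return err
//			}
//		}
//	}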

// serverStream implements a server side Stream.
type serverStream struct {
	ctx   context.Context
	s     *transport.ServerStream
	p     *parser
	codec baseCodec

	compressorV0   Compressor
	compressorV1   encoding.Compressor
	decompressorV0 Decompressor
	decompressorV1 encoding.Compressor

	sendCompressorName string

	maxReceiveMessageSize int
	maxSendMessageSize    int
	trInfo                *traceInfo

	statsHandler []stats.Handler

	binlogs []binarylog.MethodLogger
	// serverHeaderBinlogged indicates whether the server header has been
	// logged. It is set when one of the following happens first:
	// stream.SendHeader() or stream.Send() is called.
	//
	// It's only checked in send and sendHeader, so it doesn't need to be
	// synchronized.
	serverHeaderBinlogged bool

	mu sync.Mutex // protects trInfo.tr after the service handler runs.
}

func (ss *serverStream) Context() context.Context {
	return ss.ctx
}

func (ss *serverStream) SetHeader(md metadata.MD) error {
	if md.Len() == 0 {
		return nil
	}
	err := imetadata.Validate(md)
	if err != nil {
		return status.Error(codes.Internal, err.Error())
	}
	return ss.s.SetHeader(md)
}

func (ss *serverStream) SendHeader(md metadata.MD) error {
	err := imetadata.Validate(md)
	if err != nil {
		return status.Error(codes.Internal, err.Error())
	}

	err = ss.s.SendHeader(md)
	if len(ss.binlogs) != 0 && !ss.serverHeaderBinlogged {
		h, _ := ss.s.Header()
		sh := &binarylog.ServerHeader{
			Header: h,
		}
		ss.serverHeaderBinlogged = true
		for _, binlog := range ss.binlogs {
			binlog.Log(ss.ctx, sh)
		}
	}
	return err
}

func (ss *serverStream) SetTrailer(md metadata.MD) {
	if md.Len() == 0 {
		return
	}
	if err := imetadata.Validate(md); err != nil {
		logger.Errorf("stream: failed to validate md when setting trailer, err: %v", err)
	}
	ss.s.SetTrailer(md)
}

func (ss *serverStream) SendMsg(m any) (err error) {
	defer func() {
		if ss.trInfo != nil {
			ss.mu.Lock()
			if ss.trInfo.tr != nil {
				if err == nil {
					ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
				} else {
					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
					ss.trInfo.tr.SetError()
				}
			}
			ss.mu.Unlock()
		}
		if err != nil && err != io.EOF {
			st, _ := status.FromError(toRPCErr(err))
			ss.s.WriteStatus(st)
			// A non-user-specified status was sent out. This should be an
			// error case (e.g., a server-side cancellation).
			//
			// This is not handled specifically now. The user will return a
			// final status from the service handler, and we will log that
			// error instead. This behavior is similar to an interceptor.
		}
	}()

	// The server handler could have set a new compressor by calling
	// SetSendCompressor. If it is set, we need to use it for compressing the
	// outbound message.
	if sendCompressorsName := ss.s.SendCompress(); sendCompressorsName != ss.sendCompressorName {
		ss.compressorV1 = encoding.GetCompressor(sendCompressorsName)
		ss.sendCompressorName = sendCompressorsName
	}

	// load hdr, payload, data
	hdr, data, payload, pf, err := prepareMsg(m, ss.codec, ss.compressorV0, ss.compressorV1, ss.p.bufferPool)
	if err != nil {
		return err
	}

	defer func() {
		data.Free()
		// Only free payload if compression was performed, and therefore it is
		// a different set of buffers from data.
		if pf.isCompressed() {
			payload.Free()
		}
	}()

	dataLen := data.Len()
	payloadLen := payload.Len()

	// TODO(dfawley): should we be checking len(data) instead?
	if payloadLen > ss.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", payloadLen, ss.maxSendMessageSize)
	}
	if err := ss.s.Write(hdr, payload, &transport.WriteOptions{Last: false}); err != nil {
		return toRPCErr(err)
	}

	if len(ss.binlogs) != 0 {
		if !ss.serverHeaderBinlogged {
			h, _ := ss.s.Header()
			sh := &binarylog.ServerHeader{
				Header: h,
			}
			ss.serverHeaderBinlogged = true
			for _, binlog := range ss.binlogs {
				binlog.Log(ss.ctx, sh)
			}
		}
		sm := &binarylog.ServerMessage{
			Message: data.Materialize(),
		}
		for _, binlog := range ss.binlogs {
			binlog.Log(ss.ctx, sm)
		}
	}
	if len(ss.statsHandler) != 0 {
		for _, sh := range ss.statsHandler {
			sh.HandleRPC(ss.s.Context(), outPayload(false, m, dataLen, payloadLen, time.Now()))
		}
	}
	return nil
}

func (ss *serverStream) RecvMsg(m any) (err error) {
	defer func() {
		if ss.trInfo != nil {
			ss.mu.Lock()
			if ss.trInfo.tr != nil {
				if err == nil {
					ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
				} else if err != io.EOF {
					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
					ss.trInfo.tr.SetError()
				}
			}
			ss.mu.Unlock()
		}
		if err != nil && err != io.EOF {
			st, _ := status.FromError(toRPCErr(err))
			ss.s.WriteStatus(st)
			// A non-user-specified status was sent out. This should be an
			// error case (e.g., a server-side cancellation).
			//
			// This is not handled specifically now. The user will return a
			// final status from the service handler, and we will log that
			// error instead. This behavior is similar to an interceptor.
		}
	}()
	var payInfo *payloadInfo
	if len(ss.statsHandler) != 0 || len(ss.binlogs) != 0 {
		payInfo = &payloadInfo{}
		defer payInfo.free()
	}
	if err := recv(ss.p, ss.codec, ss.s, ss.decompressorV0, m, ss.maxReceiveMessageSize, payInfo, ss.decompressorV1, true); err != nil {
		if err == io.EOF {
			if len(ss.binlogs) != 0 {
				chc := &binarylog.ClientHalfClose{}
				for _, binlog := range ss.binlogs {
					binlog.Log(ss.ctx, chc)
				}
			}
			return err
		}
		if err == io.ErrUnexpectedEOF {
			err = status.Error(codes.Internal, io.ErrUnexpectedEOF.Error())
		}
		return toRPCErr(err)
	}
	if len(ss.statsHandler) != 0 {
		for _, sh := range ss.statsHandler {
			sh.HandleRPC(ss.s.Context(), &stats.InPayload{
				RecvTime:         time.Now(),
				Payload:          m,
				Length:           payInfo.uncompressedBytes.Len(),
				WireLength:       payInfo.compressedLength + headerLen,
				CompressedLength: payInfo.compressedLength,
			})
		}
	}
	if len(ss.binlogs) != 0 {
		cm := &binarylog.ClientMessage{
			Message: payInfo.uncompressedBytes.Materialize(),
		}
		for _, binlog := range ss.binlogs {
			binlog.Log(ss.ctx, cm)
		}
	}
	return nil
}

// MethodFromServerStream returns the method string for the input stream.
// The returned string is in the format of "/service/method".
func MethodFromServerStream(stream ServerStream) (string, bool) {
	return Method(stream.Context())
}
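
// For example, inside a handler registered for a (hypothetical) method Chat
// on service example.Svc:
//
//	m, ok := MethodFromServerStream(stream) // m == "/example.Svc/Chat", ok == true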

// prepareMsg returns the hdr, payload and data using the compressors passed,
// or using the passed PreparedMsg. The returned payloadFormat indicates
// whether compression was performed, and therefore whether the payload needs
// to be freed in addition to the returned data. Freeing the payload when
// pf.isCompressed() is false can lead to undefined behavior.
func prepareMsg(m any, codec baseCodec, cp Compressor, comp encoding.Compressor, pool mem.BufferPool) (hdr []byte, data, payload mem.BufferSlice, pf payloadFormat, err error) {
	if preparedMsg, ok := m.(*PreparedMsg); ok {
		return preparedMsg.hdr, preparedMsg.encodedData, preparedMsg.payload, preparedMsg.pf, nil
	}
	// The input interface is not a prepared msg.
	// Marshal and compress the data at this point.
	data, err = encode(codec, m)
	if err != nil {
		return nil, nil, nil, 0, err
	}
	compData, pf, err := compress(data, cp, comp, pool)
	if err != nil {
		data.Free()
		return nil, nil, nil, 0, err
	}
	hdr, payload = msgHeader(data, compData, pf)
	return hdr, data, payload, pf, nil
}
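
// Callers that marshal a message once and send it on many streams can use a
// *PreparedMsg to skip the encode/compress work above (sketch; see
// PreparedMsg.Encode for the entry point that populates these fields):
//
//	var pm PreparedMsg
//	if err := pm.Encode(stream, msg); err != nil {
//		return err
//	}
//	if err := stream.SendMsg(&pm); err != nil {
//		return err
//	}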