1/*
2 *
3 * Copyright 2024 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22 "sync/atomic"
23
24 "golang.org/x/net/http2"
25 "google.golang.org/grpc/mem"
26 "google.golang.org/grpc/metadata"
27 "google.golang.org/grpc/status"
28)
29
30// ClientStream implements streaming functionality for a gRPC client.
31type ClientStream struct {
32 *Stream // Embed for common stream functionality.
33
34 ct *http2Client
35 done chan struct{} // closed at the end of stream to unblock writers.
36 doneFunc func() // invoked at the end of stream.
37
38 headerChan chan struct{} // closed to indicate the end of header metadata.
39 headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
40 // headerValid indicates whether a valid header was received. Only
41 // meaningful after headerChan is closed (always call waitOnHeader() before
42 // reading its value).
43 headerValid bool
44 header metadata.MD // the received header metadata
45 noHeaders bool // set if the client never received headers (set only after the stream is done).
46
47 bytesReceived atomic.Bool // indicates whether any bytes have been received on this stream
48 unprocessed atomic.Bool // set if the server sends a refused stream or GOAWAY including this stream
49
50 status *status.Status // the status error received from the server
51}
52
53// Read reads an n byte message from the input stream.
54func (s *ClientStream) Read(n int) (mem.BufferSlice, error) {
55 b, err := s.Stream.read(n)
56 if err == nil {
57 s.ct.incrMsgRecv()
58 }
59 return b, err
60}
61
62// Close closes the stream and popagates err to any readers.
63func (s *ClientStream) Close(err error) {
64 var (
65 rst bool
66 rstCode http2.ErrCode
67 )
68 if err != nil {
69 rst = true
70 rstCode = http2.ErrCodeCancel
71 }
72 s.ct.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false)
73}
74
75// Write writes the hdr and data bytes to the output stream.
76func (s *ClientStream) Write(hdr []byte, data mem.BufferSlice, opts *WriteOptions) error {
77 return s.ct.write(s, hdr, data, opts)
78}
79
80// BytesReceived indicates whether any bytes have been received on this stream.
81func (s *ClientStream) BytesReceived() bool {
82 return s.bytesReceived.Load()
83}
84
85// Unprocessed indicates whether the server did not process this stream --
86// i.e. it sent a refused stream or GOAWAY including this stream ID.
87func (s *ClientStream) Unprocessed() bool {
88 return s.unprocessed.Load()
89}
90
91func (s *ClientStream) waitOnHeader() {
92 select {
93 case <-s.ctx.Done():
94 // Close the stream to prevent headers/trailers from changing after
95 // this function returns.
96 s.Close(ContextErr(s.ctx.Err()))
97 // headerChan could possibly not be closed yet if closeStream raced
98 // with operateHeaders; wait until it is closed explicitly here.
99 <-s.headerChan
100 case <-s.headerChan:
101 }
102}
103
104// RecvCompress returns the compression algorithm applied to the inbound
105// message. It is empty string if there is no compression applied.
106func (s *ClientStream) RecvCompress() string {
107 s.waitOnHeader()
108 return s.recvCompress
109}
110
111// Done returns a channel which is closed when it receives the final status
112// from the server.
113func (s *ClientStream) Done() <-chan struct{} {
114 return s.done
115}
116
117// Header returns the header metadata of the stream. Acquires the key-value
118// pairs of header metadata once it is available. It blocks until i) the
119// metadata is ready or ii) there is no header metadata or iii) the stream is
120// canceled/expired.
121func (s *ClientStream) Header() (metadata.MD, error) {
122 s.waitOnHeader()
123
124 if !s.headerValid || s.noHeaders {
125 return nil, s.status.Err()
126 }
127
128 return s.header.Copy(), nil
129}
130
131// TrailersOnly blocks until a header or trailers-only frame is received and
132// then returns true if the stream was trailers-only. If the stream ends
133// before headers are received, returns true, nil.
134func (s *ClientStream) TrailersOnly() bool {
135 s.waitOnHeader()
136 return s.noHeaders
137}
138
139// Status returns the status received from the server.
140// Status can be read safely only after the stream has ended,
141// that is, after Done() is closed.
142func (s *ClientStream) Status() *status.Status {
143 return s.status
144}