client_stream.go

  1/*
  2 *
  3 * Copyright 2024 gRPC authors.
  4 *
  5 * Licensed under the Apache License, Version 2.0 (the "License");
  6 * you may not use this file except in compliance with the License.
  7 * You may obtain a copy of the License at
  8 *
  9 *     http://www.apache.org/licenses/LICENSE-2.0
 10 *
 11 * Unless required by applicable law or agreed to in writing, software
 12 * distributed under the License is distributed on an "AS IS" BASIS,
 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14 * See the License for the specific language governing permissions and
 15 * limitations under the License.
 16 *
 17 */
 18
 19package transport
 20
 21import (
 22	"sync/atomic"
 23
 24	"golang.org/x/net/http2"
 25	"google.golang.org/grpc/mem"
 26	"google.golang.org/grpc/metadata"
 27	"google.golang.org/grpc/status"
 28)
 29
 30// ClientStream implements streaming functionality for a gRPC client.
 31type ClientStream struct {
 32	*Stream // Embed for common stream functionality.
 33
 34	ct       *http2Client
 35	done     chan struct{} // closed at the end of stream to unblock writers.
 36	doneFunc func()        // invoked at the end of stream.
 37
 38	headerChan       chan struct{} // closed to indicate the end of header metadata.
 39	headerChanClosed uint32        // set when headerChan is closed. Used to avoid closing headerChan multiple times.
 40	// headerValid indicates whether a valid header was received.  Only
 41	// meaningful after headerChan is closed (always call waitOnHeader() before
 42	// reading its value).
 43	headerValid bool
 44	header      metadata.MD // the received header metadata
 45	noHeaders   bool        // set if the client never received headers (set only after the stream is done).
 46
 47	bytesReceived atomic.Bool // indicates whether any bytes have been received on this stream
 48	unprocessed   atomic.Bool // set if the server sends a refused stream or GOAWAY including this stream
 49
 50	status *status.Status // the status error received from the server
 51}
 52
 53// Read reads an n byte message from the input stream.
 54func (s *ClientStream) Read(n int) (mem.BufferSlice, error) {
 55	b, err := s.Stream.read(n)
 56	if err == nil {
 57		s.ct.incrMsgRecv()
 58	}
 59	return b, err
 60}
 61
 62// Close closes the stream and popagates err to any readers.
 63func (s *ClientStream) Close(err error) {
 64	var (
 65		rst     bool
 66		rstCode http2.ErrCode
 67	)
 68	if err != nil {
 69		rst = true
 70		rstCode = http2.ErrCodeCancel
 71	}
 72	s.ct.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false)
 73}
 74
 75// Write writes the hdr and data bytes to the output stream.
 76func (s *ClientStream) Write(hdr []byte, data mem.BufferSlice, opts *WriteOptions) error {
 77	return s.ct.write(s, hdr, data, opts)
 78}
 79
 80// BytesReceived indicates whether any bytes have been received on this stream.
 81func (s *ClientStream) BytesReceived() bool {
 82	return s.bytesReceived.Load()
 83}
 84
 85// Unprocessed indicates whether the server did not process this stream --
 86// i.e. it sent a refused stream or GOAWAY including this stream ID.
 87func (s *ClientStream) Unprocessed() bool {
 88	return s.unprocessed.Load()
 89}
 90
 91func (s *ClientStream) waitOnHeader() {
 92	select {
 93	case <-s.ctx.Done():
 94		// Close the stream to prevent headers/trailers from changing after
 95		// this function returns.
 96		s.Close(ContextErr(s.ctx.Err()))
 97		// headerChan could possibly not be closed yet if closeStream raced
 98		// with operateHeaders; wait until it is closed explicitly here.
 99		<-s.headerChan
100	case <-s.headerChan:
101	}
102}
103
104// RecvCompress returns the compression algorithm applied to the inbound
105// message. It is empty string if there is no compression applied.
106func (s *ClientStream) RecvCompress() string {
107	s.waitOnHeader()
108	return s.recvCompress
109}
110
111// Done returns a channel which is closed when it receives the final status
112// from the server.
113func (s *ClientStream) Done() <-chan struct{} {
114	return s.done
115}
116
117// Header returns the header metadata of the stream. Acquires the key-value
118// pairs of header metadata once it is available. It blocks until i) the
119// metadata is ready or ii) there is no header metadata or iii) the stream is
120// canceled/expired.
121func (s *ClientStream) Header() (metadata.MD, error) {
122	s.waitOnHeader()
123
124	if !s.headerValid || s.noHeaders {
125		return nil, s.status.Err()
126	}
127
128	return s.header.Copy(), nil
129}
130
131// TrailersOnly blocks until a header or trailers-only frame is received and
132// then returns true if the stream was trailers-only.  If the stream ends
133// before headers are received, returns true, nil.
134func (s *ClientStream) TrailersOnly() bool {
135	s.waitOnHeader()
136	return s.noHeaders
137}
138
139// Status returns the status received from the server.
140// Status can be read safely only after the stream has ended,
141// that is, after Done() is closed.
142func (s *ClientStream) Status() *status.Status {
143	return s.status
144}