1/*
2 *
3 * Copyright 2024 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22 "context"
23 "errors"
24 "strings"
25 "sync"
26 "sync/atomic"
27
28 "google.golang.org/grpc/mem"
29 "google.golang.org/grpc/metadata"
30 "google.golang.org/grpc/status"
31)
32
33// ServerStream implements streaming functionality for a gRPC server.
34type ServerStream struct {
35 *Stream // Embed for common stream functionality.
36
37 st internalServerTransport
38 ctxDone <-chan struct{} // closed at the end of stream. Cache of ctx.Done() (for performance)
39 cancel context.CancelFunc // invoked at the end of stream to cancel ctx.
40
41 // Holds compressor names passed in grpc-accept-encoding metadata from the
42 // client.
43 clientAdvertisedCompressors string
44 headerWireLength int
45
46 // hdrMu protects outgoing header and trailer metadata.
47 hdrMu sync.Mutex
48 header metadata.MD // the outgoing header metadata. Updated by WriteHeader.
49 headerSent atomic.Bool // atomically set when the headers are sent out.
50}
51
52// Read reads an n byte message from the input stream.
53func (s *ServerStream) Read(n int) (mem.BufferSlice, error) {
54 b, err := s.Stream.read(n)
55 if err == nil {
56 s.st.incrMsgRecv()
57 }
58 return b, err
59}
60
61// SendHeader sends the header metadata for the given stream.
62func (s *ServerStream) SendHeader(md metadata.MD) error {
63 return s.st.writeHeader(s, md)
64}
65
66// Write writes the hdr and data bytes to the output stream.
67func (s *ServerStream) Write(hdr []byte, data mem.BufferSlice, opts *WriteOptions) error {
68 return s.st.write(s, hdr, data, opts)
69}
70
71// WriteStatus sends the status of a stream to the client. WriteStatus is
72// the final call made on a stream and always occurs.
73func (s *ServerStream) WriteStatus(st *status.Status) error {
74 return s.st.writeStatus(s, st)
75}
76
77// isHeaderSent indicates whether headers have been sent.
78func (s *ServerStream) isHeaderSent() bool {
79 return s.headerSent.Load()
80}
81
82// updateHeaderSent updates headerSent and returns true
83// if it was already set.
84func (s *ServerStream) updateHeaderSent() bool {
85 return s.headerSent.Swap(true)
86}
87
88// RecvCompress returns the compression algorithm applied to the inbound
89// message. It is empty string if there is no compression applied.
90func (s *ServerStream) RecvCompress() string {
91 return s.recvCompress
92}
93
94// SendCompress returns the send compressor name.
95func (s *ServerStream) SendCompress() string {
96 return s.sendCompress
97}
98
99// ContentSubtype returns the content-subtype for a request. For example, a
100// content-subtype of "proto" will result in a content-type of
101// "application/grpc+proto". This will always be lowercase. See
102// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
103// more details.
104func (s *ServerStream) ContentSubtype() string {
105 return s.contentSubtype
106}
107
108// SetSendCompress sets the compression algorithm to the stream.
109func (s *ServerStream) SetSendCompress(name string) error {
110 if s.isHeaderSent() || s.getState() == streamDone {
111 return errors.New("transport: set send compressor called after headers sent or stream done")
112 }
113
114 s.sendCompress = name
115 return nil
116}
117
118// SetContext sets the context of the stream. This will be deleted once the
119// stats handler callouts all move to gRPC layer.
120func (s *ServerStream) SetContext(ctx context.Context) {
121 s.ctx = ctx
122}
123
124// ClientAdvertisedCompressors returns the compressor names advertised by the
125// client via grpc-accept-encoding header.
126func (s *ServerStream) ClientAdvertisedCompressors() []string {
127 values := strings.Split(s.clientAdvertisedCompressors, ",")
128 for i, v := range values {
129 values[i] = strings.TrimSpace(v)
130 }
131 return values
132}
133
134// Header returns the header metadata of the stream. It returns the out header
135// after t.WriteHeader is called. It does not block and must not be called
136// until after WriteHeader.
137func (s *ServerStream) Header() (metadata.MD, error) {
138 // Return the header in stream. It will be the out
139 // header after t.WriteHeader is called.
140 return s.header.Copy(), nil
141}
142
143// HeaderWireLength returns the size of the headers of the stream as received
144// from the wire.
145func (s *ServerStream) HeaderWireLength() int {
146 return s.headerWireLength
147}
148
149// SetHeader sets the header metadata. This can be called multiple times.
150// This should not be called in parallel to other data writes.
151func (s *ServerStream) SetHeader(md metadata.MD) error {
152 if md.Len() == 0 {
153 return nil
154 }
155 if s.isHeaderSent() || s.getState() == streamDone {
156 return ErrIllegalHeaderWrite
157 }
158 s.hdrMu.Lock()
159 s.header = metadata.Join(s.header, md)
160 s.hdrMu.Unlock()
161 return nil
162}
163
164// SetTrailer sets the trailer metadata which will be sent with the RPC status
165// by the server. This can be called multiple times.
166// This should not be called parallel to other data writes.
167func (s *ServerStream) SetTrailer(md metadata.MD) error {
168 if md.Len() == 0 {
169 return nil
170 }
171 if s.getState() == streamDone {
172 return ErrIllegalHeaderWrite
173 }
174 s.hdrMu.Lock()
175 s.trailer = metadata.Join(s.trailer, md)
176 s.hdrMu.Unlock()
177 return nil
178}