1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22 "bytes"
23 "errors"
24 "fmt"
25 "net"
26 "runtime"
27 "strconv"
28 "sync"
29 "sync/atomic"
30
31 "golang.org/x/net/http2"
32 "golang.org/x/net/http2/hpack"
33 "google.golang.org/grpc/internal/grpclog"
34 "google.golang.org/grpc/internal/grpcutil"
35 "google.golang.org/grpc/mem"
36 "google.golang.org/grpc/status"
37)
38
39var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
40 e.SetMaxDynamicTableSizeLimit(v)
41}
42
// itemNode is a single element of an itemList.
type itemNode struct {
	it   any
	next *itemNode
}

// itemList is a minimal singly linked FIFO queue of arbitrary items. The zero
// value is an empty list that is ready to use.
type itemList struct {
	head *itemNode
	tail *itemNode
}

// enqueue appends i to the end of the list.
func (il *itemList) enqueue(i any) {
	node := &itemNode{it: i}
	if il.tail != nil {
		il.tail.next = node
		il.tail = node
		return
	}
	// The list was empty: the new node is both the first and last element.
	il.head = node
	il.tail = node
}

// peek returns the first item in the list without removing it from the
// list. It dereferences the head unconditionally, so it must not be called on
// an empty list.
func (il *itemList) peek() any {
	first := il.head
	return first.it
}

// dequeue removes and returns the first item in the list, or returns nil if
// the list is empty.
func (il *itemList) dequeue() any {
	first := il.head
	if first == nil {
		return nil
	}
	il.head = first.next
	if il.head == nil {
		// The last element was removed, so there is no tail any more.
		il.tail = nil
	}
	return first.it
}

// dequeueAll empties the list and returns the node that used to be its head.
// The caller can walk the removed items by following the next pointers. It
// returns nil if the list was empty.
func (il *itemList) dequeueAll() *itemNode {
	first := il.head
	il.head = nil
	il.tail = nil
	return first
}

// isEmpty reports whether the list holds no items.
func (il *itemList) isEmpty() bool {
	return nil == il.head
}
90
// The following types define the various control items that can flow through
// the control buffer of a transport. They represent different kinds of
// control tasks, e.g., flow control, settings, stream resetting, etc.
94
// maxQueuedTransportResponseFrames is the maximum number of queued "transport
// response" frames we will buffer before preventing new reads from occurring
// on the transport. These are control frames sent in response to client
// requests, such as RST_STREAM due to bad headers or settings acks.
const maxQueuedTransportResponseFrames = 50
100
// cbItem is the interface satisfied by the items that are put on the
// controlBuffer.
type cbItem interface {
	// isTransportResponseFrame reports whether the item is counted against
	// maxQueuedTransportResponseFrames while it sits in the control buffer.
	isTransportResponseFrame() bool
}
104
105// registerStream is used to register an incoming stream with loopy writer.
106type registerStream struct {
107 streamID uint32
108 wq *writeQuota
109}
110
111func (*registerStream) isTransportResponseFrame() bool { return false }
112
113// headerFrame is also used to register stream on the client-side.
114type headerFrame struct {
115 streamID uint32
116 hf []hpack.HeaderField
117 endStream bool // Valid on server side.
118 initStream func(uint32) error // Used only on the client side.
119 onWrite func()
120 wq *writeQuota // write quota for the stream created.
121 cleanup *cleanupStream // Valid on the server side.
122 onOrphaned func(error) // Valid on client-side
123}
124
125func (h *headerFrame) isTransportResponseFrame() bool {
126 return h.cleanup != nil && h.cleanup.rst // Results in a RST_STREAM
127}
128
129type cleanupStream struct {
130 streamID uint32
131 rst bool
132 rstCode http2.ErrCode
133 onWrite func()
134}
135
136func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM
137
138type earlyAbortStream struct {
139 httpStatus uint32
140 streamID uint32
141 contentSubtype string
142 status *status.Status
143 rst bool
144}
145
146func (*earlyAbortStream) isTransportResponseFrame() bool { return false }
147
148type dataFrame struct {
149 streamID uint32
150 endStream bool
151 h []byte
152 reader mem.Reader
153 // onEachWrite is called every time
154 // a part of data is written out.
155 onEachWrite func()
156}
157
158func (*dataFrame) isTransportResponseFrame() bool { return false }
159
// incomingWindowUpdate informs loopy of additional send quota. A streamID of
// zero applies the increment to the connection-level quota; any other value
// applies it to that stream.
type incomingWindowUpdate struct {
	streamID  uint32
	increment uint32
}

// isTransportResponseFrame always reports false for incomingWindowUpdate
// items.
func (*incomingWindowUpdate) isTransportResponseFrame() bool {
	return false
}
166
// outgoingWindowUpdate instructs loopy to write a WINDOW_UPDATE frame with the
// given increment for the given stream ID.
type outgoingWindowUpdate struct {
	streamID  uint32
	increment uint32
}

// isTransportResponseFrame always reports false for outgoingWindowUpdate
// items.
func (*outgoingWindowUpdate) isTransportResponseFrame() bool {
	// window updates are throttled by thresholds
	return false
}
175
176type incomingSettings struct {
177 ss []http2.Setting
178}
179
180func (*incomingSettings) isTransportResponseFrame() bool { return true } // Results in a settings ACK
181
182type outgoingSettings struct {
183 ss []http2.Setting
184}
185
186func (*outgoingSettings) isTransportResponseFrame() bool { return false }
187
// incomingGoAway informs loopy about a GOAWAY. On the client side loopy
// switches to draining mode when it handles this item.
type incomingGoAway struct{}

// isTransportResponseFrame always reports false for incomingGoAway items.
func (*incomingGoAway) isTransportResponseFrame() bool {
	return false
}
192
193type goAway struct {
194 code http2.ErrCode
195 debugData []byte
196 headsUp bool
197 closeConn error // if set, loopyWriter will exit with this error
198}
199
200func (*goAway) isTransportResponseFrame() bool { return false }
201
// ping instructs loopy to write a PING frame carrying data, with the ACK flag
// set according to ack.
type ping struct {
	ack  bool
	data [8]byte
}

// isTransportResponseFrame always reports true for ping items.
func (*ping) isTransportResponseFrame() bool {
	return true
}
208
// outFlowControlSizeRequest asks loopy for its current connection-level send
// quota. Loopy sends the value on resp.
type outFlowControlSizeRequest struct {
	resp chan uint32
}

// isTransportResponseFrame always reports false for outFlowControlSizeRequest
// items.
func (*outFlowControlSizeRequest) isTransportResponseFrame() bool {
	return false
}
214
// closeConnection is an instruction to tell the loopy writer to flush the
// framer and exit, which will cause the transport's connection to be closed
// (by the client or server). The transport itself will close after the reader
// encounters the EOF caused by the connection closure.
type closeConnection struct{}

// isTransportResponseFrame always reports false for closeConnection items.
func (closeConnection) isTransportResponseFrame() bool {
	return false
}
222
// outStreamState describes where an outStream stands with respect to loopy's
// data scheduling.
type outStreamState int

const (
	// active marks a stream that is queued in loopy's activeStreams list.
	active outStreamState = iota
	// empty marks a stream that has no items queued to be sent.
	empty
	// waitingOnStreamQuota marks a stream that has data queued but has run
	// out of stream-level flow control quota.
	waitingOnStreamQuota
)
230
// outStream is loopy's bookkeeping for a single established stream. It also
// serves as a node of the doubly linked outStreamList.
type outStream struct {
	id    uint32
	state outStreamState
	// itl holds the items queued for this stream: data frames and, possibly,
	// a header frame carrying trailers.
	itl *itemList
	// bytesOutStanding is increased by the size of every data write on the
	// stream and decreased by incoming stream-level window updates.
	bytesOutStanding int
	// wq is replenished by loopy as the stream's data is written out.
	wq *writeQuota

	// Links used while the stream is part of an outStreamList.
	next *outStream
	prev *outStream
}
241
242func (s *outStream) deleteSelf() {
243 if s.prev != nil {
244 s.prev.next = s.next
245 }
246 if s.next != nil {
247 s.next.prev = s.prev
248 }
249 s.next, s.prev = nil, nil
250}
251
// outStreamList is a doubly linked FIFO list of outStream objects.
type outStreamList struct {
	// Following are sentinel objects that mark the
	// beginning and end of the list. They do not
	// contain any item lists. All valid objects are
	// inserted in between them.
	// This is needed so that an outStream object can
	// deleteSelf() in O(1) time without knowing which
	// list it belongs to.
	head *outStream
	tail *outStream
}
263
264func newOutStreamList() *outStreamList {
265 head, tail := new(outStream), new(outStream)
266 head.next = tail
267 tail.prev = head
268 return &outStreamList{
269 head: head,
270 tail: tail,
271 }
272}
273
274func (l *outStreamList) enqueue(s *outStream) {
275 e := l.tail.prev
276 e.next = s
277 s.prev = e
278 s.next = l.tail
279 l.tail.prev = s
280}
281
282// remove from the beginning of the list.
283func (l *outStreamList) dequeue() *outStream {
284 b := l.head.next
285 if b == l.tail {
286 return nil
287 }
288 b.deleteSelf()
289 return b
290}
291
// controlBuffer is a way to pass information to loopy.
//
// Information is passed as specific struct types called control frames. A
// control frame not only represents data, messages or headers to be sent out
// but can also be used to instruct loopy to update its internal state. It
// shouldn't be confused with an HTTP2 frame, although some of the control
// frames like dataFrame and headerFrame do go out on wire as HTTP2 frames.
type controlBuffer struct {
	wakeupCh chan struct{}   // Unblocks readers waiting for something to read.
	done     <-chan struct{} // Closed when the transport is done.

	// Mutex guards all the fields below, except trfChan which can be read
	// atomically without holding mu.
	mu              sync.Mutex
	consumerWaiting bool      // True when readers are blocked waiting for new data.
	closed          bool      // True when the controlbuf is finished.
	list            *itemList // List of queued control frames.

	// transportResponseFrames counts the number of queued items that represent
	// the response of an action initiated by the peer. trfChan is created
	// when transportResponseFrames reaches maxQueuedTransportResponseFrames
	// and is closed and nilled when transportResponseFrames drops below the
	// threshold. Both fields are only modified with mu held; trfChan may
	// additionally be loaded without the lock (see throttle).
	transportResponseFrames int
	trfChan                 atomic.Pointer[chan struct{}]
}
318
319func newControlBuffer(done <-chan struct{}) *controlBuffer {
320 return &controlBuffer{
321 wakeupCh: make(chan struct{}, 1),
322 list: &itemList{},
323 done: done,
324 }
325}
326
327// throttle blocks if there are too many frames in the control buf that
328// represent the response of an action initiated by the peer, like
329// incomingSettings cleanupStreams etc.
330func (c *controlBuffer) throttle() {
331 if ch := c.trfChan.Load(); ch != nil {
332 select {
333 case <-(*ch):
334 case <-c.done:
335 }
336 }
337}
338
339// put adds an item to the controlbuf.
340func (c *controlBuffer) put(it cbItem) error {
341 _, err := c.executeAndPut(nil, it)
342 return err
343}
344
345// executeAndPut runs f, and if the return value is true, adds the given item to
346// the controlbuf. The item could be nil, in which case, this method simply
347// executes f and does not add the item to the controlbuf.
348//
349// The first return value indicates whether the item was successfully added to
350// the control buffer. A non-nil error, specifically ErrConnClosing, is returned
351// if the control buffer is already closed.
352func (c *controlBuffer) executeAndPut(f func() bool, it cbItem) (bool, error) {
353 c.mu.Lock()
354 defer c.mu.Unlock()
355
356 if c.closed {
357 return false, ErrConnClosing
358 }
359 if f != nil {
360 if !f() { // f wasn't successful
361 return false, nil
362 }
363 }
364 if it == nil {
365 return true, nil
366 }
367
368 var wakeUp bool
369 if c.consumerWaiting {
370 wakeUp = true
371 c.consumerWaiting = false
372 }
373 c.list.enqueue(it)
374 if it.isTransportResponseFrame() {
375 c.transportResponseFrames++
376 if c.transportResponseFrames == maxQueuedTransportResponseFrames {
377 // We are adding the frame that puts us over the threshold; create
378 // a throttling channel.
379 ch := make(chan struct{})
380 c.trfChan.Store(&ch)
381 }
382 }
383 if wakeUp {
384 select {
385 case c.wakeupCh <- struct{}{}:
386 default:
387 }
388 }
389 return true, nil
390}
391
392// get returns the next control frame from the control buffer. If block is true
393// **and** there are no control frames in the control buffer, the call blocks
394// until one of the conditions is met: there is a frame to return or the
395// transport is closed.
396func (c *controlBuffer) get(block bool) (any, error) {
397 for {
398 c.mu.Lock()
399 frame, err := c.getOnceLocked()
400 if frame != nil || err != nil || !block {
401 // If we read a frame or an error, we can return to the caller. The
402 // call to getOnceLocked() returns a nil frame and a nil error if
403 // there is nothing to read, and in that case, if the caller asked
404 // us not to block, we can return now as well.
405 c.mu.Unlock()
406 return frame, err
407 }
408 c.consumerWaiting = true
409 c.mu.Unlock()
410
411 // Release the lock above and wait to be woken up.
412 select {
413 case <-c.wakeupCh:
414 case <-c.done:
415 return nil, errors.New("transport closed by client")
416 }
417 }
418}
419
420// Callers must not use this method, but should instead use get().
421//
422// Caller must hold c.mu.
423func (c *controlBuffer) getOnceLocked() (any, error) {
424 if c.closed {
425 return false, ErrConnClosing
426 }
427 if c.list.isEmpty() {
428 return nil, nil
429 }
430 h := c.list.dequeue().(cbItem)
431 if h.isTransportResponseFrame() {
432 if c.transportResponseFrames == maxQueuedTransportResponseFrames {
433 // We are removing the frame that put us over the
434 // threshold; close and clear the throttling channel.
435 ch := c.trfChan.Swap(nil)
436 close(*ch)
437 }
438 c.transportResponseFrames--
439 }
440 return h, nil
441}
442
443// finish closes the control buffer, cleaning up any streams that have queued
444// header frames. Once this method returns, no more frames can be added to the
445// control buffer, and attempts to do so will return ErrConnClosing.
446func (c *controlBuffer) finish() {
447 c.mu.Lock()
448 defer c.mu.Unlock()
449
450 if c.closed {
451 return
452 }
453 c.closed = true
454 // There may be headers for streams in the control buffer.
455 // These streams need to be cleaned out since the transport
456 // is still not aware of these yet.
457 for head := c.list.dequeueAll(); head != nil; head = head.next {
458 switch v := head.it.(type) {
459 case *headerFrame:
460 if v.onOrphaned != nil { // It will be nil on the server-side.
461 v.onOrphaned(ErrConnClosing)
462 }
463 case *dataFrame:
464 _ = v.reader.Close()
465 }
466 }
467
468 // In case throttle() is currently in flight, it needs to be unblocked.
469 // Otherwise, the transport may not close, since the transport is closed by
470 // the reader encountering the connection error.
471 ch := c.trfChan.Swap(nil)
472 if ch != nil {
473 close(*ch)
474 }
475}
476
// side identifies whether a loopyWriter is running for a client or for a
// server.
type side int

const (
	// clientSide is the zero value of side.
	clientSide side = iota
	serverSide
)
483
// Loopy receives frames from the control buffer.
// Each frame is handled individually; most of the work done by loopy goes
// into handling data frames. Loopy maintains a queue of active streams, and each
// stream maintains a queue of data frames; as loopy receives data frames
// it gets added to the queue of the relevant stream.
// Loopy goes over this list of active streams by processing one node every iteration,
// thereby closely resembling a round-robin scheduling over all streams. While
// processing a stream, loopy writes out data bytes from this stream capped by the min
// of http2MaxFrameLen, connection-level flow control and stream-level flow control.
type loopyWriter struct {
	side      side
	cbuf      *controlBuffer
	sendQuota uint32 // Connection-level send quota that is still available.
	oiws      uint32 // outbound initial window size.
	// estdStreams is map of all established streams that are not cleaned-up yet.
	// On client-side, this is all streams whose headers were sent out.
	// On server-side, this is all streams whose headers were received.
	estdStreams map[uint32]*outStream // Established streams.
	// activeStreams is a linked-list of all streams that have data to send and some
	// stream-level flow control quota.
	// Each of these streams internally have a list of data items(and perhaps trailers
	// on the server-side) to be sent out.
	activeStreams *outStreamList
	framer        *framer
	hBuf          *bytes.Buffer  // The buffer for HPACK encoding.
	hEnc          *hpack.Encoder // HPACK encoder.
	bdpEst        *bdpEstimator
	draining      bool // Set while handling GOAWAY; no new streams are originated once set.
	conn          net.Conn
	logger        *grpclog.PrefixLogger
	bufferPool    mem.BufferPool // Source of scratch buffers for data writes; may be nil.

	// Side-specific handlers
	ssGoAwayHandler func(*goAway) (bool, error)
}
519
520func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger, goAwayHandler func(*goAway) (bool, error), bufferPool mem.BufferPool) *loopyWriter {
521 var buf bytes.Buffer
522 l := &loopyWriter{
523 side: s,
524 cbuf: cbuf,
525 sendQuota: defaultWindowSize,
526 oiws: defaultWindowSize,
527 estdStreams: make(map[uint32]*outStream),
528 activeStreams: newOutStreamList(),
529 framer: fr,
530 hBuf: &buf,
531 hEnc: hpack.NewEncoder(&buf),
532 bdpEst: bdpEst,
533 conn: conn,
534 logger: logger,
535 ssGoAwayHandler: goAwayHandler,
536 bufferPool: bufferPool,
537 }
538 return l
539}
540
// minBatchSize is the threshold used by run before flushing: if the framer
// writer's offset is below it, run yields the processor once to give the
// batch a chance to grow.
const minBatchSize = 1000
542
// run should be run in a separate goroutine.
// It reads control frames from controlBuf and processes them by:
// 1. Updating loopy's internal state, or/and
// 2. Writing out HTTP2 frames on the wire.
//
// Loopy keeps all active streams with data to send in a linked-list.
// All streams in the activeStreams linked-list must have both:
// 1. Data to send, and
// 2. Stream level flow control quota available.
//
// In each iteration of run loop, other than processing the incoming control
// frame, loopy calls processData, which processes one node from the
// activeStreams linked-list. This results in writing of HTTP2 frames into an
// underlying write buffer. When there's no more control frames to read from
// controlBuf, loopy flushes the write buffer. As an optimization, to increase
// the batch size for each flush, loopy yields the processor, once if the batch
// size is too low to give stream goroutines a chance to fill it up.
//
// Upon exiting, if the error causing the exit is not an I/O error, run()
// flushes the underlying connection. The connection is always left open to
// allow different closing behavior on the client and server.
//
// run only returns with a non-nil error; on exit it also finishes the control
// buffer so that no further items can be queued.
func (l *loopyWriter) run() (err error) {
	defer func() {
		if l.logger.V(logLevel) {
			l.logger.Infof("loopyWriter exiting with error: %v", err)
		}
		// Flush what has been buffered unless the exit was caused by an I/O
		// error.
		if !isIOError(err) {
			l.framer.writer.Flush()
		}
		l.cbuf.finish()
	}()
	for {
		// Block until there is a control frame to handle.
		it, err := l.cbuf.get(true)
		if err != nil {
			return err
		}
		if err = l.handle(it); err != nil {
			return err
		}
		if _, err = l.processData(); err != nil {
			return err
		}
		// gosched is true until the processor has been yielded once for the
		// current batch.
		gosched := true
	hasdata:
		for {
			// Drain the control buffer without blocking.
			it, err := l.cbuf.get(false)
			if err != nil {
				return err
			}
			if it != nil {
				if err = l.handle(it); err != nil {
					return err
				}
				if _, err = l.processData(); err != nil {
					return err
				}
				continue hasdata
			}
			// No control frame is queued; keep writing stream data for as
			// long as processData reports that there is something to process.
			isEmpty, err := l.processData()
			if err != nil {
				return err
			}
			if !isEmpty {
				continue hasdata
			}
			if gosched {
				gosched = false
				if l.framer.writer.offset < minBatchSize {
					// The batch is small: yield once and look for more work
					// before flushing.
					runtime.Gosched()
					continue hasdata
				}
			}
			// Nothing left to do: flush and go back to the blocking get.
			l.framer.writer.Flush()
			break hasdata
		}
	}
}
620
621func (l *loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error {
622 return l.framer.fr.WriteWindowUpdate(w.streamID, w.increment)
623}
624
625func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) {
626 // Otherwise update the quota.
627 if w.streamID == 0 {
628 l.sendQuota += w.increment
629 return
630 }
631 // Find the stream and update it.
632 if str, ok := l.estdStreams[w.streamID]; ok {
633 str.bytesOutStanding -= int(w.increment)
634 if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota {
635 str.state = active
636 l.activeStreams.enqueue(str)
637 return
638 }
639 }
640}
641
642func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error {
643 return l.framer.fr.WriteSettings(s.ss...)
644}
645
646func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
647 l.applySettings(s.ss)
648 return l.framer.fr.WriteSettingsAck()
649}
650
651func (l *loopyWriter) registerStreamHandler(h *registerStream) {
652 str := &outStream{
653 id: h.streamID,
654 state: empty,
655 itl: &itemList{},
656 wq: h.wq,
657 }
658 l.estdStreams[h.streamID] = str
659}
660
661func (l *loopyWriter) headerHandler(h *headerFrame) error {
662 if l.side == serverSide {
663 str, ok := l.estdStreams[h.streamID]
664 if !ok {
665 if l.logger.V(logLevel) {
666 l.logger.Infof("Unrecognized streamID %d in loopyWriter", h.streamID)
667 }
668 return nil
669 }
670 // Case 1.A: Server is responding back with headers.
671 if !h.endStream {
672 return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite)
673 }
674 // else: Case 1.B: Server wants to close stream.
675
676 if str.state != empty { // either active or waiting on stream quota.
677 // add it str's list of items.
678 str.itl.enqueue(h)
679 return nil
680 }
681 if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil {
682 return err
683 }
684 return l.cleanupStreamHandler(h.cleanup)
685 }
686 // Case 2: Client wants to originate stream.
687 str := &outStream{
688 id: h.streamID,
689 state: empty,
690 itl: &itemList{},
691 wq: h.wq,
692 }
693 return l.originateStream(str, h)
694}
695
696func (l *loopyWriter) originateStream(str *outStream, hdr *headerFrame) error {
697 // l.draining is set when handling GoAway. In which case, we want to avoid
698 // creating new streams.
699 if l.draining {
700 // TODO: provide a better error with the reason we are in draining.
701 hdr.onOrphaned(errStreamDrain)
702 return nil
703 }
704 if err := hdr.initStream(str.id); err != nil {
705 return err
706 }
707 if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
708 return err
709 }
710 l.estdStreams[str.id] = str
711 return nil
712}
713
714func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.HeaderField, onWrite func()) error {
715 if onWrite != nil {
716 onWrite()
717 }
718 l.hBuf.Reset()
719 for _, f := range hf {
720 if err := l.hEnc.WriteField(f); err != nil {
721 if l.logger.V(logLevel) {
722 l.logger.Warningf("Encountered error while encoding headers: %v", err)
723 }
724 }
725 }
726 var (
727 err error
728 endHeaders, first bool
729 )
730 first = true
731 for !endHeaders {
732 size := l.hBuf.Len()
733 if size > http2MaxFrameLen {
734 size = http2MaxFrameLen
735 } else {
736 endHeaders = true
737 }
738 if first {
739 first = false
740 err = l.framer.fr.WriteHeaders(http2.HeadersFrameParam{
741 StreamID: streamID,
742 BlockFragment: l.hBuf.Next(size),
743 EndStream: endStream,
744 EndHeaders: endHeaders,
745 })
746 } else {
747 err = l.framer.fr.WriteContinuation(
748 streamID,
749 endHeaders,
750 l.hBuf.Next(size),
751 )
752 }
753 if err != nil {
754 return err
755 }
756 }
757 return nil
758}
759
760func (l *loopyWriter) preprocessData(df *dataFrame) {
761 str, ok := l.estdStreams[df.streamID]
762 if !ok {
763 return
764 }
765 // If we got data for a stream it means that
766 // stream was originated and the headers were sent out.
767 str.itl.enqueue(df)
768 if str.state == empty {
769 str.state = active
770 l.activeStreams.enqueue(str)
771 }
772}
773
774func (l *loopyWriter) pingHandler(p *ping) error {
775 if !p.ack {
776 l.bdpEst.timesnap(p.data)
777 }
778 return l.framer.fr.WritePing(p.ack, p.data)
779
780}
781
782func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) {
783 o.resp <- l.sendQuota
784}
785
786func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
787 c.onWrite()
788 if str, ok := l.estdStreams[c.streamID]; ok {
789 // On the server side it could be a trailers-only response or
790 // a RST_STREAM before stream initialization thus the stream might
791 // not be established yet.
792 delete(l.estdStreams, c.streamID)
793 str.deleteSelf()
794 for head := str.itl.dequeueAll(); head != nil; head = head.next {
795 if df, ok := head.it.(*dataFrame); ok {
796 _ = df.reader.Close()
797 }
798 }
799 }
800 if c.rst { // If RST_STREAM needs to be sent.
801 if err := l.framer.fr.WriteRSTStream(c.streamID, c.rstCode); err != nil {
802 return err
803 }
804 }
805 if l.draining && len(l.estdStreams) == 0 {
806 // Flush and close the connection; we are done with it.
807 return errors.New("finished processing active streams while in draining mode")
808 }
809 return nil
810}
811
812func (l *loopyWriter) earlyAbortStreamHandler(eas *earlyAbortStream) error {
813 if l.side == clientSide {
814 return errors.New("earlyAbortStream not handled on client")
815 }
816 // In case the caller forgets to set the http status, default to 200.
817 if eas.httpStatus == 0 {
818 eas.httpStatus = 200
819 }
820 headerFields := []hpack.HeaderField{
821 {Name: ":status", Value: strconv.Itoa(int(eas.httpStatus))},
822 {Name: "content-type", Value: grpcutil.ContentType(eas.contentSubtype)},
823 {Name: "grpc-status", Value: strconv.Itoa(int(eas.status.Code()))},
824 {Name: "grpc-message", Value: encodeGrpcMessage(eas.status.Message())},
825 }
826
827 if err := l.writeHeader(eas.streamID, true, headerFields, nil); err != nil {
828 return err
829 }
830 if eas.rst {
831 if err := l.framer.fr.WriteRSTStream(eas.streamID, http2.ErrCodeNo); err != nil {
832 return err
833 }
834 }
835 return nil
836}
837
838func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
839 if l.side == clientSide {
840 l.draining = true
841 if len(l.estdStreams) == 0 {
842 // Flush and close the connection; we are done with it.
843 return errors.New("received GOAWAY with no active streams")
844 }
845 }
846 return nil
847}
848
849func (l *loopyWriter) goAwayHandler(g *goAway) error {
850 // Handling of outgoing GoAway is very specific to side.
851 if l.ssGoAwayHandler != nil {
852 draining, err := l.ssGoAwayHandler(g)
853 if err != nil {
854 return err
855 }
856 l.draining = draining
857 }
858 return nil
859}
860
861func (l *loopyWriter) handle(i any) error {
862 switch i := i.(type) {
863 case *incomingWindowUpdate:
864 l.incomingWindowUpdateHandler(i)
865 case *outgoingWindowUpdate:
866 return l.outgoingWindowUpdateHandler(i)
867 case *incomingSettings:
868 return l.incomingSettingsHandler(i)
869 case *outgoingSettings:
870 return l.outgoingSettingsHandler(i)
871 case *headerFrame:
872 return l.headerHandler(i)
873 case *registerStream:
874 l.registerStreamHandler(i)
875 case *cleanupStream:
876 return l.cleanupStreamHandler(i)
877 case *earlyAbortStream:
878 return l.earlyAbortStreamHandler(i)
879 case *incomingGoAway:
880 return l.incomingGoAwayHandler(i)
881 case *dataFrame:
882 l.preprocessData(i)
883 case *ping:
884 return l.pingHandler(i)
885 case *goAway:
886 return l.goAwayHandler(i)
887 case *outFlowControlSizeRequest:
888 l.outFlowControlSizeRequestHandler(i)
889 case closeConnection:
890 // Just return a non-I/O error and run() will flush and close the
891 // connection.
892 return ErrConnClosing
893 default:
894 return fmt.Errorf("transport: unknown control message type %T", i)
895 }
896 return nil
897}
898
899func (l *loopyWriter) applySettings(ss []http2.Setting) {
900 for _, s := range ss {
901 switch s.ID {
902 case http2.SettingInitialWindowSize:
903 o := l.oiws
904 l.oiws = s.Val
905 if o < l.oiws {
906 // If the new limit is greater make all depleted streams active.
907 for _, stream := range l.estdStreams {
908 if stream.state == waitingOnStreamQuota {
909 stream.state = active
910 l.activeStreams.enqueue(stream)
911 }
912 }
913 }
914 case http2.SettingHeaderTableSize:
915 updateHeaderTblSize(l.hEnc, s.Val)
916 }
917 }
918}
919
// processData removes the first stream from active streams, writes out at most
// http2MaxFrameLen bytes of its data and then puts it at the end of
// activeStreams if there's still more data to be sent and stream has some
// stream-level flow control.
//
// The boolean result is true when there was nothing to process, i.e. the
// connection-level send quota is exhausted or no stream is active, and false
// otherwise. A non-nil error is returned when writing a frame or cleaning up
// the stream fails.
func (l *loopyWriter) processData() (bool, error) {
	if l.sendQuota == 0 {
		return true, nil
	}
	str := l.activeStreams.dequeue() // Remove the first stream.
	if str == nil {
		return true, nil
	}
	dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
	// A data item is represented by a dataFrame, since it later translates into
	// multiple HTTP2 data frames.
	// Every dataFrame has two parts: h, which keeps the grpc-message header, and
	// reader, which provides the actual message. As an optimization to keep
	// wire traffic low, bytes from both are combined into a single HTTP2 frame
	// whenever the frame has room for them.

	if len(dataItem.h) == 0 && dataItem.reader.Remaining() == 0 { // Empty data frame
		// Client sends out empty data frame with endStream = true
		if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
			return false, err
		}
		str.itl.dequeue() // remove the empty data item from stream
		_ = dataItem.reader.Close()
		if str.itl.isEmpty() {
			str.state = empty
		} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
			if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
				return false, err
			}
			if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
				return false, err
			}
		} else {
			l.activeStreams.enqueue(str)
		}
		return false, nil
	}

	// Figure out the maximum size we can send
	maxSize := http2MaxFrameLen
	if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control.
		// The stream stays off the active list until its quota is restored.
		str.state = waitingOnStreamQuota
		return false, nil
	} else if maxSize > strQuota {
		maxSize = strQuota
	}
	if maxSize > int(l.sendQuota) { // connection-level flow control.
		maxSize = int(l.sendQuota)
	}
	// Compute how much of the header and data we can send within quota and max frame length
	hSize := min(maxSize, len(dataItem.h))
	dSize := min(maxSize-hSize, dataItem.reader.Remaining())
	// remainingBytes is what is left of this data item after this write.
	remainingBytes := len(dataItem.h) + dataItem.reader.Remaining() - hSize - dSize
	size := hSize + dSize

	var buf *[]byte

	if hSize != 0 && dSize == 0 {
		// Only header bytes go out in this frame; write them straight from h.
		buf = &dataItem.h
	} else {
		// Note: this is only necessary because the http2.Framer does not support
		// partially writing a frame, so the sequence must be materialized into a buffer.
		// TODO: Revisit once https://github.com/golang/go/issues/66655 is addressed.
		pool := l.bufferPool
		if pool == nil {
			// Note that this is only supposed to be nil in tests. Otherwise, stream is
			// always initialized with a BufferPool.
			pool = mem.DefaultBufferPool()
		}
		buf = pool.Get(size)
		// The scratch buffer goes back to the pool when processData returns.
		defer pool.Put(buf)

		copy((*buf)[:hSize], dataItem.h)
		_, _ = dataItem.reader.Read((*buf)[hSize:])
	}

	// Now that outgoing flow controls are checked we can replenish str's write quota
	str.wq.replenish(size)
	var endStream bool
	// If this is the last data message on this stream and all of it can be written in this iteration.
	if dataItem.endStream && remainingBytes == 0 {
		endStream = true
	}
	if dataItem.onEachWrite != nil {
		dataItem.onEachWrite()
	}
	if err := l.framer.fr.WriteData(dataItem.streamID, endStream, (*buf)[:size]); err != nil {
		return false, err
	}
	// Account for the bytes written against both flow control windows and
	// drop the header bytes that went out.
	str.bytesOutStanding += size
	l.sendQuota -= uint32(size)
	dataItem.h = dataItem.h[hSize:]

	if remainingBytes == 0 { // All the data from that message was written out.
		_ = dataItem.reader.Close()
		str.itl.dequeue()
	}
	if str.itl.isEmpty() {
		str.state = empty
	} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // The next item is trailers.
		if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
			return false, err
		}
		if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
			return false, err
		}
	} else if int(l.oiws)-str.bytesOutStanding <= 0 { // Ran out of stream quota.
		str.state = waitingOnStreamQuota
	} else { // Otherwise add it back to the list of active streams.
		l.activeStreams.enqueue(str)
	}
	return false, nil
}