controlbuf.go

   1/*
   2 *
   3 * Copyright 2014 gRPC authors.
   4 *
   5 * Licensed under the Apache License, Version 2.0 (the "License");
   6 * you may not use this file except in compliance with the License.
   7 * You may obtain a copy of the License at
   8 *
   9 *     http://www.apache.org/licenses/LICENSE-2.0
  10 *
  11 * Unless required by applicable law or agreed to in writing, software
  12 * distributed under the License is distributed on an "AS IS" BASIS,
  13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14 * See the License for the specific language governing permissions and
  15 * limitations under the License.
  16 *
  17 */
  18
  19package transport
  20
  21import (
  22	"bytes"
  23	"errors"
  24	"fmt"
  25	"net"
  26	"runtime"
  27	"strconv"
  28	"sync"
  29	"sync/atomic"
  30
  31	"golang.org/x/net/http2"
  32	"golang.org/x/net/http2/hpack"
  33	"google.golang.org/grpc/internal/grpclog"
  34	"google.golang.org/grpc/internal/grpcutil"
  35	"google.golang.org/grpc/mem"
  36	"google.golang.org/grpc/status"
  37)
  38
  39var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
  40	e.SetMaxDynamicTableSizeLimit(v)
  41}
  42
  43type itemNode struct {
  44	it   any
  45	next *itemNode
  46}
  47
  48type itemList struct {
  49	head *itemNode
  50	tail *itemNode
  51}
  52
  53func (il *itemList) enqueue(i any) {
  54	n := &itemNode{it: i}
  55	if il.tail == nil {
  56		il.head, il.tail = n, n
  57		return
  58	}
  59	il.tail.next = n
  60	il.tail = n
  61}
  62
  63// peek returns the first item in the list without removing it from the
  64// list.
  65func (il *itemList) peek() any {
  66	return il.head.it
  67}
  68
  69func (il *itemList) dequeue() any {
  70	if il.head == nil {
  71		return nil
  72	}
  73	i := il.head.it
  74	il.head = il.head.next
  75	if il.head == nil {
  76		il.tail = nil
  77	}
  78	return i
  79}
  80
  81func (il *itemList) dequeueAll() *itemNode {
  82	h := il.head
  83	il.head, il.tail = nil, nil
  84	return h
  85}
  86
  87func (il *itemList) isEmpty() bool {
  88	return il.head == nil
  89}
  90
  91// The following defines various control items which could flow through
  92// the control buffer of transport. They represent different aspects of
  93// control tasks, e.g., flow control, settings, streaming resetting, etc.
  94
  95// maxQueuedTransportResponseFrames is the most queued "transport response"
  96// frames we will buffer before preventing new reads from occurring on the
  97// transport.  These are control frames sent in response to client requests,
  98// such as RST_STREAM due to bad headers or settings acks.
  99const maxQueuedTransportResponseFrames = 50
 100
 101type cbItem interface {
 102	isTransportResponseFrame() bool
 103}
 104
 105// registerStream is used to register an incoming stream with loopy writer.
 106type registerStream struct {
 107	streamID uint32
 108	wq       *writeQuota
 109}
 110
 111func (*registerStream) isTransportResponseFrame() bool { return false }
 112
 113// headerFrame is also used to register stream on the client-side.
 114type headerFrame struct {
 115	streamID   uint32
 116	hf         []hpack.HeaderField
 117	endStream  bool               // Valid on server side.
 118	initStream func(uint32) error // Used only on the client side.
 119	onWrite    func()
 120	wq         *writeQuota    // write quota for the stream created.
 121	cleanup    *cleanupStream // Valid on the server side.
 122	onOrphaned func(error)    // Valid on client-side
 123}
 124
 125func (h *headerFrame) isTransportResponseFrame() bool {
 126	return h.cleanup != nil && h.cleanup.rst // Results in a RST_STREAM
 127}
 128
 129type cleanupStream struct {
 130	streamID uint32
 131	rst      bool
 132	rstCode  http2.ErrCode
 133	onWrite  func()
 134}
 135
 136func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM
 137
 138type earlyAbortStream struct {
 139	httpStatus     uint32
 140	streamID       uint32
 141	contentSubtype string
 142	status         *status.Status
 143	rst            bool
 144}
 145
 146func (*earlyAbortStream) isTransportResponseFrame() bool { return false }
 147
 148type dataFrame struct {
 149	streamID  uint32
 150	endStream bool
 151	h         []byte
 152	reader    mem.Reader
 153	// onEachWrite is called every time
 154	// a part of data is written out.
 155	onEachWrite func()
 156}
 157
 158func (*dataFrame) isTransportResponseFrame() bool { return false }
 159
 160type incomingWindowUpdate struct {
 161	streamID  uint32
 162	increment uint32
 163}
 164
 165func (*incomingWindowUpdate) isTransportResponseFrame() bool { return false }
 166
 167type outgoingWindowUpdate struct {
 168	streamID  uint32
 169	increment uint32
 170}
 171
 172func (*outgoingWindowUpdate) isTransportResponseFrame() bool {
 173	return false // window updates are throttled by thresholds
 174}
 175
 176type incomingSettings struct {
 177	ss []http2.Setting
 178}
 179
 180func (*incomingSettings) isTransportResponseFrame() bool { return true } // Results in a settings ACK
 181
 182type outgoingSettings struct {
 183	ss []http2.Setting
 184}
 185
 186func (*outgoingSettings) isTransportResponseFrame() bool { return false }
 187
 188type incomingGoAway struct {
 189}
 190
 191func (*incomingGoAway) isTransportResponseFrame() bool { return false }
 192
 193type goAway struct {
 194	code      http2.ErrCode
 195	debugData []byte
 196	headsUp   bool
 197	closeConn error // if set, loopyWriter will exit with this error
 198}
 199
 200func (*goAway) isTransportResponseFrame() bool { return false }
 201
 202type ping struct {
 203	ack  bool
 204	data [8]byte
 205}
 206
 207func (*ping) isTransportResponseFrame() bool { return true }
 208
 209type outFlowControlSizeRequest struct {
 210	resp chan uint32
 211}
 212
 213func (*outFlowControlSizeRequest) isTransportResponseFrame() bool { return false }
 214
 215// closeConnection is an instruction to tell the loopy writer to flush the
 216// framer and exit, which will cause the transport's connection to be closed
 217// (by the client or server).  The transport itself will close after the reader
 218// encounters the EOF caused by the connection closure.
 219type closeConnection struct{}
 220
 221func (closeConnection) isTransportResponseFrame() bool { return false }
 222
 223type outStreamState int
 224
 225const (
 226	active outStreamState = iota
 227	empty
 228	waitingOnStreamQuota
 229)
 230
 231type outStream struct {
 232	id               uint32
 233	state            outStreamState
 234	itl              *itemList
 235	bytesOutStanding int
 236	wq               *writeQuota
 237
 238	next *outStream
 239	prev *outStream
 240}
 241
 242func (s *outStream) deleteSelf() {
 243	if s.prev != nil {
 244		s.prev.next = s.next
 245	}
 246	if s.next != nil {
 247		s.next.prev = s.prev
 248	}
 249	s.next, s.prev = nil, nil
 250}
 251
 252type outStreamList struct {
 253	// Following are sentinel objects that mark the
 254	// beginning and end of the list. They do not
 255	// contain any item lists. All valid objects are
 256	// inserted in between them.
 257	// This is needed so that an outStream object can
 258	// deleteSelf() in O(1) time without knowing which
 259	// list it belongs to.
 260	head *outStream
 261	tail *outStream
 262}
 263
 264func newOutStreamList() *outStreamList {
 265	head, tail := new(outStream), new(outStream)
 266	head.next = tail
 267	tail.prev = head
 268	return &outStreamList{
 269		head: head,
 270		tail: tail,
 271	}
 272}
 273
 274func (l *outStreamList) enqueue(s *outStream) {
 275	e := l.tail.prev
 276	e.next = s
 277	s.prev = e
 278	s.next = l.tail
 279	l.tail.prev = s
 280}
 281
 282// remove from the beginning of the list.
 283func (l *outStreamList) dequeue() *outStream {
 284	b := l.head.next
 285	if b == l.tail {
 286		return nil
 287	}
 288	b.deleteSelf()
 289	return b
 290}
 291
 292// controlBuffer is a way to pass information to loopy.
 293//
 294// Information is passed as specific struct types called control frames. A
 295// control frame not only represents data, messages or headers to be sent out
 296// but can also be used to instruct loopy to update its internal state. It
 297// shouldn't be confused with an HTTP2 frame, although some of the control
 298// frames like dataFrame and headerFrame do go out on wire as HTTP2 frames.
 299type controlBuffer struct {
 300	wakeupCh chan struct{}   // Unblocks readers waiting for something to read.
 301	done     <-chan struct{} // Closed when the transport is done.
 302
 303	// Mutex guards all the fields below, except trfChan which can be read
 304	// atomically without holding mu.
 305	mu              sync.Mutex
 306	consumerWaiting bool      // True when readers are blocked waiting for new data.
 307	closed          bool      // True when the controlbuf is finished.
 308	list            *itemList // List of queued control frames.
 309
 310	// transportResponseFrames counts the number of queued items that represent
 311	// the response of an action initiated by the peer.  trfChan is created
 312	// when transportResponseFrames >= maxQueuedTransportResponseFrames and is
 313	// closed and nilled when transportResponseFrames drops below the
 314	// threshold.  Both fields are protected by mu.
 315	transportResponseFrames int
 316	trfChan                 atomic.Pointer[chan struct{}]
 317}
 318
 319func newControlBuffer(done <-chan struct{}) *controlBuffer {
 320	return &controlBuffer{
 321		wakeupCh: make(chan struct{}, 1),
 322		list:     &itemList{},
 323		done:     done,
 324	}
 325}
 326
 327// throttle blocks if there are too many frames in the control buf that
 328// represent the response of an action initiated by the peer, like
 329// incomingSettings cleanupStreams etc.
 330func (c *controlBuffer) throttle() {
 331	if ch := c.trfChan.Load(); ch != nil {
 332		select {
 333		case <-(*ch):
 334		case <-c.done:
 335		}
 336	}
 337}
 338
 339// put adds an item to the controlbuf.
 340func (c *controlBuffer) put(it cbItem) error {
 341	_, err := c.executeAndPut(nil, it)
 342	return err
 343}
 344
 345// executeAndPut runs f, and if the return value is true, adds the given item to
 346// the controlbuf. The item could be nil, in which case, this method simply
 347// executes f and does not add the item to the controlbuf.
 348//
 349// The first return value indicates whether the item was successfully added to
 350// the control buffer. A non-nil error, specifically ErrConnClosing, is returned
 351// if the control buffer is already closed.
 352func (c *controlBuffer) executeAndPut(f func() bool, it cbItem) (bool, error) {
 353	c.mu.Lock()
 354	defer c.mu.Unlock()
 355
 356	if c.closed {
 357		return false, ErrConnClosing
 358	}
 359	if f != nil {
 360		if !f() { // f wasn't successful
 361			return false, nil
 362		}
 363	}
 364	if it == nil {
 365		return true, nil
 366	}
 367
 368	var wakeUp bool
 369	if c.consumerWaiting {
 370		wakeUp = true
 371		c.consumerWaiting = false
 372	}
 373	c.list.enqueue(it)
 374	if it.isTransportResponseFrame() {
 375		c.transportResponseFrames++
 376		if c.transportResponseFrames == maxQueuedTransportResponseFrames {
 377			// We are adding the frame that puts us over the threshold; create
 378			// a throttling channel.
 379			ch := make(chan struct{})
 380			c.trfChan.Store(&ch)
 381		}
 382	}
 383	if wakeUp {
 384		select {
 385		case c.wakeupCh <- struct{}{}:
 386		default:
 387		}
 388	}
 389	return true, nil
 390}
 391
 392// get returns the next control frame from the control buffer. If block is true
 393// **and** there are no control frames in the control buffer, the call blocks
 394// until one of the conditions is met: there is a frame to return or the
 395// transport is closed.
 396func (c *controlBuffer) get(block bool) (any, error) {
 397	for {
 398		c.mu.Lock()
 399		frame, err := c.getOnceLocked()
 400		if frame != nil || err != nil || !block {
 401			// If we read a frame or an error, we can return to the caller. The
 402			// call to getOnceLocked() returns a nil frame and a nil error if
 403			// there is nothing to read, and in that case, if the caller asked
 404			// us not to block, we can return now as well.
 405			c.mu.Unlock()
 406			return frame, err
 407		}
 408		c.consumerWaiting = true
 409		c.mu.Unlock()
 410
 411		// Release the lock above and wait to be woken up.
 412		select {
 413		case <-c.wakeupCh:
 414		case <-c.done:
 415			return nil, errors.New("transport closed by client")
 416		}
 417	}
 418}
 419
 420// Callers must not use this method, but should instead use get().
 421//
 422// Caller must hold c.mu.
 423func (c *controlBuffer) getOnceLocked() (any, error) {
 424	if c.closed {
 425		return false, ErrConnClosing
 426	}
 427	if c.list.isEmpty() {
 428		return nil, nil
 429	}
 430	h := c.list.dequeue().(cbItem)
 431	if h.isTransportResponseFrame() {
 432		if c.transportResponseFrames == maxQueuedTransportResponseFrames {
 433			// We are removing the frame that put us over the
 434			// threshold; close and clear the throttling channel.
 435			ch := c.trfChan.Swap(nil)
 436			close(*ch)
 437		}
 438		c.transportResponseFrames--
 439	}
 440	return h, nil
 441}
 442
 443// finish closes the control buffer, cleaning up any streams that have queued
 444// header frames. Once this method returns, no more frames can be added to the
 445// control buffer, and attempts to do so will return ErrConnClosing.
 446func (c *controlBuffer) finish() {
 447	c.mu.Lock()
 448	defer c.mu.Unlock()
 449
 450	if c.closed {
 451		return
 452	}
 453	c.closed = true
 454	// There may be headers for streams in the control buffer.
 455	// These streams need to be cleaned out since the transport
 456	// is still not aware of these yet.
 457	for head := c.list.dequeueAll(); head != nil; head = head.next {
 458		switch v := head.it.(type) {
 459		case *headerFrame:
 460			if v.onOrphaned != nil { // It will be nil on the server-side.
 461				v.onOrphaned(ErrConnClosing)
 462			}
 463		case *dataFrame:
 464			_ = v.reader.Close()
 465		}
 466	}
 467
 468	// In case throttle() is currently in flight, it needs to be unblocked.
 469	// Otherwise, the transport may not close, since the transport is closed by
 470	// the reader encountering the connection error.
 471	ch := c.trfChan.Swap(nil)
 472	if ch != nil {
 473		close(*ch)
 474	}
 475}
 476
 477type side int
 478
 479const (
 480	clientSide side = iota
 481	serverSide
 482)
 483
 484// Loopy receives frames from the control buffer.
 485// Each frame is handled individually; most of the work done by loopy goes
 486// into handling data frames. Loopy maintains a queue of active streams, and each
 487// stream maintains a queue of data frames; as loopy receives data frames
 488// it gets added to the queue of the relevant stream.
 489// Loopy goes over this list of active streams by processing one node every iteration,
 490// thereby closely resembling a round-robin scheduling over all streams. While
 491// processing a stream, loopy writes out data bytes from this stream capped by the min
 492// of http2MaxFrameLen, connection-level flow control and stream-level flow control.
 493type loopyWriter struct {
 494	side      side
 495	cbuf      *controlBuffer
 496	sendQuota uint32
 497	oiws      uint32 // outbound initial window size.
 498	// estdStreams is map of all established streams that are not cleaned-up yet.
 499	// On client-side, this is all streams whose headers were sent out.
 500	// On server-side, this is all streams whose headers were received.
 501	estdStreams map[uint32]*outStream // Established streams.
 502	// activeStreams is a linked-list of all streams that have data to send and some
 503	// stream-level flow control quota.
 504	// Each of these streams internally have a list of data items(and perhaps trailers
 505	// on the server-side) to be sent out.
 506	activeStreams *outStreamList
 507	framer        *framer
 508	hBuf          *bytes.Buffer  // The buffer for HPACK encoding.
 509	hEnc          *hpack.Encoder // HPACK encoder.
 510	bdpEst        *bdpEstimator
 511	draining      bool
 512	conn          net.Conn
 513	logger        *grpclog.PrefixLogger
 514	bufferPool    mem.BufferPool
 515
 516	// Side-specific handlers
 517	ssGoAwayHandler func(*goAway) (bool, error)
 518}
 519
 520func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger, goAwayHandler func(*goAway) (bool, error), bufferPool mem.BufferPool) *loopyWriter {
 521	var buf bytes.Buffer
 522	l := &loopyWriter{
 523		side:            s,
 524		cbuf:            cbuf,
 525		sendQuota:       defaultWindowSize,
 526		oiws:            defaultWindowSize,
 527		estdStreams:     make(map[uint32]*outStream),
 528		activeStreams:   newOutStreamList(),
 529		framer:          fr,
 530		hBuf:            &buf,
 531		hEnc:            hpack.NewEncoder(&buf),
 532		bdpEst:          bdpEst,
 533		conn:            conn,
 534		logger:          logger,
 535		ssGoAwayHandler: goAwayHandler,
 536		bufferPool:      bufferPool,
 537	}
 538	return l
 539}
 540
 541const minBatchSize = 1000
 542
 543// run should be run in a separate goroutine.
 544// It reads control frames from controlBuf and processes them by:
 545// 1. Updating loopy's internal state, or/and
 546// 2. Writing out HTTP2 frames on the wire.
 547//
 548// Loopy keeps all active streams with data to send in a linked-list.
 549// All streams in the activeStreams linked-list must have both:
 550// 1. Data to send, and
 551// 2. Stream level flow control quota available.
 552//
 553// In each iteration of run loop, other than processing the incoming control
 554// frame, loopy calls processData, which processes one node from the
 555// activeStreams linked-list.  This results in writing of HTTP2 frames into an
 556// underlying write buffer.  When there's no more control frames to read from
 557// controlBuf, loopy flushes the write buffer.  As an optimization, to increase
 558// the batch size for each flush, loopy yields the processor, once if the batch
 559// size is too low to give stream goroutines a chance to fill it up.
 560//
 561// Upon exiting, if the error causing the exit is not an I/O error, run()
 562// flushes the underlying connection.  The connection is always left open to
 563// allow different closing behavior on the client and server.
 564func (l *loopyWriter) run() (err error) {
 565	defer func() {
 566		if l.logger.V(logLevel) {
 567			l.logger.Infof("loopyWriter exiting with error: %v", err)
 568		}
 569		if !isIOError(err) {
 570			l.framer.writer.Flush()
 571		}
 572		l.cbuf.finish()
 573	}()
 574	for {
 575		it, err := l.cbuf.get(true)
 576		if err != nil {
 577			return err
 578		}
 579		if err = l.handle(it); err != nil {
 580			return err
 581		}
 582		if _, err = l.processData(); err != nil {
 583			return err
 584		}
 585		gosched := true
 586	hasdata:
 587		for {
 588			it, err := l.cbuf.get(false)
 589			if err != nil {
 590				return err
 591			}
 592			if it != nil {
 593				if err = l.handle(it); err != nil {
 594					return err
 595				}
 596				if _, err = l.processData(); err != nil {
 597					return err
 598				}
 599				continue hasdata
 600			}
 601			isEmpty, err := l.processData()
 602			if err != nil {
 603				return err
 604			}
 605			if !isEmpty {
 606				continue hasdata
 607			}
 608			if gosched {
 609				gosched = false
 610				if l.framer.writer.offset < minBatchSize {
 611					runtime.Gosched()
 612					continue hasdata
 613				}
 614			}
 615			l.framer.writer.Flush()
 616			break hasdata
 617		}
 618	}
 619}
 620
 621func (l *loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error {
 622	return l.framer.fr.WriteWindowUpdate(w.streamID, w.increment)
 623}
 624
 625func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) {
 626	// Otherwise update the quota.
 627	if w.streamID == 0 {
 628		l.sendQuota += w.increment
 629		return
 630	}
 631	// Find the stream and update it.
 632	if str, ok := l.estdStreams[w.streamID]; ok {
 633		str.bytesOutStanding -= int(w.increment)
 634		if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota {
 635			str.state = active
 636			l.activeStreams.enqueue(str)
 637			return
 638		}
 639	}
 640}
 641
 642func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error {
 643	return l.framer.fr.WriteSettings(s.ss...)
 644}
 645
 646func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
 647	l.applySettings(s.ss)
 648	return l.framer.fr.WriteSettingsAck()
 649}
 650
 651func (l *loopyWriter) registerStreamHandler(h *registerStream) {
 652	str := &outStream{
 653		id:    h.streamID,
 654		state: empty,
 655		itl:   &itemList{},
 656		wq:    h.wq,
 657	}
 658	l.estdStreams[h.streamID] = str
 659}
 660
 661func (l *loopyWriter) headerHandler(h *headerFrame) error {
 662	if l.side == serverSide {
 663		str, ok := l.estdStreams[h.streamID]
 664		if !ok {
 665			if l.logger.V(logLevel) {
 666				l.logger.Infof("Unrecognized streamID %d in loopyWriter", h.streamID)
 667			}
 668			return nil
 669		}
 670		// Case 1.A: Server is responding back with headers.
 671		if !h.endStream {
 672			return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite)
 673		}
 674		// else:  Case 1.B: Server wants to close stream.
 675
 676		if str.state != empty { // either active or waiting on stream quota.
 677			// add it str's list of items.
 678			str.itl.enqueue(h)
 679			return nil
 680		}
 681		if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil {
 682			return err
 683		}
 684		return l.cleanupStreamHandler(h.cleanup)
 685	}
 686	// Case 2: Client wants to originate stream.
 687	str := &outStream{
 688		id:    h.streamID,
 689		state: empty,
 690		itl:   &itemList{},
 691		wq:    h.wq,
 692	}
 693	return l.originateStream(str, h)
 694}
 695
 696func (l *loopyWriter) originateStream(str *outStream, hdr *headerFrame) error {
 697	// l.draining is set when handling GoAway. In which case, we want to avoid
 698	// creating new streams.
 699	if l.draining {
 700		// TODO: provide a better error with the reason we are in draining.
 701		hdr.onOrphaned(errStreamDrain)
 702		return nil
 703	}
 704	if err := hdr.initStream(str.id); err != nil {
 705		return err
 706	}
 707	if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
 708		return err
 709	}
 710	l.estdStreams[str.id] = str
 711	return nil
 712}
 713
 714func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.HeaderField, onWrite func()) error {
 715	if onWrite != nil {
 716		onWrite()
 717	}
 718	l.hBuf.Reset()
 719	for _, f := range hf {
 720		if err := l.hEnc.WriteField(f); err != nil {
 721			if l.logger.V(logLevel) {
 722				l.logger.Warningf("Encountered error while encoding headers: %v", err)
 723			}
 724		}
 725	}
 726	var (
 727		err               error
 728		endHeaders, first bool
 729	)
 730	first = true
 731	for !endHeaders {
 732		size := l.hBuf.Len()
 733		if size > http2MaxFrameLen {
 734			size = http2MaxFrameLen
 735		} else {
 736			endHeaders = true
 737		}
 738		if first {
 739			first = false
 740			err = l.framer.fr.WriteHeaders(http2.HeadersFrameParam{
 741				StreamID:      streamID,
 742				BlockFragment: l.hBuf.Next(size),
 743				EndStream:     endStream,
 744				EndHeaders:    endHeaders,
 745			})
 746		} else {
 747			err = l.framer.fr.WriteContinuation(
 748				streamID,
 749				endHeaders,
 750				l.hBuf.Next(size),
 751			)
 752		}
 753		if err != nil {
 754			return err
 755		}
 756	}
 757	return nil
 758}
 759
 760func (l *loopyWriter) preprocessData(df *dataFrame) {
 761	str, ok := l.estdStreams[df.streamID]
 762	if !ok {
 763		return
 764	}
 765	// If we got data for a stream it means that
 766	// stream was originated and the headers were sent out.
 767	str.itl.enqueue(df)
 768	if str.state == empty {
 769		str.state = active
 770		l.activeStreams.enqueue(str)
 771	}
 772}
 773
 774func (l *loopyWriter) pingHandler(p *ping) error {
 775	if !p.ack {
 776		l.bdpEst.timesnap(p.data)
 777	}
 778	return l.framer.fr.WritePing(p.ack, p.data)
 779
 780}
 781
 782func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) {
 783	o.resp <- l.sendQuota
 784}
 785
 786func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
 787	c.onWrite()
 788	if str, ok := l.estdStreams[c.streamID]; ok {
 789		// On the server side it could be a trailers-only response or
 790		// a RST_STREAM before stream initialization thus the stream might
 791		// not be established yet.
 792		delete(l.estdStreams, c.streamID)
 793		str.deleteSelf()
 794		for head := str.itl.dequeueAll(); head != nil; head = head.next {
 795			if df, ok := head.it.(*dataFrame); ok {
 796				_ = df.reader.Close()
 797			}
 798		}
 799	}
 800	if c.rst { // If RST_STREAM needs to be sent.
 801		if err := l.framer.fr.WriteRSTStream(c.streamID, c.rstCode); err != nil {
 802			return err
 803		}
 804	}
 805	if l.draining && len(l.estdStreams) == 0 {
 806		// Flush and close the connection; we are done with it.
 807		return errors.New("finished processing active streams while in draining mode")
 808	}
 809	return nil
 810}
 811
 812func (l *loopyWriter) earlyAbortStreamHandler(eas *earlyAbortStream) error {
 813	if l.side == clientSide {
 814		return errors.New("earlyAbortStream not handled on client")
 815	}
 816	// In case the caller forgets to set the http status, default to 200.
 817	if eas.httpStatus == 0 {
 818		eas.httpStatus = 200
 819	}
 820	headerFields := []hpack.HeaderField{
 821		{Name: ":status", Value: strconv.Itoa(int(eas.httpStatus))},
 822		{Name: "content-type", Value: grpcutil.ContentType(eas.contentSubtype)},
 823		{Name: "grpc-status", Value: strconv.Itoa(int(eas.status.Code()))},
 824		{Name: "grpc-message", Value: encodeGrpcMessage(eas.status.Message())},
 825	}
 826
 827	if err := l.writeHeader(eas.streamID, true, headerFields, nil); err != nil {
 828		return err
 829	}
 830	if eas.rst {
 831		if err := l.framer.fr.WriteRSTStream(eas.streamID, http2.ErrCodeNo); err != nil {
 832			return err
 833		}
 834	}
 835	return nil
 836}
 837
 838func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
 839	if l.side == clientSide {
 840		l.draining = true
 841		if len(l.estdStreams) == 0 {
 842			// Flush and close the connection; we are done with it.
 843			return errors.New("received GOAWAY with no active streams")
 844		}
 845	}
 846	return nil
 847}
 848
 849func (l *loopyWriter) goAwayHandler(g *goAway) error {
 850	// Handling of outgoing GoAway is very specific to side.
 851	if l.ssGoAwayHandler != nil {
 852		draining, err := l.ssGoAwayHandler(g)
 853		if err != nil {
 854			return err
 855		}
 856		l.draining = draining
 857	}
 858	return nil
 859}
 860
 861func (l *loopyWriter) handle(i any) error {
 862	switch i := i.(type) {
 863	case *incomingWindowUpdate:
 864		l.incomingWindowUpdateHandler(i)
 865	case *outgoingWindowUpdate:
 866		return l.outgoingWindowUpdateHandler(i)
 867	case *incomingSettings:
 868		return l.incomingSettingsHandler(i)
 869	case *outgoingSettings:
 870		return l.outgoingSettingsHandler(i)
 871	case *headerFrame:
 872		return l.headerHandler(i)
 873	case *registerStream:
 874		l.registerStreamHandler(i)
 875	case *cleanupStream:
 876		return l.cleanupStreamHandler(i)
 877	case *earlyAbortStream:
 878		return l.earlyAbortStreamHandler(i)
 879	case *incomingGoAway:
 880		return l.incomingGoAwayHandler(i)
 881	case *dataFrame:
 882		l.preprocessData(i)
 883	case *ping:
 884		return l.pingHandler(i)
 885	case *goAway:
 886		return l.goAwayHandler(i)
 887	case *outFlowControlSizeRequest:
 888		l.outFlowControlSizeRequestHandler(i)
 889	case closeConnection:
 890		// Just return a non-I/O error and run() will flush and close the
 891		// connection.
 892		return ErrConnClosing
 893	default:
 894		return fmt.Errorf("transport: unknown control message type %T", i)
 895	}
 896	return nil
 897}
 898
 899func (l *loopyWriter) applySettings(ss []http2.Setting) {
 900	for _, s := range ss {
 901		switch s.ID {
 902		case http2.SettingInitialWindowSize:
 903			o := l.oiws
 904			l.oiws = s.Val
 905			if o < l.oiws {
 906				// If the new limit is greater make all depleted streams active.
 907				for _, stream := range l.estdStreams {
 908					if stream.state == waitingOnStreamQuota {
 909						stream.state = active
 910						l.activeStreams.enqueue(stream)
 911					}
 912				}
 913			}
 914		case http2.SettingHeaderTableSize:
 915			updateHeaderTblSize(l.hEnc, s.Val)
 916		}
 917	}
 918}
 919
 920// processData removes the first stream from active streams, writes out at most 16KB
 921// of its data and then puts it at the end of activeStreams if there's still more data
 922// to be sent and stream has some stream-level flow control.
 923func (l *loopyWriter) processData() (bool, error) {
 924	if l.sendQuota == 0 {
 925		return true, nil
 926	}
 927	str := l.activeStreams.dequeue() // Remove the first stream.
 928	if str == nil {
 929		return true, nil
 930	}
 931	dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
 932	// A data item is represented by a dataFrame, since it later translates into
 933	// multiple HTTP2 data frames.
 934	// Every dataFrame has two buffers; h that keeps grpc-message header and data
 935	// that is the actual message. As an optimization to keep wire traffic low, data
 936	// from data is copied to h to make as big as the maximum possible HTTP2 frame
 937	// size.
 938
 939	if len(dataItem.h) == 0 && dataItem.reader.Remaining() == 0 { // Empty data frame
 940		// Client sends out empty data frame with endStream = true
 941		if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
 942			return false, err
 943		}
 944		str.itl.dequeue() // remove the empty data item from stream
 945		_ = dataItem.reader.Close()
 946		if str.itl.isEmpty() {
 947			str.state = empty
 948		} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
 949			if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
 950				return false, err
 951			}
 952			if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
 953				return false, err
 954			}
 955		} else {
 956			l.activeStreams.enqueue(str)
 957		}
 958		return false, nil
 959	}
 960
 961	// Figure out the maximum size we can send
 962	maxSize := http2MaxFrameLen
 963	if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control.
 964		str.state = waitingOnStreamQuota
 965		return false, nil
 966	} else if maxSize > strQuota {
 967		maxSize = strQuota
 968	}
 969	if maxSize > int(l.sendQuota) { // connection-level flow control.
 970		maxSize = int(l.sendQuota)
 971	}
 972	// Compute how much of the header and data we can send within quota and max frame length
 973	hSize := min(maxSize, len(dataItem.h))
 974	dSize := min(maxSize-hSize, dataItem.reader.Remaining())
 975	remainingBytes := len(dataItem.h) + dataItem.reader.Remaining() - hSize - dSize
 976	size := hSize + dSize
 977
 978	var buf *[]byte
 979
 980	if hSize != 0 && dSize == 0 {
 981		buf = &dataItem.h
 982	} else {
 983		// Note: this is only necessary because the http2.Framer does not support
 984		// partially writing a frame, so the sequence must be materialized into a buffer.
 985		// TODO: Revisit once https://github.com/golang/go/issues/66655 is addressed.
 986		pool := l.bufferPool
 987		if pool == nil {
 988			// Note that this is only supposed to be nil in tests. Otherwise, stream is
 989			// always initialized with a BufferPool.
 990			pool = mem.DefaultBufferPool()
 991		}
 992		buf = pool.Get(size)
 993		defer pool.Put(buf)
 994
 995		copy((*buf)[:hSize], dataItem.h)
 996		_, _ = dataItem.reader.Read((*buf)[hSize:])
 997	}
 998
 999	// Now that outgoing flow controls are checked we can replenish str's write quota
1000	str.wq.replenish(size)
1001	var endStream bool
1002	// If this is the last data message on this stream and all of it can be written in this iteration.
1003	if dataItem.endStream && remainingBytes == 0 {
1004		endStream = true
1005	}
1006	if dataItem.onEachWrite != nil {
1007		dataItem.onEachWrite()
1008	}
1009	if err := l.framer.fr.WriteData(dataItem.streamID, endStream, (*buf)[:size]); err != nil {
1010		return false, err
1011	}
1012	str.bytesOutStanding += size
1013	l.sendQuota -= uint32(size)
1014	dataItem.h = dataItem.h[hSize:]
1015
1016	if remainingBytes == 0 { // All the data from that message was written out.
1017		_ = dataItem.reader.Close()
1018		str.itl.dequeue()
1019	}
1020	if str.itl.isEmpty() {
1021		str.state = empty
1022	} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // The next item is trailers.
1023		if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
1024			return false, err
1025		}
1026		if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
1027			return false, err
1028		}
1029	} else if int(l.oiws)-str.bytesOutStanding <= 0 { // Ran out of stream quota.
1030		str.state = waitingOnStreamQuota
1031	} else { // Otherwise add it back to the list of active streams.
1032		l.activeStreams.enqueue(str)
1033	}
1034	return false, nil
1035}