http2_server.go

   1/*
   2 *
   3 * Copyright 2014 gRPC authors.
   4 *
   5 * Licensed under the Apache License, Version 2.0 (the "License");
   6 * you may not use this file except in compliance with the License.
   7 * You may obtain a copy of the License at
   8 *
   9 *     http://www.apache.org/licenses/LICENSE-2.0
  10 *
  11 * Unless required by applicable law or agreed to in writing, software
  12 * distributed under the License is distributed on an "AS IS" BASIS,
  13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14 * See the License for the specific language governing permissions and
  15 * limitations under the License.
  16 *
  17 */
  18
  19package transport
  20
  21import (
  22	"bytes"
  23	"context"
  24	"errors"
  25	"fmt"
  26	"io"
  27	"math"
  28	rand "math/rand/v2"
  29	"net"
  30	"net/http"
  31	"strconv"
  32	"sync"
  33	"sync/atomic"
  34	"time"
  35
  36	"golang.org/x/net/http2"
  37	"golang.org/x/net/http2/hpack"
  38	"google.golang.org/grpc/internal/grpclog"
  39	"google.golang.org/grpc/internal/grpcutil"
  40	"google.golang.org/grpc/internal/pretty"
  41	"google.golang.org/grpc/internal/syscall"
  42	"google.golang.org/grpc/mem"
  43	"google.golang.org/protobuf/proto"
  44
  45	"google.golang.org/grpc/codes"
  46	"google.golang.org/grpc/credentials"
  47	"google.golang.org/grpc/internal/channelz"
  48	"google.golang.org/grpc/internal/grpcsync"
  49	"google.golang.org/grpc/keepalive"
  50	"google.golang.org/grpc/metadata"
  51	"google.golang.org/grpc/peer"
  52	"google.golang.org/grpc/stats"
  53	"google.golang.org/grpc/status"
  54	"google.golang.org/grpc/tap"
  55)
  56
  57var (
  58	// ErrIllegalHeaderWrite indicates that setting header is illegal because of
  59	// the stream's state.
  60	ErrIllegalHeaderWrite = status.Error(codes.Internal, "transport: SendHeader called multiple times")
  61	// ErrHeaderListSizeLimitViolation indicates that the header list size is larger
  62	// than the limit set by peer.
  63	ErrHeaderListSizeLimitViolation = status.Error(codes.Internal, "transport: trying to send header list size larger than the limit set by peer")
  64)
  65
  66// serverConnectionCounter counts the number of connections a server has seen
  67// (equal to the number of http2Servers created). Must be accessed atomically.
  68var serverConnectionCounter uint64
  69
  70// http2Server implements the ServerTransport interface with HTTP2.
  71type http2Server struct {
  72	lastRead        int64 // Keep this field 64-bit aligned. Accessed atomically.
  73	done            chan struct{}
  74	conn            net.Conn
  75	loopy           *loopyWriter
  76	readerDone      chan struct{} // sync point to enable testing.
  77	loopyWriterDone chan struct{}
  78	peer            peer.Peer
  79	inTapHandle     tap.ServerInHandle
  80	framer          *framer
  81	// The max number of concurrent streams.
  82	maxStreams uint32
  83	// controlBuf delivers all the control related tasks (e.g., window
  84	// updates, reset streams, and various settings) to the controller.
  85	controlBuf *controlBuffer
  86	fc         *trInFlow
  87	stats      []stats.Handler
  88	// Keepalive and max-age parameters for the server.
  89	kp keepalive.ServerParameters
  90	// Keepalive enforcement policy.
  91	kep keepalive.EnforcementPolicy
  92	// The time instance last ping was received.
  93	lastPingAt time.Time
  94	// Number of times the client has violated keepalive ping policy so far.
  95	pingStrikes uint8
  96	// Flag to signify that number of ping strikes should be reset to 0.
  97	// This is set whenever data or header frames are sent.
  98	// 1 means yes.
  99	resetPingStrikes      uint32 // Accessed atomically.
 100	initialWindowSize     int32
 101	bdpEst                *bdpEstimator
 102	maxSendHeaderListSize *uint32
 103
 104	mu sync.Mutex // guard the following
 105
 106	// drainEvent is initialized when Drain() is called the first time. After
 107	// which the server writes out the first GoAway(with ID 2^31-1) frame. Then
 108	// an independent goroutine will be launched to later send the second
 109	// GoAway. During this time we don't want to write another first GoAway(with
 110	// ID 2^31 -1) frame. Thus call to Drain() will be a no-op if drainEvent is
 111	// already initialized since draining is already underway.
 112	drainEvent    *grpcsync.Event
 113	state         transportState
 114	activeStreams map[uint32]*ServerStream
 115	// idle is the time instant when the connection went idle.
 116	// This is either the beginning of the connection or when the number of
 117	// RPCs go down to 0.
 118	// When the connection is busy, this value is set to 0.
 119	idle time.Time
 120
 121	// Fields below are for channelz metric collection.
 122	channelz   *channelz.Socket
 123	bufferPool mem.BufferPool
 124
 125	connectionID uint64
 126
 127	// maxStreamMu guards the maximum stream ID
 128	// This lock may not be taken if mu is already held.
 129	maxStreamMu sync.Mutex
 130	maxStreamID uint32 // max stream ID ever seen
 131
 132	logger *grpclog.PrefixLogger
 133}
 134
 135// NewServerTransport creates a http2 transport with conn and configuration
 136// options from config.
 137//
 138// It returns a non-nil transport and a nil error on success. On failure, it
 139// returns a nil transport and a non-nil error. For a special case where the
 140// underlying conn gets closed before the client preface could be read, it
 141// returns a nil transport and a nil error.
 142func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
 143	var authInfo credentials.AuthInfo
 144	rawConn := conn
 145	if config.Credentials != nil {
 146		var err error
 147		conn, authInfo, err = config.Credentials.ServerHandshake(rawConn)
 148		if err != nil {
 149			// ErrConnDispatched means that the connection was dispatched away
 150			// from gRPC; those connections should be left open. io.EOF means
 151			// the connection was closed before handshaking completed, which can
 152			// happen naturally from probers. Return these errors directly.
 153			if err == credentials.ErrConnDispatched || err == io.EOF {
 154				return nil, err
 155			}
 156			return nil, connectionErrorf(false, err, "ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
 157		}
 158	}
 159	writeBufSize := config.WriteBufferSize
 160	readBufSize := config.ReadBufferSize
 161	maxHeaderListSize := defaultServerMaxHeaderListSize
 162	if config.MaxHeaderListSize != nil {
 163		maxHeaderListSize = *config.MaxHeaderListSize
 164	}
 165	framer := newFramer(conn, writeBufSize, readBufSize, config.SharedWriteBuffer, maxHeaderListSize)
 166	// Send initial settings as connection preface to client.
 167	isettings := []http2.Setting{{
 168		ID:  http2.SettingMaxFrameSize,
 169		Val: http2MaxFrameLen,
 170	}}
 171	if config.MaxStreams != math.MaxUint32 {
 172		isettings = append(isettings, http2.Setting{
 173			ID:  http2.SettingMaxConcurrentStreams,
 174			Val: config.MaxStreams,
 175		})
 176	}
 177	dynamicWindow := true
 178	iwz := int32(initialWindowSize)
 179	if config.InitialWindowSize >= defaultWindowSize {
 180		iwz = config.InitialWindowSize
 181		dynamicWindow = false
 182	}
 183	icwz := int32(initialWindowSize)
 184	if config.InitialConnWindowSize >= defaultWindowSize {
 185		icwz = config.InitialConnWindowSize
 186		dynamicWindow = false
 187	}
 188	if iwz != defaultWindowSize {
 189		isettings = append(isettings, http2.Setting{
 190			ID:  http2.SettingInitialWindowSize,
 191			Val: uint32(iwz)})
 192	}
 193	if config.MaxHeaderListSize != nil {
 194		isettings = append(isettings, http2.Setting{
 195			ID:  http2.SettingMaxHeaderListSize,
 196			Val: *config.MaxHeaderListSize,
 197		})
 198	}
 199	if config.HeaderTableSize != nil {
 200		isettings = append(isettings, http2.Setting{
 201			ID:  http2.SettingHeaderTableSize,
 202			Val: *config.HeaderTableSize,
 203		})
 204	}
 205	if err := framer.fr.WriteSettings(isettings...); err != nil {
 206		return nil, connectionErrorf(false, err, "transport: %v", err)
 207	}
 208	// Adjust the connection flow control window if needed.
 209	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
 210		if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
 211			return nil, connectionErrorf(false, err, "transport: %v", err)
 212		}
 213	}
 214	kp := config.KeepaliveParams
 215	if kp.MaxConnectionIdle == 0 {
 216		kp.MaxConnectionIdle = defaultMaxConnectionIdle
 217	}
 218	if kp.MaxConnectionAge == 0 {
 219		kp.MaxConnectionAge = defaultMaxConnectionAge
 220	}
 221	// Add a jitter to MaxConnectionAge.
 222	kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
 223	if kp.MaxConnectionAgeGrace == 0 {
 224		kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
 225	}
 226	if kp.Time == 0 {
 227		kp.Time = defaultServerKeepaliveTime
 228	}
 229	if kp.Timeout == 0 {
 230		kp.Timeout = defaultServerKeepaliveTimeout
 231	}
 232	if kp.Time != infinity {
 233		if err = syscall.SetTCPUserTimeout(rawConn, kp.Timeout); err != nil {
 234			return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
 235		}
 236	}
 237	kep := config.KeepalivePolicy
 238	if kep.MinTime == 0 {
 239		kep.MinTime = defaultKeepalivePolicyMinTime
 240	}
 241
 242	done := make(chan struct{})
 243	peer := peer.Peer{
 244		Addr:      conn.RemoteAddr(),
 245		LocalAddr: conn.LocalAddr(),
 246		AuthInfo:  authInfo,
 247	}
 248	t := &http2Server{
 249		done:              done,
 250		conn:              conn,
 251		peer:              peer,
 252		framer:            framer,
 253		readerDone:        make(chan struct{}),
 254		loopyWriterDone:   make(chan struct{}),
 255		maxStreams:        config.MaxStreams,
 256		inTapHandle:       config.InTapHandle,
 257		fc:                &trInFlow{limit: uint32(icwz)},
 258		state:             reachable,
 259		activeStreams:     make(map[uint32]*ServerStream),
 260		stats:             config.StatsHandlers,
 261		kp:                kp,
 262		idle:              time.Now(),
 263		kep:               kep,
 264		initialWindowSize: iwz,
 265		bufferPool:        config.BufferPool,
 266	}
 267	var czSecurity credentials.ChannelzSecurityValue
 268	if au, ok := authInfo.(credentials.ChannelzSecurityInfo); ok {
 269		czSecurity = au.GetSecurityValue()
 270	}
 271	t.channelz = channelz.RegisterSocket(
 272		&channelz.Socket{
 273			SocketType:       channelz.SocketTypeNormal,
 274			Parent:           config.ChannelzParent,
 275			SocketMetrics:    channelz.SocketMetrics{},
 276			EphemeralMetrics: t.socketMetrics,
 277			LocalAddr:        t.peer.LocalAddr,
 278			RemoteAddr:       t.peer.Addr,
 279			SocketOptions:    channelz.GetSocketOption(t.conn),
 280			Security:         czSecurity,
 281		},
 282	)
 283	t.logger = prefixLoggerForServerTransport(t)
 284
 285	t.controlBuf = newControlBuffer(t.done)
 286	if dynamicWindow {
 287		t.bdpEst = &bdpEstimator{
 288			bdp:               initialWindowSize,
 289			updateFlowControl: t.updateFlowControl,
 290		}
 291	}
 292
 293	t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
 294	t.framer.writer.Flush()
 295
 296	defer func() {
 297		if err != nil {
 298			t.Close(err)
 299		}
 300	}()
 301
 302	// Check the validity of client preface.
 303	preface := make([]byte, len(clientPreface))
 304	if _, err := io.ReadFull(t.conn, preface); err != nil {
 305		// In deployments where a gRPC server runs behind a cloud load balancer
 306		// which performs regular TCP level health checks, the connection is
 307		// closed immediately by the latter.  Returning io.EOF here allows the
 308		// grpc server implementation to recognize this scenario and suppress
 309		// logging to reduce spam.
 310		if err == io.EOF {
 311			return nil, io.EOF
 312		}
 313		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
 314	}
 315	if !bytes.Equal(preface, clientPreface) {
 316		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
 317	}
 318
 319	frame, err := t.framer.fr.ReadFrame()
 320	if err == io.EOF || err == io.ErrUnexpectedEOF {
 321		return nil, err
 322	}
 323	if err != nil {
 324		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
 325	}
 326	atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
 327	sf, ok := frame.(*http2.SettingsFrame)
 328	if !ok {
 329		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
 330	}
 331	t.handleSettings(sf)
 332
 333	go func() {
 334		t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler, t.bufferPool)
 335		err := t.loopy.run()
 336		close(t.loopyWriterDone)
 337		if !isIOError(err) {
 338			// Close the connection if a non-I/O error occurs (for I/O errors
 339			// the reader will also encounter the error and close).  Wait 1
 340			// second before closing the connection, or when the reader is done
 341			// (i.e. the client already closed the connection or a connection
 342			// error occurred).  This avoids the potential problem where there
 343			// is unread data on the receive side of the connection, which, if
 344			// closed, would lead to a TCP RST instead of FIN, and the client
 345			// encountering errors.  For more info:
 346			// https://github.com/grpc/grpc-go/issues/5358
 347			timer := time.NewTimer(time.Second)
 348			defer timer.Stop()
 349			select {
 350			case <-t.readerDone:
 351			case <-timer.C:
 352			}
 353			t.conn.Close()
 354		}
 355	}()
 356	go t.keepalive()
 357	return t, nil
 358}
 359
 360// operateHeaders takes action on the decoded headers. Returns an error if fatal
 361// error encountered and transport needs to close, otherwise returns nil.
 362func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeadersFrame, handle func(*ServerStream)) error {
 363	// Acquire max stream ID lock for entire duration
 364	t.maxStreamMu.Lock()
 365	defer t.maxStreamMu.Unlock()
 366
 367	streamID := frame.Header().StreamID
 368
 369	// frame.Truncated is set to true when framer detects that the current header
 370	// list size hits MaxHeaderListSize limit.
 371	if frame.Truncated {
 372		t.controlBuf.put(&cleanupStream{
 373			streamID: streamID,
 374			rst:      true,
 375			rstCode:  http2.ErrCodeFrameSize,
 376			onWrite:  func() {},
 377		})
 378		return nil
 379	}
 380
 381	if streamID%2 != 1 || streamID <= t.maxStreamID {
 382		// illegal gRPC stream id.
 383		return fmt.Errorf("received an illegal stream id: %v. headers frame: %+v", streamID, frame)
 384	}
 385	t.maxStreamID = streamID
 386
 387	buf := newRecvBuffer()
 388	s := &ServerStream{
 389		Stream: &Stream{
 390			id:  streamID,
 391			buf: buf,
 392			fc:  &inFlow{limit: uint32(t.initialWindowSize)},
 393		},
 394		st:               t,
 395		headerWireLength: int(frame.Header().Length),
 396	}
 397	var (
 398		// if false, content-type was missing or invalid
 399		isGRPC      = false
 400		contentType = ""
 401		mdata       = make(metadata.MD, len(frame.Fields))
 402		httpMethod  string
 403		// these are set if an error is encountered while parsing the headers
 404		protocolError bool
 405		headerError   *status.Status
 406
 407		timeoutSet bool
 408		timeout    time.Duration
 409	)
 410
 411	for _, hf := range frame.Fields {
 412		switch hf.Name {
 413		case "content-type":
 414			contentSubtype, validContentType := grpcutil.ContentSubtype(hf.Value)
 415			if !validContentType {
 416				contentType = hf.Value
 417				break
 418			}
 419			mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
 420			s.contentSubtype = contentSubtype
 421			isGRPC = true
 422
 423		case "grpc-accept-encoding":
 424			mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
 425			if hf.Value == "" {
 426				continue
 427			}
 428			compressors := hf.Value
 429			if s.clientAdvertisedCompressors != "" {
 430				compressors = s.clientAdvertisedCompressors + "," + compressors
 431			}
 432			s.clientAdvertisedCompressors = compressors
 433		case "grpc-encoding":
 434			s.recvCompress = hf.Value
 435		case ":method":
 436			httpMethod = hf.Value
 437		case ":path":
 438			s.method = hf.Value
 439		case "grpc-timeout":
 440			timeoutSet = true
 441			var err error
 442			if timeout, err = decodeTimeout(hf.Value); err != nil {
 443				headerError = status.Newf(codes.Internal, "malformed grpc-timeout: %v", err)
 444			}
 445		// "Transports must consider requests containing the Connection header
 446		// as malformed." - A41
 447		case "connection":
 448			if t.logger.V(logLevel) {
 449				t.logger.Infof("Received a HEADERS frame with a :connection header which makes the request malformed, as per the HTTP/2 spec")
 450			}
 451			protocolError = true
 452		default:
 453			if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
 454				break
 455			}
 456			v, err := decodeMetadataHeader(hf.Name, hf.Value)
 457			if err != nil {
 458				headerError = status.Newf(codes.Internal, "malformed binary metadata %q in header %q: %v", hf.Value, hf.Name, err)
 459				t.logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
 460				break
 461			}
 462			mdata[hf.Name] = append(mdata[hf.Name], v)
 463		}
 464	}
 465
 466	// "If multiple Host headers or multiple :authority headers are present, the
 467	// request must be rejected with an HTTP status code 400 as required by Host
 468	// validation in RFC 7230 §5.4, gRPC status code INTERNAL, or RST_STREAM
 469	// with HTTP/2 error code PROTOCOL_ERROR." - A41. Since this is a HTTP/2
 470	// error, this takes precedence over a client not speaking gRPC.
 471	if len(mdata[":authority"]) > 1 || len(mdata["host"]) > 1 {
 472		errMsg := fmt.Sprintf("num values of :authority: %v, num values of host: %v, both must only have 1 value as per HTTP/2 spec", len(mdata[":authority"]), len(mdata["host"]))
 473		if t.logger.V(logLevel) {
 474			t.logger.Infof("Aborting the stream early: %v", errMsg)
 475		}
 476		t.controlBuf.put(&earlyAbortStream{
 477			httpStatus:     http.StatusBadRequest,
 478			streamID:       streamID,
 479			contentSubtype: s.contentSubtype,
 480			status:         status.New(codes.Internal, errMsg),
 481			rst:            !frame.StreamEnded(),
 482		})
 483		return nil
 484	}
 485
 486	if protocolError {
 487		t.controlBuf.put(&cleanupStream{
 488			streamID: streamID,
 489			rst:      true,
 490			rstCode:  http2.ErrCodeProtocol,
 491			onWrite:  func() {},
 492		})
 493		return nil
 494	}
 495	if !isGRPC {
 496		t.controlBuf.put(&earlyAbortStream{
 497			httpStatus:     http.StatusUnsupportedMediaType,
 498			streamID:       streamID,
 499			contentSubtype: s.contentSubtype,
 500			status:         status.Newf(codes.InvalidArgument, "invalid gRPC request content-type %q", contentType),
 501			rst:            !frame.StreamEnded(),
 502		})
 503		return nil
 504	}
 505	if headerError != nil {
 506		t.controlBuf.put(&earlyAbortStream{
 507			httpStatus:     http.StatusBadRequest,
 508			streamID:       streamID,
 509			contentSubtype: s.contentSubtype,
 510			status:         headerError,
 511			rst:            !frame.StreamEnded(),
 512		})
 513		return nil
 514	}
 515
 516	// "If :authority is missing, Host must be renamed to :authority." - A41
 517	if len(mdata[":authority"]) == 0 {
 518		// No-op if host isn't present, no eventual :authority header is a valid
 519		// RPC.
 520		if host, ok := mdata["host"]; ok {
 521			mdata[":authority"] = host
 522			delete(mdata, "host")
 523		}
 524	} else {
 525		// "If :authority is present, Host must be discarded" - A41
 526		delete(mdata, "host")
 527	}
 528
 529	if frame.StreamEnded() {
 530		// s is just created by the caller. No lock needed.
 531		s.state = streamReadDone
 532	}
 533	if timeoutSet {
 534		s.ctx, s.cancel = context.WithTimeout(ctx, timeout)
 535	} else {
 536		s.ctx, s.cancel = context.WithCancel(ctx)
 537	}
 538
 539	// Attach the received metadata to the context.
 540	if len(mdata) > 0 {
 541		s.ctx = metadata.NewIncomingContext(s.ctx, mdata)
 542	}
 543	t.mu.Lock()
 544	if t.state != reachable {
 545		t.mu.Unlock()
 546		s.cancel()
 547		return nil
 548	}
 549	if uint32(len(t.activeStreams)) >= t.maxStreams {
 550		t.mu.Unlock()
 551		t.controlBuf.put(&cleanupStream{
 552			streamID: streamID,
 553			rst:      true,
 554			rstCode:  http2.ErrCodeRefusedStream,
 555			onWrite:  func() {},
 556		})
 557		s.cancel()
 558		return nil
 559	}
 560	if httpMethod != http.MethodPost {
 561		t.mu.Unlock()
 562		errMsg := fmt.Sprintf("Received a HEADERS frame with :method %q which should be POST", httpMethod)
 563		if t.logger.V(logLevel) {
 564			t.logger.Infof("Aborting the stream early: %v", errMsg)
 565		}
 566		t.controlBuf.put(&earlyAbortStream{
 567			httpStatus:     http.StatusMethodNotAllowed,
 568			streamID:       streamID,
 569			contentSubtype: s.contentSubtype,
 570			status:         status.New(codes.Internal, errMsg),
 571			rst:            !frame.StreamEnded(),
 572		})
 573		s.cancel()
 574		return nil
 575	}
 576	if t.inTapHandle != nil {
 577		var err error
 578		if s.ctx, err = t.inTapHandle(s.ctx, &tap.Info{FullMethodName: s.method, Header: mdata}); err != nil {
 579			t.mu.Unlock()
 580			if t.logger.V(logLevel) {
 581				t.logger.Infof("Aborting the stream early due to InTapHandle failure: %v", err)
 582			}
 583			stat, ok := status.FromError(err)
 584			if !ok {
 585				stat = status.New(codes.PermissionDenied, err.Error())
 586			}
 587			t.controlBuf.put(&earlyAbortStream{
 588				httpStatus:     http.StatusOK,
 589				streamID:       s.id,
 590				contentSubtype: s.contentSubtype,
 591				status:         stat,
 592				rst:            !frame.StreamEnded(),
 593			})
 594			return nil
 595		}
 596	}
 597	t.activeStreams[streamID] = s
 598	if len(t.activeStreams) == 1 {
 599		t.idle = time.Time{}
 600	}
 601	t.mu.Unlock()
 602	if channelz.IsOn() {
 603		t.channelz.SocketMetrics.StreamsStarted.Add(1)
 604		t.channelz.SocketMetrics.LastRemoteStreamCreatedTimestamp.Store(time.Now().UnixNano())
 605	}
 606	s.requestRead = func(n int) {
 607		t.adjustWindow(s, uint32(n))
 608	}
 609	s.ctxDone = s.ctx.Done()
 610	s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
 611	s.trReader = &transportReader{
 612		reader: &recvBufferReader{
 613			ctx:     s.ctx,
 614			ctxDone: s.ctxDone,
 615			recv:    s.buf,
 616		},
 617		windowHandler: func(n int) {
 618			t.updateWindow(s, uint32(n))
 619		},
 620	}
 621	// Register the stream with loopy.
 622	t.controlBuf.put(&registerStream{
 623		streamID: s.id,
 624		wq:       s.wq,
 625	})
 626	handle(s)
 627	return nil
 628}
 629
 630// HandleStreams receives incoming streams using the given handler. This is
 631// typically run in a separate goroutine.
 632// traceCtx attaches trace to ctx and returns the new context.
 633func (t *http2Server) HandleStreams(ctx context.Context, handle func(*ServerStream)) {
 634	defer func() {
 635		close(t.readerDone)
 636		<-t.loopyWriterDone
 637	}()
 638	for {
 639		t.controlBuf.throttle()
 640		frame, err := t.framer.fr.ReadFrame()
 641		atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
 642		if err != nil {
 643			if se, ok := err.(http2.StreamError); ok {
 644				if t.logger.V(logLevel) {
 645					t.logger.Warningf("Encountered http2.StreamError: %v", se)
 646				}
 647				t.mu.Lock()
 648				s := t.activeStreams[se.StreamID]
 649				t.mu.Unlock()
 650				if s != nil {
 651					t.closeStream(s, true, se.Code, false)
 652				} else {
 653					t.controlBuf.put(&cleanupStream{
 654						streamID: se.StreamID,
 655						rst:      true,
 656						rstCode:  se.Code,
 657						onWrite:  func() {},
 658					})
 659				}
 660				continue
 661			}
 662			t.Close(err)
 663			return
 664		}
 665		switch frame := frame.(type) {
 666		case *http2.MetaHeadersFrame:
 667			if err := t.operateHeaders(ctx, frame, handle); err != nil {
 668				// Any error processing client headers, e.g. invalid stream ID,
 669				// is considered a protocol violation.
 670				t.controlBuf.put(&goAway{
 671					code:      http2.ErrCodeProtocol,
 672					debugData: []byte(err.Error()),
 673					closeConn: err,
 674				})
 675				continue
 676			}
 677		case *http2.DataFrame:
 678			t.handleData(frame)
 679		case *http2.RSTStreamFrame:
 680			t.handleRSTStream(frame)
 681		case *http2.SettingsFrame:
 682			t.handleSettings(frame)
 683		case *http2.PingFrame:
 684			t.handlePing(frame)
 685		case *http2.WindowUpdateFrame:
 686			t.handleWindowUpdate(frame)
 687		case *http2.GoAwayFrame:
 688			// TODO: Handle GoAway from the client appropriately.
 689		default:
 690			if t.logger.V(logLevel) {
 691				t.logger.Infof("Received unsupported frame type %T", frame)
 692			}
 693		}
 694	}
 695}
 696
 697func (t *http2Server) getStream(f http2.Frame) (*ServerStream, bool) {
 698	t.mu.Lock()
 699	defer t.mu.Unlock()
 700	if t.activeStreams == nil {
 701		// The transport is closing.
 702		return nil, false
 703	}
 704	s, ok := t.activeStreams[f.Header().StreamID]
 705	if !ok {
 706		// The stream is already done.
 707		return nil, false
 708	}
 709	return s, true
 710}
 711
 712// adjustWindow sends out extra window update over the initial window size
 713// of stream if the application is requesting data larger in size than
 714// the window.
 715func (t *http2Server) adjustWindow(s *ServerStream, n uint32) {
 716	if w := s.fc.maybeAdjust(n); w > 0 {
 717		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
 718	}
 719
 720}
 721
 722// updateWindow adjusts the inbound quota for the stream and the transport.
 723// Window updates will deliver to the controller for sending when
 724// the cumulative quota exceeds the corresponding threshold.
 725func (t *http2Server) updateWindow(s *ServerStream, n uint32) {
 726	if w := s.fc.onRead(n); w > 0 {
 727		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
 728			increment: w,
 729		})
 730	}
 731}
 732
 733// updateFlowControl updates the incoming flow control windows
 734// for the transport and the stream based on the current bdp
 735// estimation.
 736func (t *http2Server) updateFlowControl(n uint32) {
 737	t.mu.Lock()
 738	for _, s := range t.activeStreams {
 739		s.fc.newLimit(n)
 740	}
 741	t.initialWindowSize = int32(n)
 742	t.mu.Unlock()
 743	t.controlBuf.put(&outgoingWindowUpdate{
 744		streamID:  0,
 745		increment: t.fc.newLimit(n),
 746	})
 747	t.controlBuf.put(&outgoingSettings{
 748		ss: []http2.Setting{
 749			{
 750				ID:  http2.SettingInitialWindowSize,
 751				Val: n,
 752			},
 753		},
 754	})
 755
 756}
 757
 758func (t *http2Server) handleData(f *http2.DataFrame) {
 759	size := f.Header().Length
 760	var sendBDPPing bool
 761	if t.bdpEst != nil {
 762		sendBDPPing = t.bdpEst.add(size)
 763	}
 764	// Decouple connection's flow control from application's read.
 765	// An update on connection's flow control should not depend on
 766	// whether user application has read the data or not. Such a
 767	// restriction is already imposed on the stream's flow control,
 768	// and therefore the sender will be blocked anyways.
 769	// Decoupling the connection flow control will prevent other
 770	// active(fast) streams from starving in presence of slow or
 771	// inactive streams.
 772	if w := t.fc.onData(size); w > 0 {
 773		t.controlBuf.put(&outgoingWindowUpdate{
 774			streamID:  0,
 775			increment: w,
 776		})
 777	}
 778	if sendBDPPing {
 779		// Avoid excessive ping detection (e.g. in an L7 proxy)
 780		// by sending a window update prior to the BDP ping.
 781		if w := t.fc.reset(); w > 0 {
 782			t.controlBuf.put(&outgoingWindowUpdate{
 783				streamID:  0,
 784				increment: w,
 785			})
 786		}
 787		t.controlBuf.put(bdpPing)
 788	}
 789	// Select the right stream to dispatch.
 790	s, ok := t.getStream(f)
 791	if !ok {
 792		return
 793	}
 794	if s.getState() == streamReadDone {
 795		t.closeStream(s, true, http2.ErrCodeStreamClosed, false)
 796		return
 797	}
 798	if size > 0 {
 799		if err := s.fc.onData(size); err != nil {
 800			t.closeStream(s, true, http2.ErrCodeFlowControl, false)
 801			return
 802		}
 803		if f.Header().Flags.Has(http2.FlagDataPadded) {
 804			if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
 805				t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
 806			}
 807		}
 808		// TODO(bradfitz, zhaoq): A copy is required here because there is no
 809		// guarantee f.Data() is consumed before the arrival of next frame.
 810		// Can this copy be eliminated?
 811		if len(f.Data()) > 0 {
 812			pool := t.bufferPool
 813			if pool == nil {
 814				// Note that this is only supposed to be nil in tests. Otherwise, stream is
 815				// always initialized with a BufferPool.
 816				pool = mem.DefaultBufferPool()
 817			}
 818			s.write(recvMsg{buffer: mem.Copy(f.Data(), pool)})
 819		}
 820	}
 821	if f.StreamEnded() {
 822		// Received the end of stream from the client.
 823		s.compareAndSwapState(streamActive, streamReadDone)
 824		s.write(recvMsg{err: io.EOF})
 825	}
 826}
 827
 828func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
 829	// If the stream is not deleted from the transport's active streams map, then do a regular close stream.
 830	if s, ok := t.getStream(f); ok {
 831		t.closeStream(s, false, 0, false)
 832		return
 833	}
 834	// If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
 835	t.controlBuf.put(&cleanupStream{
 836		streamID: f.Header().StreamID,
 837		rst:      false,
 838		rstCode:  0,
 839		onWrite:  func() {},
 840	})
 841}
 842
 843func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
 844	if f.IsAck() {
 845		return
 846	}
 847	var ss []http2.Setting
 848	var updateFuncs []func()
 849	f.ForeachSetting(func(s http2.Setting) error {
 850		switch s.ID {
 851		case http2.SettingMaxHeaderListSize:
 852			updateFuncs = append(updateFuncs, func() {
 853				t.maxSendHeaderListSize = new(uint32)
 854				*t.maxSendHeaderListSize = s.Val
 855			})
 856		default:
 857			ss = append(ss, s)
 858		}
 859		return nil
 860	})
 861	t.controlBuf.executeAndPut(func() bool {
 862		for _, f := range updateFuncs {
 863			f()
 864		}
 865		return true
 866	}, &incomingSettings{
 867		ss: ss,
 868	})
 869}
 870
 871const (
 872	maxPingStrikes     = 2
 873	defaultPingTimeout = 2 * time.Hour
 874)
 875
// handlePing processes an incoming PING frame.
//
// For a PING ACK: if the payload matches goAwayPing and a drain is in progress
// (t.drainEvent is set), the drain event is fired so the second GOAWAY can be
// sent; otherwise the payload is handed to the BDP estimator when one exists.
//
// For a non-ACK PING: an ACK echoing the payload is queued, then the ping is
// checked against the keepalive enforcement policy. If setResetPingStrikes
// was called since the previous ping (a header or data frame was written),
// the strike count is cleared and no check is made. Otherwise a strike is
// recorded when the ping arrives sooner than defaultPingTimeout after the
// previous one while there are no active streams and PermitWithoutStream is
// false, or sooner than t.kep.MinTime in all other cases. Once strikes exceed
// maxPingStrikes a GOAWAY(ENHANCE_YOUR_CALM, "too_many_pings") that also
// closes the connection is queued. t.lastPingAt is updated to this ping's
// arrival time on every non-ACK path via the deferred assignment.
func (t *http2Server) handlePing(f *http2.PingFrame) {
	if f.IsAck() {
		// ACK of the ping sent after the heads-up GOAWAY: let the drain proceed.
		if f.Data == goAwayPing.data && t.drainEvent != nil {
			t.drainEvent.Fire()
			return
		}
		// Maybe it's a BDP ping.
		if t.bdpEst != nil {
			t.bdpEst.calculate(f.Data)
		}
		return
	}
	// Always acknowledge, even if the ping ends up counting as a strike.
	pingAck := &ping{ack: true}
	copy(pingAck.data[:], f.Data[:])
	t.controlBuf.put(pingAck)

	now := time.Now()
	// Deferred so the policy checks below still compare against the
	// previous ping's arrival time.
	defer func() {
		t.lastPingAt = now
	}()
	// A reset ping strikes means that we don't need to check for policy
	// violation for this ping and the pingStrikes counter should be set
	// to 0.
	if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
		t.pingStrikes = 0
		return
	}
	t.mu.Lock()
	ns := len(t.activeStreams)
	t.mu.Unlock()
	if ns < 1 && !t.kep.PermitWithoutStream {
		// Keepalive shouldn't be active thus, this new ping should
		// have come after at least defaultPingTimeout.
		if t.lastPingAt.Add(defaultPingTimeout).After(now) {
			t.pingStrikes++
		}
	} else {
		// Check if keepalive policy is respected.
		if t.lastPingAt.Add(t.kep.MinTime).After(now) {
			t.pingStrikes++
		}
	}

	if t.pingStrikes > maxPingStrikes {
		// Send goaway and close the connection.
		t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: errors.New("got too many pings from the client")})
	}
}
 924
 925func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
 926	t.controlBuf.put(&incomingWindowUpdate{
 927		streamID:  f.Header().StreamID,
 928		increment: f.Increment,
 929	})
 930}
 931
 932func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
 933	for k, vv := range md {
 934		if isReservedHeader(k) {
 935			// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
 936			continue
 937		}
 938		for _, v := range vv {
 939			headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
 940		}
 941	}
 942	return headerFields
 943}
 944
 945func (t *http2Server) checkForHeaderListSize(it any) bool {
 946	if t.maxSendHeaderListSize == nil {
 947		return true
 948	}
 949	hdrFrame := it.(*headerFrame)
 950	var sz int64
 951	for _, f := range hdrFrame.hf {
 952		if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
 953			if t.logger.V(logLevel) {
 954				t.logger.Infof("Header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
 955			}
 956			return false
 957		}
 958	}
 959	return true
 960}
 961
 962func (t *http2Server) streamContextErr(s *ServerStream) error {
 963	select {
 964	case <-t.done:
 965		return ErrConnClosing
 966	default:
 967	}
 968	return ContextErr(s.ctx.Err())
 969}
 970
 971// WriteHeader sends the header metadata md back to the client.
 972func (t *http2Server) writeHeader(s *ServerStream, md metadata.MD) error {
 973	s.hdrMu.Lock()
 974	defer s.hdrMu.Unlock()
 975	if s.getState() == streamDone {
 976		return t.streamContextErr(s)
 977	}
 978
 979	if s.updateHeaderSent() {
 980		return ErrIllegalHeaderWrite
 981	}
 982
 983	if md.Len() > 0 {
 984		if s.header.Len() > 0 {
 985			s.header = metadata.Join(s.header, md)
 986		} else {
 987			s.header = md
 988		}
 989	}
 990	if err := t.writeHeaderLocked(s); err != nil {
 991		switch e := err.(type) {
 992		case ConnectionError:
 993			return status.Error(codes.Unavailable, e.Desc)
 994		default:
 995			return status.Convert(err).Err()
 996		}
 997	}
 998	return nil
 999}
1000
1001func (t *http2Server) setResetPingStrikes() {
1002	atomic.StoreUint32(&t.resetPingStrikes, 1)
1003}
1004
1005func (t *http2Server) writeHeaderLocked(s *ServerStream) error {
1006	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
1007	// first and create a slice of that exact size.
1008	headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
1009	headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
1010	headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
1011	if s.sendCompress != "" {
1012		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
1013	}
1014	headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
1015	hf := &headerFrame{
1016		streamID:  s.id,
1017		hf:        headerFields,
1018		endStream: false,
1019		onWrite:   t.setResetPingStrikes,
1020	}
1021	success, err := t.controlBuf.executeAndPut(func() bool { return t.checkForHeaderListSize(hf) }, hf)
1022	if !success {
1023		if err != nil {
1024			return err
1025		}
1026		t.closeStream(s, true, http2.ErrCodeInternal, false)
1027		return ErrHeaderListSizeLimitViolation
1028	}
1029	for _, sh := range t.stats {
1030		// Note: Headers are compressed with hpack after this call returns.
1031		// No WireLength field is set here.
1032		outHeader := &stats.OutHeader{
1033			Header:      s.header.Copy(),
1034			Compression: s.sendCompress,
1035		}
1036		sh.HandleRPC(s.Context(), outHeader)
1037	}
1038	return nil
1039}
1040
// writeStatus sends the final status st to the client as a trailing HEADERS
// frame with END_STREAM set and finishes the stream; no further I/O can be
// performed on the stream afterwards. It returns nil without sending anything
// if the stream is already done.
//
// If headers have not been sent yet, then:
//   - when s.header is non-empty, a separate header frame is written first;
//   - otherwise :status and content-type are folded into this frame, producing
//     a trailers-only response.
//
// If the trailers exceed the client's header list size limit, the stream is
// closed with RST_STREAM(INTERNAL) and ErrHeaderListSizeLimitViolation is
// returned.
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
// OK is adopted.
func (t *http2Server) writeStatus(s *ServerStream, st *status.Status) error {
	s.hdrMu.Lock()
	defer s.hdrMu.Unlock()

	if s.getState() == streamDone {
		return nil
	}

	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
	// first and create a slice of that exact size.
	headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
	if !s.updateHeaderSent() {                      // No headers have been sent.
		if len(s.header) > 0 { // Send a separate header frame.
			if err := t.writeHeaderLocked(s); err != nil {
				return err
			}
		} else { // Send a trailer only response.
			headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
			headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
		}
	}
	// grpc-status and grpc-message are always present in the trailers.
	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})

	if p := st.Proto(); p != nil && len(p.Details) > 0 {
		// Do not use the user's grpc-status-details-bin (if present) if we are
		// even attempting to set our own.
		// NOTE(review): if marshaling below fails, the user's header has
		// already been removed, so no details header is sent at all.
		delete(s.trailer, grpcStatusDetailsBinHeader)
		stBytes, err := proto.Marshal(p)
		if err != nil {
			// TODO: return error instead, when callers are able to handle it.
			t.logger.Errorf("Failed to marshal rpc status: %s, error: %v", pretty.ToJSON(p), err)
		} else {
			headerFields = append(headerFields, hpack.HeaderField{Name: grpcStatusDetailsBinHeader, Value: encodeBinHeader(stBytes)})
		}
	}

	// Attach the trailer metadata (reserved keys are skipped by
	// appendHeaderFieldsFromMD).
	headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
	trailingHeader := &headerFrame{
		streamID:  s.id,
		hf:        headerFields,
		endStream: true,
		onWrite:   t.setResetPingStrikes,
	}

	// Only the size check runs inside executeAndPut here (the item argument is
	// nil, presumably so nothing is enqueued); the trailer frame itself is
	// queued by finishStream below together with the stream cleanup.
	success, err := t.controlBuf.executeAndPut(func() bool {
		return t.checkForHeaderListSize(trailingHeader)
	}, nil)
	if !success {
		if err != nil {
			return err
		}
		t.closeStream(s, true, http2.ErrCodeInternal, false)
		return ErrHeaderListSizeLimitViolation
	}
	// Send a RST_STREAM after the trailers if the client has not already half-closed.
	rst := s.getState() == streamActive
	t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
	for _, sh := range t.stats {
		// Note: The trailer fields are compressed with hpack after this call returns.
		// No WireLength field is set here.
		sh.HandleRPC(s.Context(), &stats.OutTrailer{
			Trailer: s.trailer.Copy(),
		})
	}
	return nil
}
1113
1114// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
1115// is returns if it fails (e.g., framing error, transport error).
1116func (t *http2Server) write(s *ServerStream, hdr []byte, data mem.BufferSlice, _ *WriteOptions) error {
1117	reader := data.Reader()
1118
1119	if !s.isHeaderSent() { // Headers haven't been written yet.
1120		if err := t.writeHeader(s, nil); err != nil {
1121			_ = reader.Close()
1122			return err
1123		}
1124	} else {
1125		// Writing headers checks for this condition.
1126		if s.getState() == streamDone {
1127			_ = reader.Close()
1128			return t.streamContextErr(s)
1129		}
1130	}
1131
1132	df := &dataFrame{
1133		streamID:    s.id,
1134		h:           hdr,
1135		reader:      reader,
1136		onEachWrite: t.setResetPingStrikes,
1137	}
1138	if err := s.wq.get(int32(len(hdr) + df.reader.Remaining())); err != nil {
1139		_ = reader.Close()
1140		return t.streamContextErr(s)
1141	}
1142	if err := t.controlBuf.put(df); err != nil {
1143		_ = reader.Close()
1144		return err
1145	}
1146	t.incrMsgSent()
1147	return nil
1148}
1149
// keepalive running in a separate goroutine does the following:
// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
// after an additional duration of keepalive.Timeout.
//
// "Gracefully" means calling t.Drain, which queues the heads-up GOAWAY.
// The loop returns when t.done is closed, after draining for idleness, after
// the age path's grace period (or t.done), or after closing the transport
// because a keepalive ping went unanswered.
func (t *http2Server) keepalive() {
	p := &ping{}
	// True iff a ping has been sent, and no data has been received since then.
	outstandingPing := false
	// Amount of time remaining before which we should receive an ACK for the
	// last sent ping.
	kpTimeoutLeft := time.Duration(0)
	// Records the last value of t.lastRead before we go block on the timer.
	// This is required to check for read activity since then.
	prevNano := time.Now().UnixNano()
	// Initialize the different timers to their default values.
	idleTimer := time.NewTimer(t.kp.MaxConnectionIdle)
	ageTimer := time.NewTimer(t.kp.MaxConnectionAge)
	kpTimer := time.NewTimer(t.kp.Time)
	defer func() {
		// We need to drain the underlying channel in these timers after a call
		// to Stop(), only if we are interested in resetting them. Clearly we
		// are not interested in resetting them here.
		idleTimer.Stop()
		ageTimer.Stop()
		kpTimer.Stop()
	}()

	for {
		select {
		case <-idleTimer.C:
			// t.idle is zero while the connection is considered non-idle;
			// deleteStream stamps it when the last active stream is removed.
			t.mu.Lock()
			idle := t.idle
			if idle.IsZero() { // The connection is non-idle.
				t.mu.Unlock()
				idleTimer.Reset(t.kp.MaxConnectionIdle)
				continue
			}
			val := t.kp.MaxConnectionIdle - time.Since(idle)
			t.mu.Unlock()
			if val <= 0 {
				// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
				// Gracefully close the connection.
				t.Drain("max_idle")
				return
			}
			// Not idle long enough yet: re-check when the remainder elapses.
			idleTimer.Reset(val)
		case <-ageTimer.C:
			// Start the graceful drain, then reuse ageTimer for the grace period.
			t.Drain("max_age")
			ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
			select {
			case <-ageTimer.C:
				// Close the connection after grace period.
				if t.logger.V(logLevel) {
					t.logger.Infof("Closing server transport due to maximum connection age")
				}
				t.controlBuf.put(closeConnection{})
			case <-t.done:
			}
			return
		case <-kpTimer.C:
			lastRead := atomic.LoadInt64(&t.lastRead)
			if lastRead > prevNano {
				// There has been read activity since the last time we were
				// here. Setup the timer to fire at kp.Time seconds from
				// lastRead time and continue.
				// Any read (presumably including the ping ACK) counts as
				// liveness, so an outstanding ping is considered answered.
				// lastRead and time.Now().UnixNano() are both Unix nanoseconds.
				outstandingPing = false
				kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
				prevNano = lastRead
				continue
			}
			// No reads since the previous check.
			if outstandingPing && kpTimeoutLeft <= 0 {
				t.Close(fmt.Errorf("keepalive ping not acked within timeout %s", t.kp.Timeout))
				return
			}
			if !outstandingPing {
				if channelz.IsOn() {
					t.channelz.SocketMetrics.KeepAlivesSent.Add(1)
				}
				t.controlBuf.put(p)
				kpTimeoutLeft = t.kp.Timeout
				outstandingPing = true
			}
			// The amount of time to sleep here is the minimum of kp.Time and
			// timeoutLeft. This will ensure that we wait only for kp.Time
			// before sending out the next ping (for cases where the ping is
			// acked).
			sleepDuration := min(t.kp.Time, kpTimeoutLeft)
			kpTimeoutLeft -= sleepDuration
			kpTimer.Reset(sleepDuration)
		case <-t.done:
			return
		}
	}
}
1246
// Close starts shutting down the http2Server transport. It is idempotent: if
// the transport is already closing, it returns immediately.
//
// Otherwise, in order, it:
//   - marks the transport closing and detaches the active stream map under
//     t.mu;
//   - finishes controlBuf and closes t.done;
//   - closes the underlying net.Conn (a failure is only logged);
//   - removes the channelz entry;
//   - cancels every stream that was active.
//
// err is used only for logging.
// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
// could cause some resource issue. Revisit this later.
func (t *http2Server) Close(err error) {
	t.mu.Lock()
	if t.state == closing {
		t.mu.Unlock()
		return
	}
	if t.logger.V(logLevel) {
		t.logger.Infof("Closing: %v", err)
	}
	t.state = closing
	// Take ownership of the streams so they can be cancelled outside the lock.
	streams := t.activeStreams
	t.activeStreams = nil
	t.mu.Unlock()
	t.controlBuf.finish()
	close(t.done)
	if err := t.conn.Close(); err != nil && t.logger.V(logLevel) {
		t.logger.Infof("Error closing underlying net.Conn during Close: %v", err)
	}
	channelz.RemoveEntry(t.channelz.ID)
	// Cancel all active streams.
	for _, s := range streams {
		s.cancel()
	}
}
1274
1275// deleteStream deletes the stream s from transport's active streams.
1276func (t *http2Server) deleteStream(s *ServerStream, eosReceived bool) {
1277
1278	t.mu.Lock()
1279	if _, ok := t.activeStreams[s.id]; ok {
1280		delete(t.activeStreams, s.id)
1281		if len(t.activeStreams) == 0 {
1282			t.idle = time.Now()
1283		}
1284	}
1285	t.mu.Unlock()
1286
1287	if channelz.IsOn() {
1288		if eosReceived {
1289			t.channelz.SocketMetrics.StreamsSucceeded.Add(1)
1290		} else {
1291			t.channelz.SocketMetrics.StreamsFailed.Add(1)
1292		}
1293	}
1294}
1295
1296// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
1297func (t *http2Server) finishStream(s *ServerStream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
1298	// In case stream sending and receiving are invoked in separate
1299	// goroutines (e.g., bi-directional streaming), cancel needs to be
1300	// called to interrupt the potential blocking on other goroutines.
1301	s.cancel()
1302
1303	oldState := s.swapState(streamDone)
1304	if oldState == streamDone {
1305		// If the stream was already done, return.
1306		return
1307	}
1308
1309	hdr.cleanup = &cleanupStream{
1310		streamID: s.id,
1311		rst:      rst,
1312		rstCode:  rstCode,
1313		onWrite: func() {
1314			t.deleteStream(s, eosReceived)
1315		},
1316	}
1317	t.controlBuf.put(hdr)
1318}
1319
1320// closeStream clears the footprint of a stream when the stream is not needed any more.
1321func (t *http2Server) closeStream(s *ServerStream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
1322	// In case stream sending and receiving are invoked in separate
1323	// goroutines (e.g., bi-directional streaming), cancel needs to be
1324	// called to interrupt the potential blocking on other goroutines.
1325	s.cancel()
1326
1327	s.swapState(streamDone)
1328	t.deleteStream(s, eosReceived)
1329
1330	t.controlBuf.put(&cleanupStream{
1331		streamID: s.id,
1332		rst:      rst,
1333		rstCode:  rstCode,
1334		onWrite:  func() {},
1335	})
1336}
1337
1338func (t *http2Server) Drain(debugData string) {
1339	t.mu.Lock()
1340	defer t.mu.Unlock()
1341	if t.drainEvent != nil {
1342		return
1343	}
1344	t.drainEvent = grpcsync.NewEvent()
1345	t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte(debugData), headsUp: true})
1346}
1347
// goAwayPing is the PING written right after the heads-up GOAWAY in
// outgoingGoAwayHandler. handlePing recognizes its ACK by this fixed payload
// and fires t.drainEvent, which lets the second GOAWAY be queued.
var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
1349
// Handles outgoing GoAway and returns true if loopy needs to put itself
// in draining mode.
//
// It returns ErrConnClosing if the transport is already closing.
//
// Heads-up GOAWAY (g.headsUp, queued by Drain):
//   - Writes a GOAWAY with stream ID MaxUint32 and code NO_ERROR (g.code is
//     not used for this first frame), followed by goAwayPing.
//   - Spawns a goroutine that waits for t.drainEvent (fired when the ping is
//     ACKed), 5 seconds, or t.done. In the first two cases it queues the
//     second GOAWAY with g.code.
//   - The goroutine relies on t.drainEvent being non-nil; Drain sets it
//     before queueing the heads-up GOAWAY.
//
// Second GOAWAY (!g.headsUp):
//   - Moves the transport to draining and writes a GOAWAY carrying the
//     current maxStreamID, then flushes.
//   - If g.closeConn is set, or no active streams remain, that error is
//     returned (presumably so the caller tears down the connection).
//     Otherwise it returns true.
//
// Lock order: maxStreamMu is acquired before mu.
func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
	t.maxStreamMu.Lock()
	t.mu.Lock()
	if t.state == closing { // TODO(mmukhi): This seems unnecessary.
		t.mu.Unlock()
		t.maxStreamMu.Unlock()
		// The transport is closing.
		return false, ErrConnClosing
	}
	if !g.headsUp {
		// Stop accepting more streams now.
		t.state = draining
		sid := t.maxStreamID
		retErr := g.closeConn
		if len(t.activeStreams) == 0 {
			retErr = errors.New("second GOAWAY written and no active streams left to process")
		}
		t.mu.Unlock()
		t.maxStreamMu.Unlock()
		if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
			return false, err
		}
		t.framer.writer.Flush()
		if retErr != nil {
			return false, retErr
		}
		return true, nil
	}
	t.mu.Unlock()
	t.maxStreamMu.Unlock()
	// For a graceful close, send out a GoAway with stream ID of MaxUInt32,
	// Follow that with a ping and wait for the ack to come back or a timer
	// to expire. During this time accept new streams since they might have
	// originated before the GoAway reaches the client.
	// After getting the ack or timer expiration send out another GoAway this
	// time with an ID of the max stream server intends to process.
	if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, g.debugData); err != nil {
		return false, err
	}
	if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
		return false, err
	}
	go func() {
		timer := time.NewTimer(5 * time.Second)
		defer timer.Stop()
		select {
		case <-t.drainEvent.Done():
		case <-timer.C:
		case <-t.done:
			return
		}
		t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
	}()
	return false, nil
}
1407
1408func (t *http2Server) socketMetrics() *channelz.EphemeralSocketMetrics {
1409	return &channelz.EphemeralSocketMetrics{
1410		LocalFlowControlWindow:  int64(t.fc.getSize()),
1411		RemoteFlowControlWindow: t.getOutFlowWindow(),
1412	}
1413}
1414
1415func (t *http2Server) incrMsgSent() {
1416	if channelz.IsOn() {
1417		t.channelz.SocketMetrics.MessagesSent.Add(1)
1418		t.channelz.SocketMetrics.LastMessageSentTimestamp.Add(1)
1419	}
1420}
1421
1422func (t *http2Server) incrMsgRecv() {
1423	if channelz.IsOn() {
1424		t.channelz.SocketMetrics.MessagesReceived.Add(1)
1425		t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Add(1)
1426	}
1427}
1428
1429func (t *http2Server) getOutFlowWindow() int64 {
1430	resp := make(chan uint32, 1)
1431	timer := time.NewTimer(time.Second)
1432	defer timer.Stop()
1433	t.controlBuf.put(&outFlowControlSizeRequest{resp})
1434	select {
1435	case sz := <-resp:
1436		return int64(sz)
1437	case <-t.done:
1438		return -1
1439	case <-timer.C:
1440		return -2
1441	}
1442}
1443
1444// Peer returns the peer of the transport.
1445func (t *http2Server) Peer() *peer.Peer {
1446	return &peer.Peer{
1447		Addr:      t.peer.Addr,
1448		LocalAddr: t.peer.LocalAddr,
1449		AuthInfo:  t.peer.AuthInfo, // Can be nil
1450	}
1451}
1452
1453func getJitter(v time.Duration) time.Duration {
1454	if v == infinity {
1455		return 0
1456	}
1457	// Generate a jitter between +/- 10% of the value.
1458	r := int64(v / 10)
1459	j := rand.Int64N(2*r) - r
1460	return time.Duration(j)
1461}
1462
// connectionKey is the context key under which SetConnection stores the
// net.Conn and GetConnection looks it up. It is an unexported type, so keys
// from other packages cannot collide with it.
type connectionKey struct{}
1464
1465// GetConnection gets the connection from the context.
1466func GetConnection(ctx context.Context) net.Conn {
1467	conn, _ := ctx.Value(connectionKey{}).(net.Conn)
1468	return conn
1469}
1470
1471// SetConnection adds the connection to the context to be able to get
1472// information about the destination ip and port for an incoming RPC. This also
1473// allows any unary or streaming interceptors to see the connection.
1474func SetConnection(ctx context.Context, conn net.Conn) context.Context {
1475	return context.WithValue(ctx, connectionKey{}, conn)
1476}