1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22 "bytes"
23 "context"
24 "errors"
25 "fmt"
26 "io"
27 "math"
28 rand "math/rand/v2"
29 "net"
30 "net/http"
31 "strconv"
32 "sync"
33 "sync/atomic"
34 "time"
35
36 "golang.org/x/net/http2"
37 "golang.org/x/net/http2/hpack"
38 "google.golang.org/grpc/internal/grpclog"
39 "google.golang.org/grpc/internal/grpcutil"
40 "google.golang.org/grpc/internal/pretty"
41 "google.golang.org/grpc/internal/syscall"
42 "google.golang.org/grpc/mem"
43 "google.golang.org/protobuf/proto"
44
45 "google.golang.org/grpc/codes"
46 "google.golang.org/grpc/credentials"
47 "google.golang.org/grpc/internal/channelz"
48 "google.golang.org/grpc/internal/grpcsync"
49 "google.golang.org/grpc/keepalive"
50 "google.golang.org/grpc/metadata"
51 "google.golang.org/grpc/peer"
52 "google.golang.org/grpc/stats"
53 "google.golang.org/grpc/status"
54 "google.golang.org/grpc/tap"
55)
56
var (
	// ErrIllegalHeaderWrite indicates that setting header is illegal because of
	// the stream's state. In this file it is returned by writeHeader when
	// s.updateHeaderSent() reports true.
	ErrIllegalHeaderWrite = status.Error(codes.Internal, "transport: SendHeader called multiple times")
	// ErrHeaderListSizeLimitViolation indicates that the header list size is larger
	// than the limit set by peer. In this file it is returned by
	// writeHeaderLocked after checkForHeaderListSize rejects the header frame.
	ErrHeaderListSizeLimitViolation = status.Error(codes.Internal, "transport: trying to send header list size larger than the limit set by peer")
)
65
// serverConnectionCounter counts the number of connections a server has seen
// (equal to the number of http2Servers created). Must be accessed atomically.
// NewServerTransport increments it and stores the new value as the
// transport's connectionID.
var serverConnectionCounter uint64
69
// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
	// lastRead holds time.Now().UnixNano() recorded after a frame read from
	// the connection.
	lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
	// done is handed to the control buffer; streamContextErr reports
	// ErrConnClosing once it is readable.
	done chan struct{}
	// conn is the connection used for I/O (the result of the credentials
	// handshake when credentials are configured).
	conn net.Conn
	// loopy is created by the writer goroutine started in NewServerTransport.
	loopy *loopyWriter
	// readerDone is closed when HandleStreams returns.
	readerDone chan struct{} // sync point to enable testing.
	// loopyWriterDone is closed when the loopy writer's run loop returns.
	loopyWriterDone chan struct{}
	// peer carries the remote address, local address and auth info of conn.
	peer peer.Peer
	// inTapHandle, when non-nil, is called by operateHeaders before a stream
	// is added to activeStreams; a non-nil error aborts the stream.
	inTapHandle tap.ServerInHandle
	framer      *framer
	// The max number of concurrent streams.
	maxStreams uint32
	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	controlBuf *controlBuffer
	// fc is the connection-level (stream 0) inbound flow control state.
	fc *trInFlow
	// stats are the handlers notified of RPC events such as outgoing headers.
	stats []stats.Handler
	// Keepalive and max-age parameters for the server.
	kp keepalive.ServerParameters
	// Keepalive enforcement policy.
	kep keepalive.EnforcementPolicy
	// The time instance last ping was received.
	lastPingAt time.Time
	// Number of times the client has violated keepalive ping policy so far.
	pingStrikes uint8
	// Flag to signify that number of ping strikes should be reset to 0.
	// This is set whenever data or header frames are sent.
	// 1 means yes.
	resetPingStrikes uint32 // Accessed atomically.
	// initialWindowSize is the inbound window limit given to newly created
	// streams; updateFlowControl rewrites it while holding mu.
	initialWindowSize int32
	// bdpEst is only set when neither window size was configured explicitly
	// (dynamic window); nil otherwise.
	bdpEst *bdpEstimator
	// maxSendHeaderListSize is set from the client's
	// SETTINGS_MAX_HEADER_LIST_SIZE; nil means no limit is enforced by
	// checkForHeaderListSize.
	maxSendHeaderListSize *uint32

	mu sync.Mutex // guard the following

	// drainEvent is initialized when Drain() is called the first time. After
	// which the server writes out the first GoAway(with ID 2^31-1) frame. Then
	// an independent goroutine will be launched to later send the second
	// GoAway. During this time we don't want to write another first GoAway(with
	// ID 2^31 -1) frame. Thus call to Drain() will be a no-op if drainEvent is
	// already initialized since draining is already underway.
	drainEvent *grpcsync.Event
	state      transportState
	// activeStreams maps stream IDs to the streams currently registered on
	// this transport.
	activeStreams map[uint32]*ServerStream
	// idle is the time instant when the connection went idle.
	// This is either the beginning of the connection or when the number of
	// RPCs go down to 0.
	// When the connection is busy, this value is set to 0.
	idle time.Time

	// Fields below are for channelz metric collection.
	channelz *channelz.Socket
	// bufferPool supplies buffers for copies of received DATA payloads;
	// handleData falls back to mem.DefaultBufferPool() when it is nil.
	bufferPool mem.BufferPool

	// connectionID is the value of serverConnectionCounter after this
	// transport incremented it.
	connectionID uint64

	// maxStreamMu guards the maximum stream ID
	// This lock may not be taken if mu is already held.
	maxStreamMu sync.Mutex
	maxStreamID uint32 // max stream ID ever seen

	logger *grpclog.PrefixLogger
}
134
135// NewServerTransport creates a http2 transport with conn and configuration
136// options from config.
137//
138// It returns a non-nil transport and a nil error on success. On failure, it
139// returns a nil transport and a non-nil error. For a special case where the
140// underlying conn gets closed before the client preface could be read, it
141// returns a nil transport and a nil error.
142func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
143 var authInfo credentials.AuthInfo
144 rawConn := conn
145 if config.Credentials != nil {
146 var err error
147 conn, authInfo, err = config.Credentials.ServerHandshake(rawConn)
148 if err != nil {
149 // ErrConnDispatched means that the connection was dispatched away
150 // from gRPC; those connections should be left open. io.EOF means
151 // the connection was closed before handshaking completed, which can
152 // happen naturally from probers. Return these errors directly.
153 if err == credentials.ErrConnDispatched || err == io.EOF {
154 return nil, err
155 }
156 return nil, connectionErrorf(false, err, "ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
157 }
158 }
159 writeBufSize := config.WriteBufferSize
160 readBufSize := config.ReadBufferSize
161 maxHeaderListSize := defaultServerMaxHeaderListSize
162 if config.MaxHeaderListSize != nil {
163 maxHeaderListSize = *config.MaxHeaderListSize
164 }
165 framer := newFramer(conn, writeBufSize, readBufSize, config.SharedWriteBuffer, maxHeaderListSize)
166 // Send initial settings as connection preface to client.
167 isettings := []http2.Setting{{
168 ID: http2.SettingMaxFrameSize,
169 Val: http2MaxFrameLen,
170 }}
171 if config.MaxStreams != math.MaxUint32 {
172 isettings = append(isettings, http2.Setting{
173 ID: http2.SettingMaxConcurrentStreams,
174 Val: config.MaxStreams,
175 })
176 }
177 dynamicWindow := true
178 iwz := int32(initialWindowSize)
179 if config.InitialWindowSize >= defaultWindowSize {
180 iwz = config.InitialWindowSize
181 dynamicWindow = false
182 }
183 icwz := int32(initialWindowSize)
184 if config.InitialConnWindowSize >= defaultWindowSize {
185 icwz = config.InitialConnWindowSize
186 dynamicWindow = false
187 }
188 if iwz != defaultWindowSize {
189 isettings = append(isettings, http2.Setting{
190 ID: http2.SettingInitialWindowSize,
191 Val: uint32(iwz)})
192 }
193 if config.MaxHeaderListSize != nil {
194 isettings = append(isettings, http2.Setting{
195 ID: http2.SettingMaxHeaderListSize,
196 Val: *config.MaxHeaderListSize,
197 })
198 }
199 if config.HeaderTableSize != nil {
200 isettings = append(isettings, http2.Setting{
201 ID: http2.SettingHeaderTableSize,
202 Val: *config.HeaderTableSize,
203 })
204 }
205 if err := framer.fr.WriteSettings(isettings...); err != nil {
206 return nil, connectionErrorf(false, err, "transport: %v", err)
207 }
208 // Adjust the connection flow control window if needed.
209 if delta := uint32(icwz - defaultWindowSize); delta > 0 {
210 if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
211 return nil, connectionErrorf(false, err, "transport: %v", err)
212 }
213 }
214 kp := config.KeepaliveParams
215 if kp.MaxConnectionIdle == 0 {
216 kp.MaxConnectionIdle = defaultMaxConnectionIdle
217 }
218 if kp.MaxConnectionAge == 0 {
219 kp.MaxConnectionAge = defaultMaxConnectionAge
220 }
221 // Add a jitter to MaxConnectionAge.
222 kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
223 if kp.MaxConnectionAgeGrace == 0 {
224 kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
225 }
226 if kp.Time == 0 {
227 kp.Time = defaultServerKeepaliveTime
228 }
229 if kp.Timeout == 0 {
230 kp.Timeout = defaultServerKeepaliveTimeout
231 }
232 if kp.Time != infinity {
233 if err = syscall.SetTCPUserTimeout(rawConn, kp.Timeout); err != nil {
234 return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
235 }
236 }
237 kep := config.KeepalivePolicy
238 if kep.MinTime == 0 {
239 kep.MinTime = defaultKeepalivePolicyMinTime
240 }
241
242 done := make(chan struct{})
243 peer := peer.Peer{
244 Addr: conn.RemoteAddr(),
245 LocalAddr: conn.LocalAddr(),
246 AuthInfo: authInfo,
247 }
248 t := &http2Server{
249 done: done,
250 conn: conn,
251 peer: peer,
252 framer: framer,
253 readerDone: make(chan struct{}),
254 loopyWriterDone: make(chan struct{}),
255 maxStreams: config.MaxStreams,
256 inTapHandle: config.InTapHandle,
257 fc: &trInFlow{limit: uint32(icwz)},
258 state: reachable,
259 activeStreams: make(map[uint32]*ServerStream),
260 stats: config.StatsHandlers,
261 kp: kp,
262 idle: time.Now(),
263 kep: kep,
264 initialWindowSize: iwz,
265 bufferPool: config.BufferPool,
266 }
267 var czSecurity credentials.ChannelzSecurityValue
268 if au, ok := authInfo.(credentials.ChannelzSecurityInfo); ok {
269 czSecurity = au.GetSecurityValue()
270 }
271 t.channelz = channelz.RegisterSocket(
272 &channelz.Socket{
273 SocketType: channelz.SocketTypeNormal,
274 Parent: config.ChannelzParent,
275 SocketMetrics: channelz.SocketMetrics{},
276 EphemeralMetrics: t.socketMetrics,
277 LocalAddr: t.peer.LocalAddr,
278 RemoteAddr: t.peer.Addr,
279 SocketOptions: channelz.GetSocketOption(t.conn),
280 Security: czSecurity,
281 },
282 )
283 t.logger = prefixLoggerForServerTransport(t)
284
285 t.controlBuf = newControlBuffer(t.done)
286 if dynamicWindow {
287 t.bdpEst = &bdpEstimator{
288 bdp: initialWindowSize,
289 updateFlowControl: t.updateFlowControl,
290 }
291 }
292
293 t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
294 t.framer.writer.Flush()
295
296 defer func() {
297 if err != nil {
298 t.Close(err)
299 }
300 }()
301
302 // Check the validity of client preface.
303 preface := make([]byte, len(clientPreface))
304 if _, err := io.ReadFull(t.conn, preface); err != nil {
305 // In deployments where a gRPC server runs behind a cloud load balancer
306 // which performs regular TCP level health checks, the connection is
307 // closed immediately by the latter. Returning io.EOF here allows the
308 // grpc server implementation to recognize this scenario and suppress
309 // logging to reduce spam.
310 if err == io.EOF {
311 return nil, io.EOF
312 }
313 return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
314 }
315 if !bytes.Equal(preface, clientPreface) {
316 return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
317 }
318
319 frame, err := t.framer.fr.ReadFrame()
320 if err == io.EOF || err == io.ErrUnexpectedEOF {
321 return nil, err
322 }
323 if err != nil {
324 return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
325 }
326 atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
327 sf, ok := frame.(*http2.SettingsFrame)
328 if !ok {
329 return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
330 }
331 t.handleSettings(sf)
332
333 go func() {
334 t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler, t.bufferPool)
335 err := t.loopy.run()
336 close(t.loopyWriterDone)
337 if !isIOError(err) {
338 // Close the connection if a non-I/O error occurs (for I/O errors
339 // the reader will also encounter the error and close). Wait 1
340 // second before closing the connection, or when the reader is done
341 // (i.e. the client already closed the connection or a connection
342 // error occurred). This avoids the potential problem where there
343 // is unread data on the receive side of the connection, which, if
344 // closed, would lead to a TCP RST instead of FIN, and the client
345 // encountering errors. For more info:
346 // https://github.com/grpc/grpc-go/issues/5358
347 timer := time.NewTimer(time.Second)
348 defer timer.Stop()
349 select {
350 case <-t.readerDone:
351 case <-timer.C:
352 }
353 t.conn.Close()
354 }
355 }()
356 go t.keepalive()
357 return t, nil
358}
359
360// operateHeaders takes action on the decoded headers. Returns an error if fatal
361// error encountered and transport needs to close, otherwise returns nil.
362func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeadersFrame, handle func(*ServerStream)) error {
363 // Acquire max stream ID lock for entire duration
364 t.maxStreamMu.Lock()
365 defer t.maxStreamMu.Unlock()
366
367 streamID := frame.Header().StreamID
368
369 // frame.Truncated is set to true when framer detects that the current header
370 // list size hits MaxHeaderListSize limit.
371 if frame.Truncated {
372 t.controlBuf.put(&cleanupStream{
373 streamID: streamID,
374 rst: true,
375 rstCode: http2.ErrCodeFrameSize,
376 onWrite: func() {},
377 })
378 return nil
379 }
380
381 if streamID%2 != 1 || streamID <= t.maxStreamID {
382 // illegal gRPC stream id.
383 return fmt.Errorf("received an illegal stream id: %v. headers frame: %+v", streamID, frame)
384 }
385 t.maxStreamID = streamID
386
387 buf := newRecvBuffer()
388 s := &ServerStream{
389 Stream: &Stream{
390 id: streamID,
391 buf: buf,
392 fc: &inFlow{limit: uint32(t.initialWindowSize)},
393 },
394 st: t,
395 headerWireLength: int(frame.Header().Length),
396 }
397 var (
398 // if false, content-type was missing or invalid
399 isGRPC = false
400 contentType = ""
401 mdata = make(metadata.MD, len(frame.Fields))
402 httpMethod string
403 // these are set if an error is encountered while parsing the headers
404 protocolError bool
405 headerError *status.Status
406
407 timeoutSet bool
408 timeout time.Duration
409 )
410
411 for _, hf := range frame.Fields {
412 switch hf.Name {
413 case "content-type":
414 contentSubtype, validContentType := grpcutil.ContentSubtype(hf.Value)
415 if !validContentType {
416 contentType = hf.Value
417 break
418 }
419 mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
420 s.contentSubtype = contentSubtype
421 isGRPC = true
422
423 case "grpc-accept-encoding":
424 mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
425 if hf.Value == "" {
426 continue
427 }
428 compressors := hf.Value
429 if s.clientAdvertisedCompressors != "" {
430 compressors = s.clientAdvertisedCompressors + "," + compressors
431 }
432 s.clientAdvertisedCompressors = compressors
433 case "grpc-encoding":
434 s.recvCompress = hf.Value
435 case ":method":
436 httpMethod = hf.Value
437 case ":path":
438 s.method = hf.Value
439 case "grpc-timeout":
440 timeoutSet = true
441 var err error
442 if timeout, err = decodeTimeout(hf.Value); err != nil {
443 headerError = status.Newf(codes.Internal, "malformed grpc-timeout: %v", err)
444 }
445 // "Transports must consider requests containing the Connection header
446 // as malformed." - A41
447 case "connection":
448 if t.logger.V(logLevel) {
449 t.logger.Infof("Received a HEADERS frame with a :connection header which makes the request malformed, as per the HTTP/2 spec")
450 }
451 protocolError = true
452 default:
453 if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
454 break
455 }
456 v, err := decodeMetadataHeader(hf.Name, hf.Value)
457 if err != nil {
458 headerError = status.Newf(codes.Internal, "malformed binary metadata %q in header %q: %v", hf.Value, hf.Name, err)
459 t.logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
460 break
461 }
462 mdata[hf.Name] = append(mdata[hf.Name], v)
463 }
464 }
465
466 // "If multiple Host headers or multiple :authority headers are present, the
467 // request must be rejected with an HTTP status code 400 as required by Host
468 // validation in RFC 7230 §5.4, gRPC status code INTERNAL, or RST_STREAM
469 // with HTTP/2 error code PROTOCOL_ERROR." - A41. Since this is a HTTP/2
470 // error, this takes precedence over a client not speaking gRPC.
471 if len(mdata[":authority"]) > 1 || len(mdata["host"]) > 1 {
472 errMsg := fmt.Sprintf("num values of :authority: %v, num values of host: %v, both must only have 1 value as per HTTP/2 spec", len(mdata[":authority"]), len(mdata["host"]))
473 if t.logger.V(logLevel) {
474 t.logger.Infof("Aborting the stream early: %v", errMsg)
475 }
476 t.controlBuf.put(&earlyAbortStream{
477 httpStatus: http.StatusBadRequest,
478 streamID: streamID,
479 contentSubtype: s.contentSubtype,
480 status: status.New(codes.Internal, errMsg),
481 rst: !frame.StreamEnded(),
482 })
483 return nil
484 }
485
486 if protocolError {
487 t.controlBuf.put(&cleanupStream{
488 streamID: streamID,
489 rst: true,
490 rstCode: http2.ErrCodeProtocol,
491 onWrite: func() {},
492 })
493 return nil
494 }
495 if !isGRPC {
496 t.controlBuf.put(&earlyAbortStream{
497 httpStatus: http.StatusUnsupportedMediaType,
498 streamID: streamID,
499 contentSubtype: s.contentSubtype,
500 status: status.Newf(codes.InvalidArgument, "invalid gRPC request content-type %q", contentType),
501 rst: !frame.StreamEnded(),
502 })
503 return nil
504 }
505 if headerError != nil {
506 t.controlBuf.put(&earlyAbortStream{
507 httpStatus: http.StatusBadRequest,
508 streamID: streamID,
509 contentSubtype: s.contentSubtype,
510 status: headerError,
511 rst: !frame.StreamEnded(),
512 })
513 return nil
514 }
515
516 // "If :authority is missing, Host must be renamed to :authority." - A41
517 if len(mdata[":authority"]) == 0 {
518 // No-op if host isn't present, no eventual :authority header is a valid
519 // RPC.
520 if host, ok := mdata["host"]; ok {
521 mdata[":authority"] = host
522 delete(mdata, "host")
523 }
524 } else {
525 // "If :authority is present, Host must be discarded" - A41
526 delete(mdata, "host")
527 }
528
529 if frame.StreamEnded() {
530 // s is just created by the caller. No lock needed.
531 s.state = streamReadDone
532 }
533 if timeoutSet {
534 s.ctx, s.cancel = context.WithTimeout(ctx, timeout)
535 } else {
536 s.ctx, s.cancel = context.WithCancel(ctx)
537 }
538
539 // Attach the received metadata to the context.
540 if len(mdata) > 0 {
541 s.ctx = metadata.NewIncomingContext(s.ctx, mdata)
542 }
543 t.mu.Lock()
544 if t.state != reachable {
545 t.mu.Unlock()
546 s.cancel()
547 return nil
548 }
549 if uint32(len(t.activeStreams)) >= t.maxStreams {
550 t.mu.Unlock()
551 t.controlBuf.put(&cleanupStream{
552 streamID: streamID,
553 rst: true,
554 rstCode: http2.ErrCodeRefusedStream,
555 onWrite: func() {},
556 })
557 s.cancel()
558 return nil
559 }
560 if httpMethod != http.MethodPost {
561 t.mu.Unlock()
562 errMsg := fmt.Sprintf("Received a HEADERS frame with :method %q which should be POST", httpMethod)
563 if t.logger.V(logLevel) {
564 t.logger.Infof("Aborting the stream early: %v", errMsg)
565 }
566 t.controlBuf.put(&earlyAbortStream{
567 httpStatus: http.StatusMethodNotAllowed,
568 streamID: streamID,
569 contentSubtype: s.contentSubtype,
570 status: status.New(codes.Internal, errMsg),
571 rst: !frame.StreamEnded(),
572 })
573 s.cancel()
574 return nil
575 }
576 if t.inTapHandle != nil {
577 var err error
578 if s.ctx, err = t.inTapHandle(s.ctx, &tap.Info{FullMethodName: s.method, Header: mdata}); err != nil {
579 t.mu.Unlock()
580 if t.logger.V(logLevel) {
581 t.logger.Infof("Aborting the stream early due to InTapHandle failure: %v", err)
582 }
583 stat, ok := status.FromError(err)
584 if !ok {
585 stat = status.New(codes.PermissionDenied, err.Error())
586 }
587 t.controlBuf.put(&earlyAbortStream{
588 httpStatus: http.StatusOK,
589 streamID: s.id,
590 contentSubtype: s.contentSubtype,
591 status: stat,
592 rst: !frame.StreamEnded(),
593 })
594 return nil
595 }
596 }
597 t.activeStreams[streamID] = s
598 if len(t.activeStreams) == 1 {
599 t.idle = time.Time{}
600 }
601 t.mu.Unlock()
602 if channelz.IsOn() {
603 t.channelz.SocketMetrics.StreamsStarted.Add(1)
604 t.channelz.SocketMetrics.LastRemoteStreamCreatedTimestamp.Store(time.Now().UnixNano())
605 }
606 s.requestRead = func(n int) {
607 t.adjustWindow(s, uint32(n))
608 }
609 s.ctxDone = s.ctx.Done()
610 s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
611 s.trReader = &transportReader{
612 reader: &recvBufferReader{
613 ctx: s.ctx,
614 ctxDone: s.ctxDone,
615 recv: s.buf,
616 },
617 windowHandler: func(n int) {
618 t.updateWindow(s, uint32(n))
619 },
620 }
621 // Register the stream with loopy.
622 t.controlBuf.put(®isterStream{
623 streamID: s.id,
624 wq: s.wq,
625 })
626 handle(s)
627 return nil
628}
629
630// HandleStreams receives incoming streams using the given handler. This is
631// typically run in a separate goroutine.
632// traceCtx attaches trace to ctx and returns the new context.
633func (t *http2Server) HandleStreams(ctx context.Context, handle func(*ServerStream)) {
634 defer func() {
635 close(t.readerDone)
636 <-t.loopyWriterDone
637 }()
638 for {
639 t.controlBuf.throttle()
640 frame, err := t.framer.fr.ReadFrame()
641 atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
642 if err != nil {
643 if se, ok := err.(http2.StreamError); ok {
644 if t.logger.V(logLevel) {
645 t.logger.Warningf("Encountered http2.StreamError: %v", se)
646 }
647 t.mu.Lock()
648 s := t.activeStreams[se.StreamID]
649 t.mu.Unlock()
650 if s != nil {
651 t.closeStream(s, true, se.Code, false)
652 } else {
653 t.controlBuf.put(&cleanupStream{
654 streamID: se.StreamID,
655 rst: true,
656 rstCode: se.Code,
657 onWrite: func() {},
658 })
659 }
660 continue
661 }
662 t.Close(err)
663 return
664 }
665 switch frame := frame.(type) {
666 case *http2.MetaHeadersFrame:
667 if err := t.operateHeaders(ctx, frame, handle); err != nil {
668 // Any error processing client headers, e.g. invalid stream ID,
669 // is considered a protocol violation.
670 t.controlBuf.put(&goAway{
671 code: http2.ErrCodeProtocol,
672 debugData: []byte(err.Error()),
673 closeConn: err,
674 })
675 continue
676 }
677 case *http2.DataFrame:
678 t.handleData(frame)
679 case *http2.RSTStreamFrame:
680 t.handleRSTStream(frame)
681 case *http2.SettingsFrame:
682 t.handleSettings(frame)
683 case *http2.PingFrame:
684 t.handlePing(frame)
685 case *http2.WindowUpdateFrame:
686 t.handleWindowUpdate(frame)
687 case *http2.GoAwayFrame:
688 // TODO: Handle GoAway from the client appropriately.
689 default:
690 if t.logger.V(logLevel) {
691 t.logger.Infof("Received unsupported frame type %T", frame)
692 }
693 }
694 }
695}
696
697func (t *http2Server) getStream(f http2.Frame) (*ServerStream, bool) {
698 t.mu.Lock()
699 defer t.mu.Unlock()
700 if t.activeStreams == nil {
701 // The transport is closing.
702 return nil, false
703 }
704 s, ok := t.activeStreams[f.Header().StreamID]
705 if !ok {
706 // The stream is already done.
707 return nil, false
708 }
709 return s, true
710}
711
712// adjustWindow sends out extra window update over the initial window size
713// of stream if the application is requesting data larger in size than
714// the window.
715func (t *http2Server) adjustWindow(s *ServerStream, n uint32) {
716 if w := s.fc.maybeAdjust(n); w > 0 {
717 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
718 }
719
720}
721
722// updateWindow adjusts the inbound quota for the stream and the transport.
723// Window updates will deliver to the controller for sending when
724// the cumulative quota exceeds the corresponding threshold.
725func (t *http2Server) updateWindow(s *ServerStream, n uint32) {
726 if w := s.fc.onRead(n); w > 0 {
727 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
728 increment: w,
729 })
730 }
731}
732
733// updateFlowControl updates the incoming flow control windows
734// for the transport and the stream based on the current bdp
735// estimation.
736func (t *http2Server) updateFlowControl(n uint32) {
737 t.mu.Lock()
738 for _, s := range t.activeStreams {
739 s.fc.newLimit(n)
740 }
741 t.initialWindowSize = int32(n)
742 t.mu.Unlock()
743 t.controlBuf.put(&outgoingWindowUpdate{
744 streamID: 0,
745 increment: t.fc.newLimit(n),
746 })
747 t.controlBuf.put(&outgoingSettings{
748 ss: []http2.Setting{
749 {
750 ID: http2.SettingInitialWindowSize,
751 Val: n,
752 },
753 },
754 })
755
756}
757
758func (t *http2Server) handleData(f *http2.DataFrame) {
759 size := f.Header().Length
760 var sendBDPPing bool
761 if t.bdpEst != nil {
762 sendBDPPing = t.bdpEst.add(size)
763 }
764 // Decouple connection's flow control from application's read.
765 // An update on connection's flow control should not depend on
766 // whether user application has read the data or not. Such a
767 // restriction is already imposed on the stream's flow control,
768 // and therefore the sender will be blocked anyways.
769 // Decoupling the connection flow control will prevent other
770 // active(fast) streams from starving in presence of slow or
771 // inactive streams.
772 if w := t.fc.onData(size); w > 0 {
773 t.controlBuf.put(&outgoingWindowUpdate{
774 streamID: 0,
775 increment: w,
776 })
777 }
778 if sendBDPPing {
779 // Avoid excessive ping detection (e.g. in an L7 proxy)
780 // by sending a window update prior to the BDP ping.
781 if w := t.fc.reset(); w > 0 {
782 t.controlBuf.put(&outgoingWindowUpdate{
783 streamID: 0,
784 increment: w,
785 })
786 }
787 t.controlBuf.put(bdpPing)
788 }
789 // Select the right stream to dispatch.
790 s, ok := t.getStream(f)
791 if !ok {
792 return
793 }
794 if s.getState() == streamReadDone {
795 t.closeStream(s, true, http2.ErrCodeStreamClosed, false)
796 return
797 }
798 if size > 0 {
799 if err := s.fc.onData(size); err != nil {
800 t.closeStream(s, true, http2.ErrCodeFlowControl, false)
801 return
802 }
803 if f.Header().Flags.Has(http2.FlagDataPadded) {
804 if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
805 t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
806 }
807 }
808 // TODO(bradfitz, zhaoq): A copy is required here because there is no
809 // guarantee f.Data() is consumed before the arrival of next frame.
810 // Can this copy be eliminated?
811 if len(f.Data()) > 0 {
812 pool := t.bufferPool
813 if pool == nil {
814 // Note that this is only supposed to be nil in tests. Otherwise, stream is
815 // always initialized with a BufferPool.
816 pool = mem.DefaultBufferPool()
817 }
818 s.write(recvMsg{buffer: mem.Copy(f.Data(), pool)})
819 }
820 }
821 if f.StreamEnded() {
822 // Received the end of stream from the client.
823 s.compareAndSwapState(streamActive, streamReadDone)
824 s.write(recvMsg{err: io.EOF})
825 }
826}
827
828func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
829 // If the stream is not deleted from the transport's active streams map, then do a regular close stream.
830 if s, ok := t.getStream(f); ok {
831 t.closeStream(s, false, 0, false)
832 return
833 }
834 // If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
835 t.controlBuf.put(&cleanupStream{
836 streamID: f.Header().StreamID,
837 rst: false,
838 rstCode: 0,
839 onWrite: func() {},
840 })
841}
842
843func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
844 if f.IsAck() {
845 return
846 }
847 var ss []http2.Setting
848 var updateFuncs []func()
849 f.ForeachSetting(func(s http2.Setting) error {
850 switch s.ID {
851 case http2.SettingMaxHeaderListSize:
852 updateFuncs = append(updateFuncs, func() {
853 t.maxSendHeaderListSize = new(uint32)
854 *t.maxSendHeaderListSize = s.Val
855 })
856 default:
857 ss = append(ss, s)
858 }
859 return nil
860 })
861 t.controlBuf.executeAndPut(func() bool {
862 for _, f := range updateFuncs {
863 f()
864 }
865 return true
866 }, &incomingSettings{
867 ss: ss,
868 })
869}
870
const (
	// maxPingStrikes is the number of keepalive policy violations tolerated;
	// handlePing sends a GOAWAY once pingStrikes exceeds it.
	maxPingStrikes = 2
	// defaultPingTimeout is the minimum spacing handlePing expects between
	// pings when there are no active streams and the enforcement policy does
	// not permit pings without streams.
	defaultPingTimeout = 2 * time.Hour
)
875
876func (t *http2Server) handlePing(f *http2.PingFrame) {
877 if f.IsAck() {
878 if f.Data == goAwayPing.data && t.drainEvent != nil {
879 t.drainEvent.Fire()
880 return
881 }
882 // Maybe it's a BDP ping.
883 if t.bdpEst != nil {
884 t.bdpEst.calculate(f.Data)
885 }
886 return
887 }
888 pingAck := &ping{ack: true}
889 copy(pingAck.data[:], f.Data[:])
890 t.controlBuf.put(pingAck)
891
892 now := time.Now()
893 defer func() {
894 t.lastPingAt = now
895 }()
896 // A reset ping strikes means that we don't need to check for policy
897 // violation for this ping and the pingStrikes counter should be set
898 // to 0.
899 if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
900 t.pingStrikes = 0
901 return
902 }
903 t.mu.Lock()
904 ns := len(t.activeStreams)
905 t.mu.Unlock()
906 if ns < 1 && !t.kep.PermitWithoutStream {
907 // Keepalive shouldn't be active thus, this new ping should
908 // have come after at least defaultPingTimeout.
909 if t.lastPingAt.Add(defaultPingTimeout).After(now) {
910 t.pingStrikes++
911 }
912 } else {
913 // Check if keepalive policy is respected.
914 if t.lastPingAt.Add(t.kep.MinTime).After(now) {
915 t.pingStrikes++
916 }
917 }
918
919 if t.pingStrikes > maxPingStrikes {
920 // Send goaway and close the connection.
921 t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: errors.New("got too many pings from the client")})
922 }
923}
924
925func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
926 t.controlBuf.put(&incomingWindowUpdate{
927 streamID: f.Header().StreamID,
928 increment: f.Increment,
929 })
930}
931
932func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
933 for k, vv := range md {
934 if isReservedHeader(k) {
935 // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
936 continue
937 }
938 for _, v := range vv {
939 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
940 }
941 }
942 return headerFields
943}
944
945func (t *http2Server) checkForHeaderListSize(it any) bool {
946 if t.maxSendHeaderListSize == nil {
947 return true
948 }
949 hdrFrame := it.(*headerFrame)
950 var sz int64
951 for _, f := range hdrFrame.hf {
952 if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
953 if t.logger.V(logLevel) {
954 t.logger.Infof("Header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
955 }
956 return false
957 }
958 }
959 return true
960}
961
962func (t *http2Server) streamContextErr(s *ServerStream) error {
963 select {
964 case <-t.done:
965 return ErrConnClosing
966 default:
967 }
968 return ContextErr(s.ctx.Err())
969}
970
971// WriteHeader sends the header metadata md back to the client.
972func (t *http2Server) writeHeader(s *ServerStream, md metadata.MD) error {
973 s.hdrMu.Lock()
974 defer s.hdrMu.Unlock()
975 if s.getState() == streamDone {
976 return t.streamContextErr(s)
977 }
978
979 if s.updateHeaderSent() {
980 return ErrIllegalHeaderWrite
981 }
982
983 if md.Len() > 0 {
984 if s.header.Len() > 0 {
985 s.header = metadata.Join(s.header, md)
986 } else {
987 s.header = md
988 }
989 }
990 if err := t.writeHeaderLocked(s); err != nil {
991 switch e := err.(type) {
992 case ConnectionError:
993 return status.Error(codes.Unavailable, e.Desc)
994 default:
995 return status.Convert(err).Err()
996 }
997 }
998 return nil
999}
1000
1001func (t *http2Server) setResetPingStrikes() {
1002 atomic.StoreUint32(&t.resetPingStrikes, 1)
1003}
1004
1005func (t *http2Server) writeHeaderLocked(s *ServerStream) error {
1006 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
1007 // first and create a slice of that exact size.
1008 headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
1009 headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
1010 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
1011 if s.sendCompress != "" {
1012 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
1013 }
1014 headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
1015 hf := &headerFrame{
1016 streamID: s.id,
1017 hf: headerFields,
1018 endStream: false,
1019 onWrite: t.setResetPingStrikes,
1020 }
1021 success, err := t.controlBuf.executeAndPut(func() bool { return t.checkForHeaderListSize(hf) }, hf)
1022 if !success {
1023 if err != nil {
1024 return err
1025 }
1026 t.closeStream(s, true, http2.ErrCodeInternal, false)
1027 return ErrHeaderListSizeLimitViolation
1028 }
1029 for _, sh := range t.stats {
1030 // Note: Headers are compressed with hpack after this call returns.
1031 // No WireLength field is set here.
1032 outHeader := &stats.OutHeader{
1033 Header: s.header.Copy(),
1034 Compression: s.sendCompress,
1035 }
1036 sh.HandleRPC(s.Context(), outHeader)
1037 }
1038 return nil
1039}
1040
// writeStatus sends the stream status st to the client and terminates the
// stream. No further I/O can be performed on the stream afterwards.
//
// If the response headers have not been sent yet, they are either written as
// a separate header frame first (when the stream has header metadata) or
// folded into this frame as a trailers-only response. It returns nil without
// doing anything when the stream is already done, and
// ErrHeaderListSizeLimitViolation (after closing the stream) when the
// trailing frame fails the header list size check.
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
// OK is adopted.
func (t *http2Server) writeStatus(s *ServerStream, st *status.Status) error {
	s.hdrMu.Lock()
	defer s.hdrMu.Unlock()

	if s.getState() == streamDone {
		return nil
	}

	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
	// first and create a slice of that exact size.
	headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
	if !s.updateHeaderSent() { // No headers have been sent.
		if len(s.header) > 0 { // Send a separate header frame.
			if err := t.writeHeaderLocked(s); err != nil {
				return err
			}
		} else { // Send a trailer only response.
			headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
			headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
		}
	}
	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})

	// Status details, when present, travel in their own binary header.
	if p := st.Proto(); p != nil && len(p.Details) > 0 {
		// Do not use the user's grpc-status-details-bin (if present) if we are
		// even attempting to set our own.
		delete(s.trailer, grpcStatusDetailsBinHeader)
		stBytes, err := proto.Marshal(p)
		if err != nil {
			// The details header is dropped; the status itself is still sent.
			// TODO: return error instead, when callers are able to handle it.
			t.logger.Errorf("Failed to marshal rpc status: %s, error: %v", pretty.ToJSON(p), err)
		} else {
			headerFields = append(headerFields, hpack.HeaderField{Name: grpcStatusDetailsBinHeader, Value: encodeBinHeader(stBytes)})
		}
	}

	// Attach the trailer metadata.
	headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
	trailingHeader := &headerFrame{
		streamID:  s.id,
		hf:        headerFields,
		endStream: true,
		onWrite:   t.setResetPingStrikes,
	}

	// Only the size check is run through the control buffer here: the item
	// argument is nil, and the frame itself is handed to finishStream below.
	success, err := t.controlBuf.executeAndPut(func() bool {
		return t.checkForHeaderListSize(trailingHeader)
	}, nil)
	if !success {
		if err != nil {
			return err
		}
		t.closeStream(s, true, http2.ErrCodeInternal, false)
		return ErrHeaderListSizeLimitViolation
	}
	// Send a RST_STREAM after the trailers if the client has not already half-closed.
	rst := s.getState() == streamActive
	t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
	for _, sh := range t.stats {
		// Note: The trailer fields are compressed with hpack after this call returns.
		// No WireLength field is set here.
		sh.HandleRPC(s.Context(), &stats.OutTrailer{
			Trailer: s.trailer.Copy(),
		})
	}
	return nil
}
1113
1114// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
1115// is returns if it fails (e.g., framing error, transport error).
1116func (t *http2Server) write(s *ServerStream, hdr []byte, data mem.BufferSlice, _ *WriteOptions) error {
1117 reader := data.Reader()
1118
1119 if !s.isHeaderSent() { // Headers haven't been written yet.
1120 if err := t.writeHeader(s, nil); err != nil {
1121 _ = reader.Close()
1122 return err
1123 }
1124 } else {
1125 // Writing headers checks for this condition.
1126 if s.getState() == streamDone {
1127 _ = reader.Close()
1128 return t.streamContextErr(s)
1129 }
1130 }
1131
1132 df := &dataFrame{
1133 streamID: s.id,
1134 h: hdr,
1135 reader: reader,
1136 onEachWrite: t.setResetPingStrikes,
1137 }
1138 if err := s.wq.get(int32(len(hdr) + df.reader.Remaining())); err != nil {
1139 _ = reader.Close()
1140 return t.streamContextErr(s)
1141 }
1142 if err := t.controlBuf.put(df); err != nil {
1143 _ = reader.Close()
1144 return err
1145 }
1146 t.incrMsgSent()
1147 return nil
1148}
1149
// keepalive running in a separate goroutine does the following:
// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
// after an additional duration of keepalive.Timeout.
//
// The loop returns when the connection is drained for idleness, after the
// max-age grace period handling, when a ping times out, or when t.done is
// closed.
func (t *http2Server) keepalive() {
	p := &ping{}
	// True iff a ping has been sent, and no data has been received since then.
	outstandingPing := false
	// Amount of time remaining before which we should receive an ACK for the
	// last sent ping.
	kpTimeoutLeft := time.Duration(0)
	// Records the last value of t.lastRead before we go block on the timer.
	// This is required to check for read activity since then.
	prevNano := time.Now().UnixNano()
	// Initialize the different timers to their default values.
	idleTimer := time.NewTimer(t.kp.MaxConnectionIdle)
	ageTimer := time.NewTimer(t.kp.MaxConnectionAge)
	kpTimer := time.NewTimer(t.kp.Time)
	defer func() {
		// We need to drain the underlying channel in these timers after a call
		// to Stop(), only if we are interested in resetting them. Clearly we
		// are not interested in resetting them here.
		idleTimer.Stop()
		ageTimer.Stop()
		kpTimer.Stop()
	}()

	for {
		select {
		case <-idleTimer.C:
			// t.idle is read under t.mu; deleteStream sets it when the last
			// active stream is removed.
			t.mu.Lock()
			idle := t.idle
			if idle.IsZero() { // The connection is non-idle.
				t.mu.Unlock()
				idleTimer.Reset(t.kp.MaxConnectionIdle)
				continue
			}
			// Time still left before the idle limit is reached.
			val := t.kp.MaxConnectionIdle - time.Since(idle)
			t.mu.Unlock()
			if val <= 0 {
				// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
				// Gracefully close the connection.
				t.Drain("max_idle")
				return
			}
			idleTimer.Reset(val)
		case <-ageTimer.C:
			// Max age reached: start draining, then reuse the same timer to
			// wait out the grace period.
			t.Drain("max_age")
			ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
			select {
			case <-ageTimer.C:
				// Close the connection after grace period.
				if t.logger.V(logLevel) {
					t.logger.Infof("Closing server transport due to maximum connection age")
				}
				t.controlBuf.put(closeConnection{})
			case <-t.done:
			}
			return
		case <-kpTimer.C:
			lastRead := atomic.LoadInt64(&t.lastRead)
			if lastRead > prevNano {
				// There has been read activity since the last time we were
				// here. Setup the timer to fire at kp.Time seconds from
				// lastRead time and continue.
				outstandingPing = false
				kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
				prevNano = lastRead
				continue
			}
			// No read activity: a ping that has used up its whole timeout
			// means the peer is unresponsive.
			if outstandingPing && kpTimeoutLeft <= 0 {
				t.Close(fmt.Errorf("keepalive ping not acked within timeout %s", t.kp.Timeout))
				return
			}
			if !outstandingPing {
				if channelz.IsOn() {
					t.channelz.SocketMetrics.KeepAlivesSent.Add(1)
				}
				t.controlBuf.put(p)
				kpTimeoutLeft = t.kp.Timeout
				outstandingPing = true
			}
			// The amount of time to sleep here is the minimum of kp.Time and
			// timeoutLeft. This will ensure that we wait only for kp.Time
			// before sending out the next ping (for cases where the ping is
			// acked).
			sleepDuration := min(t.kp.Time, kpTimeoutLeft)
			kpTimeoutLeft -= sleepDuration
			kpTimer.Reset(sleepDuration)
		case <-t.done:
			return
		}
	}
}
1246
1247// Close starts shutting down the http2Server transport.
1248// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
1249// could cause some resource issue. Revisit this later.
1250func (t *http2Server) Close(err error) {
1251 t.mu.Lock()
1252 if t.state == closing {
1253 t.mu.Unlock()
1254 return
1255 }
1256 if t.logger.V(logLevel) {
1257 t.logger.Infof("Closing: %v", err)
1258 }
1259 t.state = closing
1260 streams := t.activeStreams
1261 t.activeStreams = nil
1262 t.mu.Unlock()
1263 t.controlBuf.finish()
1264 close(t.done)
1265 if err := t.conn.Close(); err != nil && t.logger.V(logLevel) {
1266 t.logger.Infof("Error closing underlying net.Conn during Close: %v", err)
1267 }
1268 channelz.RemoveEntry(t.channelz.ID)
1269 // Cancel all active streams.
1270 for _, s := range streams {
1271 s.cancel()
1272 }
1273}
1274
1275// deleteStream deletes the stream s from transport's active streams.
1276func (t *http2Server) deleteStream(s *ServerStream, eosReceived bool) {
1277
1278 t.mu.Lock()
1279 if _, ok := t.activeStreams[s.id]; ok {
1280 delete(t.activeStreams, s.id)
1281 if len(t.activeStreams) == 0 {
1282 t.idle = time.Now()
1283 }
1284 }
1285 t.mu.Unlock()
1286
1287 if channelz.IsOn() {
1288 if eosReceived {
1289 t.channelz.SocketMetrics.StreamsSucceeded.Add(1)
1290 } else {
1291 t.channelz.SocketMetrics.StreamsFailed.Add(1)
1292 }
1293 }
1294}
1295
1296// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
1297func (t *http2Server) finishStream(s *ServerStream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
1298 // In case stream sending and receiving are invoked in separate
1299 // goroutines (e.g., bi-directional streaming), cancel needs to be
1300 // called to interrupt the potential blocking on other goroutines.
1301 s.cancel()
1302
1303 oldState := s.swapState(streamDone)
1304 if oldState == streamDone {
1305 // If the stream was already done, return.
1306 return
1307 }
1308
1309 hdr.cleanup = &cleanupStream{
1310 streamID: s.id,
1311 rst: rst,
1312 rstCode: rstCode,
1313 onWrite: func() {
1314 t.deleteStream(s, eosReceived)
1315 },
1316 }
1317 t.controlBuf.put(hdr)
1318}
1319
1320// closeStream clears the footprint of a stream when the stream is not needed any more.
1321func (t *http2Server) closeStream(s *ServerStream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
1322 // In case stream sending and receiving are invoked in separate
1323 // goroutines (e.g., bi-directional streaming), cancel needs to be
1324 // called to interrupt the potential blocking on other goroutines.
1325 s.cancel()
1326
1327 s.swapState(streamDone)
1328 t.deleteStream(s, eosReceived)
1329
1330 t.controlBuf.put(&cleanupStream{
1331 streamID: s.id,
1332 rst: rst,
1333 rstCode: rstCode,
1334 onWrite: func() {},
1335 })
1336}
1337
1338func (t *http2Server) Drain(debugData string) {
1339 t.mu.Lock()
1340 defer t.mu.Unlock()
1341 if t.drainEvent != nil {
1342 return
1343 }
1344 t.drainEvent = grpcsync.NewEvent()
1345 t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte(debugData), headsUp: true})
1346}
1347
// goAwayPing is the ping written right after the first (heads-up) GOAWAY of a
// graceful shutdown; its payload is a fixed 8-byte value.
var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}

// outgoingGoAwayHandler handles an outgoing GoAway and returns true if loopy
// needs to put itself in draining mode.
//
// For a heads-up GoAway (g.headsUp set) it writes a GOAWAY with stream ID
// math.MaxUint32 followed by goAwayPing, and starts a goroutine that queues
// the second GoAway once t.drainEvent fires or 5 seconds pass. Otherwise it
// moves the transport to the draining state and writes a GOAWAY carrying
// t.maxStreamID. It returns ErrConnClosing when the transport is already
// closing, and a non-nil error after the second GOAWAY when g.closeConn is
// set or no active streams are left.
func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
	// Both mutexes are taken here, maxStreamMu first, and released in the
	// reverse order on every path.
	t.maxStreamMu.Lock()
	t.mu.Lock()
	if t.state == closing { // TODO(mmukhi): This seems unnecessary.
		t.mu.Unlock()
		t.maxStreamMu.Unlock()
		// The transport is closing.
		return false, ErrConnClosing
	}
	if !g.headsUp {
		// Stop accepting more streams now.
		t.state = draining
		sid := t.maxStreamID
		retErr := g.closeConn
		if len(t.activeStreams) == 0 {
			retErr = errors.New("second GOAWAY written and no active streams left to process")
		}
		t.mu.Unlock()
		t.maxStreamMu.Unlock()
		if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
			return false, err
		}
		// NOTE(review): the error returned by Flush is not checked here.
		t.framer.writer.Flush()
		if retErr != nil {
			return false, retErr
		}
		return true, nil
	}
	t.mu.Unlock()
	t.maxStreamMu.Unlock()
	// For a graceful close, send out a GoAway with stream ID of MaxUInt32,
	// Follow that with a ping and wait for the ack to come back or a timer
	// to expire. During this time accept new streams since they might have
	// originated before the GoAway reaches the client.
	// After getting the ack or timer expiration send out another GoAway this
	// time with an ID of the max stream server intends to process.
	if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, g.debugData); err != nil {
		return false, err
	}
	if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
		return false, err
	}
	// NOTE(review): nothing is flushed on this path; presumably the caller
	// flushes the framer after the handler returns. Confirm.
	go func() {
		timer := time.NewTimer(5 * time.Second)
		defer timer.Stop()
		select {
		case <-t.drainEvent.Done():
		case <-timer.C:
		case <-t.done:
			// The transport is gone; there is no point in a second GoAway.
			return
		}
		// Second GoAway: headsUp is left unset, so the branch above runs.
		t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
	}()
	return false, nil
}
1407
1408func (t *http2Server) socketMetrics() *channelz.EphemeralSocketMetrics {
1409 return &channelz.EphemeralSocketMetrics{
1410 LocalFlowControlWindow: int64(t.fc.getSize()),
1411 RemoteFlowControlWindow: t.getOutFlowWindow(),
1412 }
1413}
1414
1415func (t *http2Server) incrMsgSent() {
1416 if channelz.IsOn() {
1417 t.channelz.SocketMetrics.MessagesSent.Add(1)
1418 t.channelz.SocketMetrics.LastMessageSentTimestamp.Add(1)
1419 }
1420}
1421
1422func (t *http2Server) incrMsgRecv() {
1423 if channelz.IsOn() {
1424 t.channelz.SocketMetrics.MessagesReceived.Add(1)
1425 t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Add(1)
1426 }
1427}
1428
1429func (t *http2Server) getOutFlowWindow() int64 {
1430 resp := make(chan uint32, 1)
1431 timer := time.NewTimer(time.Second)
1432 defer timer.Stop()
1433 t.controlBuf.put(&outFlowControlSizeRequest{resp})
1434 select {
1435 case sz := <-resp:
1436 return int64(sz)
1437 case <-t.done:
1438 return -1
1439 case <-timer.C:
1440 return -2
1441 }
1442}
1443
1444// Peer returns the peer of the transport.
1445func (t *http2Server) Peer() *peer.Peer {
1446 return &peer.Peer{
1447 Addr: t.peer.Addr,
1448 LocalAddr: t.peer.LocalAddr,
1449 AuthInfo: t.peer.AuthInfo, // Can be nil
1450 }
1451}
1452
1453func getJitter(v time.Duration) time.Duration {
1454 if v == infinity {
1455 return 0
1456 }
1457 // Generate a jitter between +/- 10% of the value.
1458 r := int64(v / 10)
1459 j := rand.Int64N(2*r) - r
1460 return time.Duration(j)
1461}
1462
// connectionKey is the context key under which SetConnection stores the
// connection.
type connectionKey struct{}

// GetConnection gets the connection from the context. It returns nil when
// the context holds no net.Conn under the connection key.
func GetConnection(ctx context.Context) net.Conn {
	if conn, ok := ctx.Value(connectionKey{}).(net.Conn); ok {
		return conn
	}
	return nil
}

// SetConnection adds the connection to the context to be able to get
// information about the destination ip and port for an incoming RPC. This also
// allows any unary or streaming interceptors to see the connection.
func SetConnection(ctx context.Context, conn net.Conn) context.Context {
	var key connectionKey
	return context.WithValue(ctx, key, conn)
}