// Copyright 2015 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Transport code.

package http2

import (
	"bufio"
	"bytes"
	"compress/gzip"
	"context"
	"crypto/rand"
	"crypto/tls"
	"errors"
	"fmt"
	"io"
	"io/fs"
	"log"
	"math"
	"math/bits"
	mathrand "math/rand"
	"net"
	"net/http"
	"net/http/httptrace"
	"net/textproto"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/net/http/httpguts"
	"golang.org/x/net/http2/hpack"
	"golang.org/x/net/idna"
	"golang.org/x/net/internal/httpcommon"
)

const (
	// transportDefaultConnFlow is how many connection-level flow control
	// tokens we give the server at start-up, past the default 64k.
	transportDefaultConnFlow = 1 << 30

	// transportDefaultStreamFlow is how many stream-level flow
	// control tokens we announce to the peer, and how many bytes
	// we buffer per stream.
	transportDefaultStreamFlow = 4 << 20

	defaultUserAgent = "Go-http-client/2.0"

	// initialMaxConcurrentStreams is a connection's maxConcurrentStreams until
	// it has received the server's initial SETTINGS frame, which corresponds
	// with the spec's minimum recommended value.
	initialMaxConcurrentStreams = 100

	// defaultMaxConcurrentStreams is a connection's default maxConcurrentStreams
	// if the server doesn't include one in its initial SETTINGS frame.
	defaultMaxConcurrentStreams = 1000
)

// Transport is an HTTP/2 Transport.
//
// A Transport internally caches connections to servers. It is safe
// for concurrent use by multiple goroutines.
type Transport struct {
	// DialTLSContext specifies an optional dial function with context for
	// creating TLS connections for requests.
	//
	// If DialTLSContext and DialTLS are both nil, tls.Dial is used.
	//
	// If the returned net.Conn has a ConnectionState method like tls.Conn,
	// it will be used to set http.Response.TLS.
	DialTLSContext func(ctx context.Context, network, addr string, cfg *tls.Config) (net.Conn, error)

	// DialTLS specifies an optional dial function for creating
	// TLS connections for requests.
	//
	// If DialTLSContext and DialTLS are both nil, tls.Dial is used.
	//
	// Deprecated: Use DialTLSContext instead, which allows the transport
	// to cancel dials as soon as they are no longer needed.
	// If both are set, DialTLSContext takes priority.
	DialTLS func(network, addr string, cfg *tls.Config) (net.Conn, error)

	// TLSClientConfig specifies the TLS configuration to use with
	// tls.Client. If nil, the default configuration is used.
	TLSClientConfig *tls.Config

	// ConnPool optionally specifies an alternate connection pool to use.
	// If nil, the default is used.
	ConnPool ClientConnPool

	// DisableCompression, if true, prevents the Transport from
	// requesting compression with an "Accept-Encoding: gzip"
	// request header when the Request contains no existing
	// Accept-Encoding value. If the Transport requests gzip on
	// its own and gets a gzipped response, it's transparently
	// decoded in the Response.Body. However, if the user
	// explicitly requested gzip it is not automatically
	// uncompressed.
	DisableCompression bool

	// AllowHTTP, if true, permits HTTP/2 requests using the insecure,
	// plain-text "http" scheme. Note that this does not enable h2c support.
	AllowHTTP bool

	// MaxHeaderListSize is the http2 SETTINGS_MAX_HEADER_LIST_SIZE to
	// send in the initial settings frame. It is how many bytes
	// of response headers are allowed. Unlike the http2 spec, zero here
	// means to use a default limit (currently 10MB). If you actually
	// want to advertise an unlimited value to the peer, Transport
	// interprets the highest possible value here (0xffffffff or 1<<32-1)
	// to mean no limit.
	MaxHeaderListSize uint32

	// MaxReadFrameSize is the http2 SETTINGS_MAX_FRAME_SIZE to send in the
	// initial settings frame. It is the size in bytes of the largest frame
	// payload that the sender is willing to receive. If 0, no setting is
	// sent, and the value is provided by the peer, which should be 16384
	// according to the spec:
	// https://datatracker.ietf.org/doc/html/rfc7540#section-6.5.2.
	// Values are bounded in the range 16k to 16M.
	MaxReadFrameSize uint32

	// MaxDecoderHeaderTableSize optionally specifies the http2
	// SETTINGS_HEADER_TABLE_SIZE to send in the initial settings frame. It
	// informs the remote endpoint of the maximum size of the header compression
	// table used to decode header blocks, in octets. If zero, the default value
	// of 4096 is used.
	MaxDecoderHeaderTableSize uint32

	// MaxEncoderHeaderTableSize optionally specifies an upper limit for the
	// header compression table used for encoding request headers. Received
	// SETTINGS_HEADER_TABLE_SIZE settings are capped at this limit. If zero,
	// the default value of 4096 is used.
	MaxEncoderHeaderTableSize uint32

	// StrictMaxConcurrentStreams controls whether the server's
	// SETTINGS_MAX_CONCURRENT_STREAMS should be respected
	// globally. If false, new TCP connections are created to the
	// server as needed to keep each under the per-connection
	// SETTINGS_MAX_CONCURRENT_STREAMS limit. If true, the
	// server's SETTINGS_MAX_CONCURRENT_STREAMS is interpreted as
	// a global limit and callers of RoundTrip block when needed,
	// waiting for their turn.
	StrictMaxConcurrentStreams bool

	// IdleConnTimeout is the maximum amount of time an idle
	// (keep-alive) connection will remain idle before closing
	// itself.
	// Zero means no limit.
	IdleConnTimeout time.Duration

	// ReadIdleTimeout is the timeout after which a health check using a ping
	// frame will be carried out if no frame is received on the connection.
	// Note that a ping response is considered a received frame, so if
	// there is no other traffic on the connection, the health check will
	// be performed every ReadIdleTimeout interval.
	// If zero, no health check is performed.
	ReadIdleTimeout time.Duration

	// PingTimeout is the timeout after which the connection will be closed
	// if a response to Ping is not received.
	// Defaults to 15s.
	PingTimeout time.Duration

	// WriteByteTimeout is the timeout after which the connection will be
	// closed if no data can be written to it. The timeout begins when data is
	// available to write, and is extended whenever any bytes are written.
	WriteByteTimeout time.Duration

	// CountError, if non-nil, is called on HTTP/2 transport errors.
	// It's intended to increment a metric for monitoring, such
	// as an expvar or Prometheus metric.
	// The errType consists of only ASCII word characters.
	CountError func(errType string)

	// t1, if non-nil, is the standard library Transport using
	// this transport. Its settings are used (but not its
	// RoundTrip method, etc).
	t1 *http.Transport

	connPoolOnce  sync.Once
	connPoolOrDef ClientConnPool // non-nil version of ConnPool

	*transportTestHooks
}
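
// A minimal usage sketch (illustrative only, not part of this package's
// tests or API): an http.Client that speaks HTTP/2 directly over TLS via
// this Transport. The URL and option values are placeholders.
//
//	client := &http.Client{
//		Transport: &http2.Transport{
//			// Optional knobs; zero values use the defaults described above.
//			StrictMaxConcurrentStreams: true,
//			ReadIdleTimeout:            30 * time.Second,
//		},
//	}
//	resp, err := client.Get("https://example.com/")
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer resp.Body.Close()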

// Hook points used for testing.
// Outside of tests, t.transportTestHooks is nil and these all have minimal implementations.
// Inside tests, see the testSyncHooks function docs.

type transportTestHooks struct {
	newclientconn func(*ClientConn)
	group         synctestGroupInterface
}

func (t *Transport) markNewGoroutine() {
	if t != nil && t.transportTestHooks != nil {
		t.transportTestHooks.group.Join()
	}
}

func (t *Transport) now() time.Time {
	if t != nil && t.transportTestHooks != nil {
		return t.transportTestHooks.group.Now()
	}
	return time.Now()
}

func (t *Transport) timeSince(when time.Time) time.Duration {
	if t != nil && t.transportTestHooks != nil {
		return t.now().Sub(when)
	}
	return time.Since(when)
}

// newTimer creates a new time.Timer, or a synthetic timer in tests.
func (t *Transport) newTimer(d time.Duration) timer {
	if t.transportTestHooks != nil {
		return t.transportTestHooks.group.NewTimer(d)
	}
	return timeTimer{time.NewTimer(d)}
}

// afterFunc creates a new time.AfterFunc timer, or a synthetic timer in tests.
func (t *Transport) afterFunc(d time.Duration, f func()) timer {
	if t.transportTestHooks != nil {
		return t.transportTestHooks.group.AfterFunc(d, f)
	}
	return timeTimer{time.AfterFunc(d, f)}
}

func (t *Transport) contextWithTimeout(ctx context.Context, d time.Duration) (context.Context, context.CancelFunc) {
	if t.transportTestHooks != nil {
		return t.transportTestHooks.group.ContextWithTimeout(ctx, d)
	}
	return context.WithTimeout(ctx, d)
}

func (t *Transport) maxHeaderListSize() uint32 {
	n := int64(t.MaxHeaderListSize)
	if t.t1 != nil && t.t1.MaxResponseHeaderBytes != 0 {
		n = t.t1.MaxResponseHeaderBytes
		if n > 0 {
			n = adjustHTTP1MaxHeaderSize(n)
		}
	}
	if n <= 0 {
		return 10 << 20
	}
	if n >= 0xffffffff {
		return 0
	}
	return uint32(n)
}

func (t *Transport) disableCompression() bool {
	return t.DisableCompression || (t.t1 != nil && t.t1.DisableCompression)
}

// ConfigureTransport configures a net/http HTTP/1 Transport to use HTTP/2.
// It returns an error if t1 has already been HTTP/2-enabled.
//
// Use ConfigureTransports instead to configure the HTTP/2 Transport.
func ConfigureTransport(t1 *http.Transport) error {
	_, err := ConfigureTransports(t1)
	return err
}

// ConfigureTransports configures a net/http HTTP/1 Transport to use HTTP/2.
// It returns a new HTTP/2 Transport for further configuration.
// It returns an error if t1 has already been HTTP/2-enabled.
func ConfigureTransports(t1 *http.Transport) (*Transport, error) {
	return configureTransports(t1)
}
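
// A usage sketch (illustrative only): upgrading an existing HTTP/1
// transport so it negotiates HTTP/2 over TLS, then tweaking the returned
// HTTP/2 Transport. The option values are placeholders.
//
//	t1 := &http.Transport{TLSClientConfig: &tls.Config{}}
//	t2, err := http2.ConfigureTransports(t1)
//	if err != nil {
//		log.Fatal(err)
//	}
//	t2.ReadIdleTimeout = 30 * time.Second // enable ping-based health checks
//	client := &http.Client{Transport: t1} // t1 now uses h2 when negotiated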

func configureTransports(t1 *http.Transport) (*Transport, error) {
	connPool := new(clientConnPool)
	t2 := &Transport{
		ConnPool: noDialClientConnPool{connPool},
		t1:       t1,
	}
	connPool.t = t2
	if err := registerHTTPSProtocol(t1, noDialH2RoundTripper{t2}); err != nil {
		return nil, err
	}
	if t1.TLSClientConfig == nil {
		t1.TLSClientConfig = new(tls.Config)
	}
	if !strSliceContains(t1.TLSClientConfig.NextProtos, "h2") {
		t1.TLSClientConfig.NextProtos = append([]string{"h2"}, t1.TLSClientConfig.NextProtos...)
	}
	if !strSliceContains(t1.TLSClientConfig.NextProtos, "http/1.1") {
		t1.TLSClientConfig.NextProtos = append(t1.TLSClientConfig.NextProtos, "http/1.1")
	}
	upgradeFn := func(scheme, authority string, c net.Conn) http.RoundTripper {
		addr := authorityAddr(scheme, authority)
		if used, err := connPool.addConnIfNeeded(addr, t2, c); err != nil {
			go c.Close()
			return erringRoundTripper{err}
		} else if !used {
			// Turns out we don't need this c.
			// For example, two goroutines made requests to the same host
			// at the same time, both kicking off TCP dials. (since protocol
			// was unknown)
			go c.Close()
		}
		if scheme == "http" {
			return (*unencryptedTransport)(t2)
		}
		return t2
	}
	if t1.TLSNextProto == nil {
		t1.TLSNextProto = make(map[string]func(string, *tls.Conn) http.RoundTripper)
	}
	t1.TLSNextProto[NextProtoTLS] = func(authority string, c *tls.Conn) http.RoundTripper {
		return upgradeFn("https", authority, c)
	}
	// The "unencrypted_http2" TLSNextProto key is used to pass off non-TLS HTTP/2 conns.
	t1.TLSNextProto[nextProtoUnencryptedHTTP2] = func(authority string, c *tls.Conn) http.RoundTripper {
		nc, err := unencryptedNetConnFromTLSConn(c)
		if err != nil {
			go c.Close()
			return erringRoundTripper{err}
		}
		return upgradeFn("http", authority, nc)
	}
	return t2, nil
}

// unencryptedTransport is a Transport with a RoundTrip method that
// always permits http:// URLs.
type unencryptedTransport Transport

func (t *unencryptedTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	return (*Transport)(t).RoundTripOpt(req, RoundTripOpt{allowHTTP: true})
}

func (t *Transport) connPool() ClientConnPool {
	t.connPoolOnce.Do(t.initConnPool)
	return t.connPoolOrDef
}

func (t *Transport) initConnPool() {
	if t.ConnPool != nil {
		t.connPoolOrDef = t.ConnPool
	} else {
		t.connPoolOrDef = &clientConnPool{t: t}
	}
}

// ClientConn is the state of a single HTTP/2 client connection to an
// HTTP/2 server.
type ClientConn struct {
	t             *Transport
	tconn         net.Conn             // usually *tls.Conn, except specialized impls
	tlsState      *tls.ConnectionState // nil only for specialized impls
	atomicReused  uint32               // whether conn is being reused; atomic
	singleUse     bool                 // whether being used for a single http.Request
	getConnCalled bool                 // used by clientConnPool

	// readLoop goroutine fields:
	readerDone chan struct{} // closed on error
	readerErr  error         // set before readerDone is closed

	idleTimeout time.Duration // or 0 for never
	idleTimer   timer

	mu               sync.Mutex    // guards following
	cond             *sync.Cond    // hold mu; broadcast on flow/closed changes
	flow             outflow       // our conn-level flow control quota (cs.outflow is per stream)
	inflow           inflow        // peer's conn-level flow control
	doNotReuse       bool          // whether conn is marked to not be reused for any future requests
	closing          bool
	closed           bool
	closedOnIdle     bool          // true if conn was closed for idleness
	seenSettings     bool          // true if we've seen a settings frame, false otherwise
	seenSettingsChan chan struct{} // closed when seenSettings is true or frame reading fails
	wantSettingsAck  bool          // we sent a SETTINGS frame and haven't heard back
	goAway           *GoAwayFrame  // if non-nil, the GoAwayFrame we received
	goAwayDebug      string        // goAway frame's debug data, retained as a string
	streams          map[uint32]*clientStream // client-initiated
	streamsReserved  int                      // incr by ReserveNewRequest; decr on RoundTrip
	nextStreamID     uint32
	pendingRequests  int                       // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams
	pings            map[[8]byte]chan struct{} // in flight ping data to notification channel
	br               *bufio.Reader
	lastActive       time.Time
	lastIdle         time.Time // time last idle
	// Settings from peer: (also guarded by wmu)
	maxFrameSize                uint32
	maxConcurrentStreams        uint32
	peerMaxHeaderListSize       uint64
	peerMaxHeaderTableSize      uint32
	initialWindowSize           uint32
	initialStreamRecvWindowSize int32
	readIdleTimeout             time.Duration
	pingTimeout                 time.Duration
	extendedConnectAllowed      bool

	// rstStreamPingsBlocked works around an unfortunate gRPC behavior.
	// gRPC strictly limits the number of PING frames that it will receive.
	// The default is two pings per two hours, but the limit resets every time
	// the gRPC endpoint sends a HEADERS or DATA frame. See golang/go#70575.
	//
	// rstStreamPingsBlocked is set after receiving a response to a PING frame
	// bundled with an RST_STREAM (see pendingResets below), and cleared after
	// receiving a HEADERS or DATA frame.
	rstStreamPingsBlocked bool

	// pendingResets is the number of RST_STREAM frames we have sent to the peer,
	// without confirming that the peer has received them. When we send a RST_STREAM,
	// we bundle it with a PING frame, unless a PING is already in flight. We count
	// the reset stream against the connection's concurrency limit until we get
	// a PING response. This limits the number of requests we'll try to send to a
	// completely unresponsive connection.
	pendingResets int

	// reqHeaderMu is a 1-element semaphore channel controlling access to sending new requests.
	// Write to reqHeaderMu to lock it, read from it to unlock.
	// Lock reqHeaderMu BEFORE mu or wmu.
	reqHeaderMu chan struct{}

	// wmu is held while writing.
	// Acquire BEFORE mu when holding both, to avoid blocking mu on network writes.
	// Only acquire both at the same time when changing peer settings.
	wmu  sync.Mutex
	bw   *bufio.Writer
	fr   *Framer
	werr error        // first write error that has occurred
	hbuf bytes.Buffer // HPACK encoder writes into this
	henc *hpack.Encoder
}

// clientStream is the state for a single HTTP/2 stream. One of these
// is created for each Transport.RoundTrip call.
type clientStream struct {
	cc *ClientConn

	// Fields of Request that we may access even after the response body is closed.
	ctx       context.Context
	reqCancel <-chan struct{}

	trace         *httptrace.ClientTrace // or nil
	ID            uint32
	bufPipe       pipe // buffered pipe with the flow-controlled response payload
	requestedGzip bool
	isHead        bool

	abortOnce sync.Once
	abort     chan struct{} // closed to signal stream should end immediately
	abortErr  error         // set if abort is closed

	peerClosed chan struct{} // closed when the peer sends an END_STREAM flag
	donec      chan struct{} // closed after the stream is in the closed state
	on100      chan struct{} // buffered; written to if a 100 is received

	respHeaderRecv chan struct{}  // closed when headers are received
	res            *http.Response // set if respHeaderRecv is closed

	flow        outflow // guarded by cc.mu
	inflow      inflow  // guarded by cc.mu
	bytesRemain int64   // -1 means unknown; owned by transportResponseBody.Read
	readErr     error   // sticky read error; owned by transportResponseBody.Read

	reqBody              io.ReadCloser
	reqBodyContentLength int64         // -1 means unknown
	reqBodyClosed        chan struct{} // guarded by cc.mu; non-nil on Close, closed when done

	// owned by writeRequest:
	sentEndStream bool // sent an END_STREAM flag to the peer
	sentHeaders   bool

	// owned by clientConnReadLoop:
	firstByte       bool  // got the first response byte
	pastHeaders     bool  // got first MetaHeadersFrame (actual headers)
	pastTrailers    bool  // got optional second MetaHeadersFrame (trailers)
	readClosed      bool  // peer sent an END_STREAM flag
	readAborted     bool  // read loop reset the stream
	totalHeaderSize int64 // total size of 1xx headers seen

	trailer    http.Header  // accumulated trailers
	resTrailer *http.Header // client's Response.Trailer
}

var got1xxFuncForTests func(int, textproto.MIMEHeader) error

// get1xxTraceFunc returns the value of request's httptrace.ClientTrace.Got1xxResponse func,
// if any. It returns nil if not set or if the Go version is too old.
func (cs *clientStream) get1xxTraceFunc() func(int, textproto.MIMEHeader) error {
	if fn := got1xxFuncForTests; fn != nil {
		return fn
	}
	return traceGot1xxResponseFunc(cs.trace)
}

func (cs *clientStream) abortStream(err error) {
	cs.cc.mu.Lock()
	defer cs.cc.mu.Unlock()
	cs.abortStreamLocked(err)
}

func (cs *clientStream) abortStreamLocked(err error) {
	cs.abortOnce.Do(func() {
		cs.abortErr = err
		close(cs.abort)
	})
	if cs.reqBody != nil {
		cs.closeReqBodyLocked()
	}
	// TODO(dneil): Clean up tests where cs.cc.cond is nil.
	if cs.cc.cond != nil {
		// Wake up writeRequestBody if it is waiting on flow control.
		cs.cc.cond.Broadcast()
	}
}

func (cs *clientStream) abortRequestBodyWrite() {
	cc := cs.cc
	cc.mu.Lock()
	defer cc.mu.Unlock()
	if cs.reqBody != nil && cs.reqBodyClosed == nil {
		cs.closeReqBodyLocked()
		cc.cond.Broadcast()
	}
}

func (cs *clientStream) closeReqBodyLocked() {
	if cs.reqBodyClosed != nil {
		return
	}
	cs.reqBodyClosed = make(chan struct{})
	reqBodyClosed := cs.reqBodyClosed
	go func() {
		cs.cc.t.markNewGoroutine()
		cs.reqBody.Close()
		close(reqBodyClosed)
	}()
}

type stickyErrWriter struct {
	group   synctestGroupInterface
	conn    net.Conn
	timeout time.Duration
	err     *error
}

func (sew stickyErrWriter) Write(p []byte) (n int, err error) {
	if *sew.err != nil {
		return 0, *sew.err
	}
	n, err = writeWithByteTimeout(sew.group, sew.conn, sew.timeout, p)
	*sew.err = err
	return n, err
}

// noCachedConnError is the concrete type of ErrNoCachedConn, which
// needs to be detected by net/http regardless of whether it's its
// bundled version (in h2_bundle.go with a rewritten type name) or
// from a user's x/net/http2. As such, it has a unique method name
// (IsHTTP2NoCachedConnError) that net/http sniffs for via func
// isNoCachedConnError.
type noCachedConnError struct{}

func (noCachedConnError) IsHTTP2NoCachedConnError() {}
func (noCachedConnError) Error() string             { return "http2: no cached connection was available" }

// isNoCachedConnError reports whether err is of type noCachedConnError
// or its equivalent renamed type in net/http2's h2_bundle.go. Both types
// may coexist in the same running program.
func isNoCachedConnError(err error) bool {
	_, ok := err.(interface{ IsHTTP2NoCachedConnError() })
	return ok
}

var ErrNoCachedConn error = noCachedConnError{}

// RoundTripOpt are options for the Transport.RoundTripOpt method.
type RoundTripOpt struct {
	// OnlyCachedConn controls whether RoundTripOpt may
	// create a new TCP connection. If set true and
	// no cached connection is available, RoundTripOpt
	// will return ErrNoCachedConn.
	OnlyCachedConn bool

	allowHTTP bool // allow http:// URLs
}
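
// A sketch (illustrative only) of using RoundTripOpt to avoid dialing:
// issue a request only if a cached connection to the host already exists.
//
//	t := &http2.Transport{}
//	req, _ := http.NewRequest("GET", "https://example.com/", nil)
//	res, err := t.RoundTripOpt(req, http2.RoundTripOpt{OnlyCachedConn: true})
//	if errors.Is(err, http2.ErrNoCachedConn) {
//		// No idle connection; fall back to a dialing path if desired.
//	}
//	_ = res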

func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) {
	return t.RoundTripOpt(req, RoundTripOpt{})
}

// authorityAddr converts a given authority (a host/IP, or host:port /
// ip:port) to a host:port. The port 443 (or 80 for the "http" scheme)
// is added if needed.
func authorityAddr(scheme string, authority string) (addr string) {
	host, port, err := net.SplitHostPort(authority)
	if err != nil { // authority didn't have a port
		host = authority
		port = ""
	}
	if port == "" { // authority's port was empty
		port = "443"
		if scheme == "http" {
			port = "80"
		}
	}
	if a, err := idna.ToASCII(host); err == nil {
		host = a
	}
	// IPv6 address literal, without a port:
	if strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]") {
		return host + ":" + port
	}
	return net.JoinHostPort(host, port)
}
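
// For example (illustrative):
//
//	authorityAddr("https", "example.com")      // "example.com:443"
//	authorityAddr("http", "example.com")       // "example.com:80"
//	authorityAddr("https", "[::1]")            // "[::1]:443"
//	authorityAddr("https", "example.com:8443") // "example.com:8443"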

// RoundTripOpt is like RoundTrip, but takes options.
func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Response, error) {
	switch req.URL.Scheme {
	case "https":
		// Always okay.
	case "http":
		if !t.AllowHTTP && !opt.allowHTTP {
			return nil, errors.New("http2: unencrypted HTTP/2 not enabled")
		}
	default:
		return nil, errors.New("http2: unsupported scheme")
	}

	addr := authorityAddr(req.URL.Scheme, req.URL.Host)
	for retry := 0; ; retry++ {
		cc, err := t.connPool().GetClientConn(req, addr)
		if err != nil {
			t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err)
			return nil, err
		}
		reused := !atomic.CompareAndSwapUint32(&cc.atomicReused, 0, 1)
		traceGotConn(req, cc, reused)
		res, err := cc.RoundTrip(req)
		if err != nil && retry <= 6 {
			roundTripErr := err
			if req, err = shouldRetryRequest(req, err); err == nil {
				// After the first retry, do exponential backoff with 10% jitter.
				if retry == 0 {
					t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
					continue
				}
				backoff := float64(uint(1) << (uint(retry) - 1))
				backoff += backoff * (0.1 * mathrand.Float64())
				d := time.Second * time.Duration(backoff)
				tm := t.newTimer(d)
				select {
				case <-tm.C():
					t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
					continue
				case <-req.Context().Done():
					tm.Stop()
					err = req.Context().Err()
				}
			}
		}
		if err == errClientConnNotEstablished {
			// This ClientConn was created recently,
			// this is the first request to use it,
			// and the connection is closed and not usable.
			//
			// In this state, cc.idleTimer will remove the conn from the pool
			// when it fires. Stop the timer and remove it here so future requests
			// won't try to use this connection.
			//
			// If the timer has already fired and we're racing it, the redundant
			// call to MarkDead is harmless.
			if cc.idleTimer != nil {
				cc.idleTimer.Stop()
			}
			t.connPool().MarkDead(cc)
		}
		if err != nil {
			t.vlogf("RoundTrip failure: %v", err)
			return nil, err
		}
		return res, nil
	}
}

// CloseIdleConnections closes any connections which were established by
// previous requests but are now sitting idle.
// It does not interrupt any connections currently in use.
func (t *Transport) CloseIdleConnections() {
	if cp, ok := t.connPool().(clientConnPoolIdleCloser); ok {
		cp.closeIdleConnections()
	}
}
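
// Illustrative use (not a prescribed pattern): release idle connections
// proactively when a client will be quiet for a while.
//
//	t := &http2.Transport{IdleConnTimeout: 90 * time.Second}
//	// ... issue requests through t ...
//	t.CloseIdleConnections() // drop idle conns now instead of waiting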

var (
	errClientConnClosed         = errors.New("http2: client conn is closed")
	errClientConnUnusable       = errors.New("http2: client conn not usable")
	errClientConnNotEstablished = errors.New("http2: client conn could not be established")
	errClientConnGotGoAway      = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
)

// shouldRetryRequest is called by RoundTrip when a request fails to get
// response headers. It is always called with a non-nil error.
// It returns either a request to retry (either the same request, or a
// modified clone), or an error if the request can't be replayed.
func shouldRetryRequest(req *http.Request, err error) (*http.Request, error) {
	if !canRetryError(err) {
		return nil, err
	}
	// If the Body is nil (or http.NoBody), it's safe to reuse
	// this request and its Body.
	if req.Body == nil || req.Body == http.NoBody {
		return req, nil
	}

	// If the request body can be reset back to its original
	// state via the optional req.GetBody, do that.
	if req.GetBody != nil {
		body, err := req.GetBody()
		if err != nil {
			return nil, err
		}
		newReq := *req
		newReq.Body = body
		return &newReq, nil
	}

	// The Request.Body can't reset back to the beginning, but we
	// don't seem to have started to read from it yet, so reuse
	// the request directly.
	if err == errClientConnUnusable {
		return req, nil
	}

	return nil, fmt.Errorf("http2: Transport: cannot retry err [%v] after Request.Body was written; define Request.GetBody to avoid this error", err)
}
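
// As the error above suggests, callers constructing requests with bodies
// can make them replayable by supplying GetBody. A sketch (illustrative
// only; note that http.NewRequest already sets GetBody for common body
// types such as *bytes.Reader):
//
//	payload := []byte(`{"hello":"world"}`)
//	req, _ := http.NewRequest("POST", "https://example.com/", bytes.NewReader(payload))
//	req.GetBody = func() (io.ReadCloser, error) {
//		// Return a fresh reader over the same bytes for each retry.
//		return io.NopCloser(bytes.NewReader(payload)), nil
//	}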

func canRetryError(err error) bool {
	if err == errClientConnUnusable || err == errClientConnGotGoAway {
		return true
	}
	if se, ok := err.(StreamError); ok {
		if se.Code == ErrCodeProtocol && se.Cause == errFromPeer {
			// See golang/go#47635, golang/go#42777
			return true
		}
		return se.Code == ErrCodeRefusedStream
	}
	return false
}

func (t *Transport) dialClientConn(ctx context.Context, addr string, singleUse bool) (*ClientConn, error) {
	if t.transportTestHooks != nil {
		return t.newClientConn(nil, singleUse)
	}
	host, _, err := net.SplitHostPort(addr)
	if err != nil {
		return nil, err
	}
	tconn, err := t.dialTLS(ctx, "tcp", addr, t.newTLSConfig(host))
	if err != nil {
		return nil, err
	}
	return t.newClientConn(tconn, singleUse)
}

func (t *Transport) newTLSConfig(host string) *tls.Config {
	cfg := new(tls.Config)
	if t.TLSClientConfig != nil {
		*cfg = *t.TLSClientConfig.Clone()
	}
	if !strSliceContains(cfg.NextProtos, NextProtoTLS) {
		cfg.NextProtos = append([]string{NextProtoTLS}, cfg.NextProtos...)
	}
	if cfg.ServerName == "" {
		cfg.ServerName = host
	}
	return cfg
}

func (t *Transport) dialTLS(ctx context.Context, network, addr string, tlsCfg *tls.Config) (net.Conn, error) {
	if t.DialTLSContext != nil {
		return t.DialTLSContext(ctx, network, addr, tlsCfg)
	} else if t.DialTLS != nil {
		return t.DialTLS(network, addr, tlsCfg)
	}

	tlsCn, err := t.dialTLSWithContext(ctx, network, addr, tlsCfg)
	if err != nil {
		return nil, err
	}
	state := tlsCn.ConnectionState()
	if p := state.NegotiatedProtocol; p != NextProtoTLS {
		return nil, fmt.Errorf("http2: unexpected ALPN protocol %q; want %q", p, NextProtoTLS)
	}
	if !state.NegotiatedProtocolIsMutual {
		return nil, errors.New("http2: could not negotiate protocol mutually")
	}
	return tlsCn, nil
}

// disableKeepAlives reports whether connections should be closed as
// soon as possible after handling the first request.
func (t *Transport) disableKeepAlives() bool {
	return t.t1 != nil && t.t1.DisableKeepAlives
}

func (t *Transport) expectContinueTimeout() time.Duration {
	if t.t1 == nil {
		return 0
	}
	return t.t1.ExpectContinueTimeout
}

func (t *Transport) NewClientConn(c net.Conn) (*ClientConn, error) {
	return t.newClientConn(c, t.disableKeepAlives())
}
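
// Illustrative sketch: driving a ClientConn over a caller-supplied
// connection, e.g. prior-knowledge HTTP/2 against a known cleartext
// server. The address is a placeholder.
//
//	t := &http2.Transport{AllowHTTP: true}
//	nc, err := net.Dial("tcp", "127.0.0.1:8080")
//	if err != nil {
//		log.Fatal(err)
//	}
//	cc, err := t.NewClientConn(nc)
//	if err != nil {
//		log.Fatal(err)
//	}
//	req, _ := http.NewRequest("GET", "http://127.0.0.1:8080/", nil)
//	res, err := cc.RoundTrip(req)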

func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, error) {
	conf := configFromTransport(t)
	cc := &ClientConn{
		t:                           t,
		tconn:                       c,
		readerDone:                  make(chan struct{}),
		nextStreamID:                1,
		maxFrameSize:                16 << 10, // spec default
		initialWindowSize:           65535,    // spec default
		initialStreamRecvWindowSize: conf.MaxUploadBufferPerStream,
		maxConcurrentStreams:        initialMaxConcurrentStreams, // "infinite", per spec. Use a smaller value until we have received server settings.
		peerMaxHeaderListSize:       0xffffffffffffffff,          // "infinite", per spec. Use 2^64-1 instead.
		streams:                     make(map[uint32]*clientStream),
		singleUse:                   singleUse,
		seenSettingsChan:            make(chan struct{}),
		wantSettingsAck:             true,
		readIdleTimeout:             conf.SendPingTimeout,
		pingTimeout:                 conf.PingTimeout,
		pings:                       make(map[[8]byte]chan struct{}),
		reqHeaderMu:                 make(chan struct{}, 1),
		lastActive:                  t.now(),
	}
	var group synctestGroupInterface
	if t.transportTestHooks != nil {
		t.markNewGoroutine()
		t.transportTestHooks.newclientconn(cc)
		c = cc.tconn
		group = t.group
	}
	if VerboseLogs {
		t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr())
	}

	cc.cond = sync.NewCond(&cc.mu)
	cc.flow.add(int32(initialWindowSize))

	// TODO: adjust this writer size to account for frame size +
	// MTU + crypto/tls record padding.
	cc.bw = bufio.NewWriter(stickyErrWriter{
		group:   group,
		conn:    c,
		timeout: conf.WriteByteTimeout,
		err:     &cc.werr,
	})
	cc.br = bufio.NewReader(c)
	cc.fr = NewFramer(cc.bw, cc.br)
	cc.fr.SetMaxReadFrameSize(conf.MaxReadFrameSize)
	if t.CountError != nil {
		cc.fr.countError = t.CountError
	}
	maxHeaderTableSize := conf.MaxDecoderHeaderTableSize
	cc.fr.ReadMetaHeaders = hpack.NewDecoder(maxHeaderTableSize, nil)
	cc.fr.MaxHeaderListSize = t.maxHeaderListSize()

	cc.henc = hpack.NewEncoder(&cc.hbuf)
	cc.henc.SetMaxDynamicTableSizeLimit(conf.MaxEncoderHeaderTableSize)
	cc.peerMaxHeaderTableSize = initialHeaderTableSize

	if cs, ok := c.(connectionStater); ok {
		state := cs.ConnectionState()
		cc.tlsState = &state
	}

	initialSettings := []Setting{
		{ID: SettingEnablePush, Val: 0},
		{ID: SettingInitialWindowSize, Val: uint32(cc.initialStreamRecvWindowSize)},
	}
	initialSettings = append(initialSettings, Setting{ID: SettingMaxFrameSize, Val: conf.MaxReadFrameSize})
	if max := t.maxHeaderListSize(); max != 0 {
		initialSettings = append(initialSettings, Setting{ID: SettingMaxHeaderListSize, Val: max})
	}
	if maxHeaderTableSize != initialHeaderTableSize {
		initialSettings = append(initialSettings, Setting{ID: SettingHeaderTableSize, Val: maxHeaderTableSize})
	}

	cc.bw.Write(clientPreface)
	cc.fr.WriteSettings(initialSettings...)
	cc.fr.WriteWindowUpdate(0, uint32(conf.MaxUploadBufferPerConnection))
	cc.inflow.init(conf.MaxUploadBufferPerConnection + initialWindowSize)
	cc.bw.Flush()
	if cc.werr != nil {
		cc.Close()
		return nil, cc.werr
	}

	// Start the idle timer after the connection is fully initialized.
	if d := t.idleConnTimeout(); d != 0 {
		cc.idleTimeout = d
		cc.idleTimer = t.afterFunc(d, cc.onIdleTimeout)
	}

	go cc.readLoop()
	return cc, nil
}

func (cc *ClientConn) healthCheck() {
	pingTimeout := cc.pingTimeout
	// We don't need to ping periodically from the health check itself,
	// because ClientConn's readLoop will trigger another health check
	// if no frame is received.
	ctx, cancel := cc.t.contextWithTimeout(context.Background(), pingTimeout)
	defer cancel()
	cc.vlogf("http2: Transport sending health check")
	err := cc.Ping(ctx)
	if err != nil {
		cc.vlogf("http2: Transport health check failure: %v", err)
		cc.closeForLostPing()
	} else {
		cc.vlogf("http2: Transport health check success")
	}
}
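
// A sketch of enabling these health checks (illustrative only): with
// ReadIdleTimeout set, a quiet connection is pinged periodically and torn
// down (via closeForLostPing) if no response arrives within PingTimeout.
//
//	t := &http2.Transport{
//		ReadIdleTimeout: 30 * time.Second, // ping after 30s without frames
//		PingTimeout:     10 * time.Second, // close conn if ping unanswered
//	}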

// SetDoNotReuse marks cc as not reusable for future HTTP requests.
func (cc *ClientConn) SetDoNotReuse() {
	cc.mu.Lock()
	defer cc.mu.Unlock()
	cc.doNotReuse = true
}

func (cc *ClientConn) setGoAway(f *GoAwayFrame) {
	cc.mu.Lock()
	defer cc.mu.Unlock()

	old := cc.goAway
	cc.goAway = f

	// Merge the previous and current GoAway error frames.
	if cc.goAwayDebug == "" {
		cc.goAwayDebug = string(f.DebugData())
	}
	if old != nil && old.ErrCode != ErrCodeNo {
		cc.goAway.ErrCode = old.ErrCode
	}
	last := f.LastStreamID
	for streamID, cs := range cc.streams {
		if streamID <= last {
			// The server's GOAWAY indicates that it received this stream.
			// It will either finish processing it, or close the connection
			// without doing so. Either way, leave the stream alone for now.
			continue
		}
		if streamID == 1 && cc.goAway.ErrCode != ErrCodeNo {
			// Don't retry the first stream on a connection if we get a non-NO error.
			// If the server is sending an error on a new connection,
			// retrying the request on a new one probably isn't going to work.
			cs.abortStreamLocked(fmt.Errorf("http2: Transport received GOAWAY from server ErrCode:%v", cc.goAway.ErrCode))
		} else {
			// Aborting the stream with errClientConnGotGoAway indicates that
			// the request should be retried on a new connection.
			cs.abortStreamLocked(errClientConnGotGoAway)
		}
	}
}

// CanTakeNewRequest reports whether the connection can take a new request,
// meaning it has not been closed or received or sent a GOAWAY.
//
// If the caller is going to immediately make a new request on this
// connection, use ReserveNewRequest instead.
func (cc *ClientConn) CanTakeNewRequest() bool {
	cc.mu.Lock()
	defer cc.mu.Unlock()
	return cc.canTakeNewRequestLocked()
}

// ReserveNewRequest is like CanTakeNewRequest but also reserves a
// concurrent stream in cc. The reservation is decremented on the
// next call to RoundTrip.
func (cc *ClientConn) ReserveNewRequest() bool {
	cc.mu.Lock()
	defer cc.mu.Unlock()
	if st := cc.idleStateLocked(); !st.canTakeNewRequest {
		return false
	}
	cc.streamsReserved++
	return true
}
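
// Illustrative use of the reservation API: a caller managing its own pool
// can reserve a concurrency slot before committing a request to this
// connection, rather than merely checking CanTakeNewRequest and racing
// other goroutines for the slot.
//
//	if cc.ReserveNewRequest() {
//		// The next cc.RoundTrip call consumes this reservation.
//		res, err := cc.RoundTrip(req)
//		_, _ = res, err
//	}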

// ClientConnState describes the state of a ClientConn.
type ClientConnState struct {
	// Closed is whether the connection is closed.
	Closed bool

	// Closing is whether the connection is in the process of
	// closing. It may be closing due to shutdown, being a
	// single-use connection, being marked as DoNotReuse, or
	// having received a GOAWAY frame.
	Closing bool

	// StreamsActive is how many streams are active.
	StreamsActive int

	// StreamsReserved is how many streams have been reserved via
	// ClientConn.ReserveNewRequest.
	StreamsReserved int

	// StreamsPending is how many requests have been sent in excess
	// of the peer's advertised MaxConcurrentStreams setting and
	// are waiting for other streams to complete.
	StreamsPending int

	// MaxConcurrentStreams is how many concurrent streams the
	// peer advertised as acceptable. Zero means no SETTINGS
	// frame has been received yet.
	MaxConcurrentStreams uint32

	// LastIdle, if non-zero, is when the connection last
	// transitioned to idle state.
	LastIdle time.Time
}

// State returns a snapshot of cc's state.
func (cc *ClientConn) State() ClientConnState {
	cc.wmu.Lock()
	maxConcurrent := cc.maxConcurrentStreams
	if !cc.seenSettings {
		maxConcurrent = 0
	}
	cc.wmu.Unlock()

	cc.mu.Lock()
	defer cc.mu.Unlock()
	return ClientConnState{
		Closed:               cc.closed,
		Closing:              cc.closing || cc.singleUse || cc.doNotReuse || cc.goAway != nil,
		StreamsActive:        len(cc.streams) + cc.pendingResets,
		StreamsReserved:      cc.streamsReserved,
		StreamsPending:       cc.pendingRequests,
		LastIdle:             cc.lastIdle,
		MaxConcurrentStreams: maxConcurrent,
	}
}

// clientConnIdleState describes the suitability of a client
// connection to initiate a new RoundTrip request.
type clientConnIdleState struct {
	canTakeNewRequest bool
}

func (cc *ClientConn) idleState() clientConnIdleState {
	cc.mu.Lock()
	defer cc.mu.Unlock()
	return cc.idleStateLocked()
}

func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
	if cc.singleUse && cc.nextStreamID > 1 {
		return
	}
	var maxConcurrentOkay bool
	if cc.t.StrictMaxConcurrentStreams {
		// We'll tell the caller we can take a new request to
		// prevent the caller from dialing a new TCP
		// connection, but then we'll block later before
		// writing it.
		maxConcurrentOkay = true
	} else {
		// We can take a new request if the total of
		// - active streams;
		// - reservation slots for new streams; and
		// - streams for which we have sent a RST_STREAM and a PING,
		//   but received no subsequent frame
		// is less than the concurrency limit.
		maxConcurrentOkay = cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams)
	}

	st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay &&
		!cc.doNotReuse &&
		int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&
		!cc.tooIdleLocked()

	// If this connection has never been used for a request and is closed,
	// then let it take a request (which will fail).
	// If the conn was closed for idleness, we're racing the idle timer;
	// don't try to use the conn. (Issue #70515.)
	//
	// This avoids a situation where an error early in a connection's lifetime
	// goes unreported.
	if cc.nextStreamID == 1 && cc.streamsReserved == 0 && cc.closed && !cc.closedOnIdle {
		st.canTakeNewRequest = true
	}

	return
}

// currentRequestCountLocked reports the number of concurrency slots currently in use,
// including active streams, reserved slots, and reset streams waiting for acknowledgement.
func (cc *ClientConn) currentRequestCountLocked() int {
	return len(cc.streams) + cc.streamsReserved + cc.pendingResets
}

func (cc *ClientConn) canTakeNewRequestLocked() bool {
	st := cc.idleStateLocked()
	return st.canTakeNewRequest
}

// tooIdleLocked reports whether this connection has been sitting idle
// for too much wall time.
func (cc *ClientConn) tooIdleLocked() bool {
	// The Round(0) strips the monotonic clock reading so the
	// times are compared based on their wall time. We don't want
	// to reuse a connection that's been sitting idle during
	// VM/laptop suspend if monotonic time was also frozen.
	return cc.idleTimeout != 0 && !cc.lastIdle.IsZero() && cc.t.timeSince(cc.lastIdle.Round(0)) > cc.idleTimeout
}

// onIdleTimeout is called from a time.AfterFunc goroutine. It will
// only be called when we're idle, but because we're coming from a new
// goroutine, there could be a new request coming in at the same time,
// so this simply calls the synchronized closeIfIdle to shut down this
// connection. The timer could just call closeIfIdle, but this is more
// clear.
func (cc *ClientConn) onIdleTimeout() {
	cc.closeIfIdle()
}

func (cc *ClientConn) closeConn() {
	t := time.AfterFunc(250*time.Millisecond, cc.forceCloseConn)
	defer t.Stop()
	cc.tconn.Close()
}

// A tls.Conn.Close can hang for a long time if the peer is unresponsive.
// Try to shut it down more aggressively.
func (cc *ClientConn) forceCloseConn() {
	tc, ok := cc.tconn.(*tls.Conn)
	if !ok {
		return
	}
	if nc := tc.NetConn(); nc != nil {
		nc.Close()
	}
}

func (cc *ClientConn) closeIfIdle() {
	cc.mu.Lock()
	if len(cc.streams) > 0 || cc.streamsReserved > 0 {
		cc.mu.Unlock()
		return
	}
	cc.closed = true
	cc.closedOnIdle = true
	nextID := cc.nextStreamID
	// TODO: do clients send GOAWAY too? maybe? Just Close:
	cc.mu.Unlock()

	if VerboseLogs {
		cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, nextID-2)
	}
	cc.closeConn()
}

func (cc *ClientConn) isDoNotReuseAndIdle() bool {
	cc.mu.Lock()
	defer cc.mu.Unlock()
	return cc.doNotReuse && len(cc.streams) == 0
}

var shutdownEnterWaitStateHook = func() {}

// Shutdown gracefully closes the client connection, waiting for running streams to complete.
func (cc *ClientConn) Shutdown(ctx context.Context) error {
	if err := cc.sendGoAway(); err != nil {
		return err
	}
	// Wait for all in-flight streams to complete or connection to close
	done := make(chan struct{})
	cancelled := false // guarded by cc.mu
	go func() {
		cc.t.markNewGoroutine()
		cc.mu.Lock()
		defer cc.mu.Unlock()
		for {
			if len(cc.streams) == 0 || cc.closed {
				cc.closed = true
				close(done)
				break
			}
			if cancelled {
				break
			}
			cc.cond.Wait()
		}
	}()
	shutdownEnterWaitStateHook()
	select {
	case <-done:
		cc.closeConn()
		return nil
	case <-ctx.Done():
		cc.mu.Lock()
		// Free the goroutine above
		cancelled = true
		cc.cond.Broadcast()
		cc.mu.Unlock()
		return ctx.Err()
	}
}
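
// A graceful-teardown sketch (illustrative only): give in-flight streams
// up to five seconds to finish before forcing the connection closed.
//
//	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
//	defer cancel()
//	if err := cc.Shutdown(ctx); err != nil {
//		cc.Close() // deadline hit; interrupt any remaining streams
//	}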

func (cc *ClientConn) sendGoAway() error {
	cc.mu.Lock()
	closing := cc.closing
	cc.closing = true
	maxStreamID := cc.nextStreamID
	cc.mu.Unlock()
	if closing {
		// GOAWAY sent already
		return nil
	}

	cc.wmu.Lock()
	defer cc.wmu.Unlock()
	// Send a graceful shutdown frame to server
	if err := cc.fr.WriteGoAway(maxStreamID, ErrCodeNo, nil); err != nil {
		return err
	}
	if err := cc.bw.Flush(); err != nil {
		return err
	}
	// Prevent new requests
	return nil
}

// closeForError closes the client connection immediately. In-flight
// requests are interrupted. err is sent to streams.
func (cc *ClientConn) closeForError(err error) {
	cc.mu.Lock()
	cc.closed = true
	for _, cs := range cc.streams {
		cs.abortStreamLocked(err)
	}
	cc.cond.Broadcast()
	cc.mu.Unlock()
	cc.closeConn()
}

// Close closes the client connection immediately.
//
// In-flight requests are interrupted. For a graceful shutdown, use Shutdown instead.
func (cc *ClientConn) Close() error {
	err := errors.New("http2: client connection force closed via ClientConn.Close")
	cc.closeForError(err)
	return nil
}

// closeForLostPing closes the client connection immediately. In-flight
// requests are interrupted.
func (cc *ClientConn) closeForLostPing() {
	err := errors.New("http2: client connection lost")
	if f := cc.t.CountError; f != nil {
		f("conn_close_lost_ping")
	}
	cc.closeForError(err)
}

// errRequestCanceled is a copy of net/http's errRequestCanceled because it's not
// exported. At least they'll be DeepEqual for h1-vs-h2 comparison tests.
var errRequestCanceled = errors.New("net/http: request canceled")

func (cc *ClientConn) responseHeaderTimeout() time.Duration {
	if cc.t.t1 != nil {
		return cc.t.t1.ResponseHeaderTimeout
	}
	// No way to do this (yet?) with just an http2.Transport. Probably
	// no need; Request.Cancel (or contexts) is the new way. We only need
	// to support this for compatibility with the old http.Transport fields
	// when we're doing transparent http2.
	return 0
}

// actualContentLength returns a sanitized version of
// req.ContentLength, where 0 actually means zero (not unknown) and -1
// means unknown.
func actualContentLength(req *http.Request) int64 {
	if req.Body == nil || req.Body == http.NoBody {
		return 0
	}
	if req.ContentLength != 0 {
		return req.ContentLength
	}
	return -1
}

func (cc *ClientConn) decrStreamReservations() {
	cc.mu.Lock()
	defer cc.mu.Unlock()
	cc.decrStreamReservationsLocked()
}

func (cc *ClientConn) decrStreamReservationsLocked() {
	if cc.streamsReserved > 0 {
		cc.streamsReserved--
	}
}

func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
	return cc.roundTrip(req, nil)
}

func (cc *ClientConn) roundTrip(req *http.Request, streamf func(*clientStream)) (*http.Response, error) {
	ctx := req.Context()
	cs := &clientStream{
		cc:                   cc,
		ctx:                  ctx,
		reqCancel:            req.Cancel,
		isHead:               req.Method == "HEAD",
		reqBody:              req.Body,
		reqBodyContentLength: actualContentLength(req),
		trace:                httptrace.ContextClientTrace(ctx),
		peerClosed:           make(chan struct{}),
		abort:                make(chan struct{}),
		respHeaderRecv:       make(chan struct{}),
		donec:                make(chan struct{}),
	}

	cs.requestedGzip = httpcommon.IsRequestGzip(req.Method, req.Header, cc.t.disableCompression())

	go cs.doRequest(req, streamf)

	waitDone := func() error {
		select {
		case <-cs.donec:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		case <-cs.reqCancel:
			return errRequestCanceled
		}
	}

	handleResponseHeaders := func() (*http.Response, error) {
		res := cs.res
		if res.StatusCode > 299 {
			// On error or status code 3xx, 4xx, 5xx, etc abort any
			// ongoing write, assuming that the server doesn't care
			// about our request body. If the server replied with 1xx or
			// 2xx, however, then assume the server DOES potentially
			// want our body (e.g. full-duplex streaming:
			// golang.org/issue/13444). If it turns out the server
			// doesn't, they'll RST_STREAM us soon enough. This is a
			// heuristic to avoid adding knobs to Transport. Hopefully
			// we can keep it.
			cs.abortRequestBodyWrite()
		}
		res.Request = req
		res.TLS = cc.tlsState
		if res.Body == noBody && actualContentLength(req) == 0 {
			// If there isn't a request or response body still being
			// written, then wait for the stream to be closed before
			// RoundTrip returns.
			if err := waitDone(); err != nil {
				return nil, err
			}
		}
		return res, nil
	}

	cancelRequest := func(cs *clientStream, err error) error {
		cs.cc.mu.Lock()
		bodyClosed := cs.reqBodyClosed
		cs.cc.mu.Unlock()
		// Wait for the request body to be closed.
		//
		// If nothing closed the body before now, abortStreamLocked
		// will have started a goroutine to close it.
		//
		// Closing the body before returning avoids a race condition
		// with net/http checking its readTrackingBody to see if the
		// body was read from or closed. See golang/go#60041.
		//
		// The body is closed in a separate goroutine without the
		// connection mutex held, but dropping the mutex before waiting
		// will keep us from holding it indefinitely if the body
		// close is slow for some reason.
		if bodyClosed != nil {
			<-bodyClosed
		}
		return err
	}

	for {
		select {
		case <-cs.respHeaderRecv:
			return handleResponseHeaders()
		case <-cs.abort:
			select {
			case <-cs.respHeaderRecv:
				// If both cs.respHeaderRecv and cs.abort are signaling,
				// pick respHeaderRecv. The server probably wrote the
				// response and immediately reset the stream.
				// golang.org/issue/49645
				return handleResponseHeaders()
			default:
				waitDone()
				return nil, cs.abortErr
			}
		case <-ctx.Done():
			err := ctx.Err()
			cs.abortStream(err)
			return nil, cancelRequest(cs, err)
		case <-cs.reqCancel:
			cs.abortStream(errRequestCanceled)
			return nil, cancelRequest(cs, errRequestCanceled)
		}
	}
}

// doRequest runs for the duration of the request lifetime.
//
// It sends the request and performs post-request cleanup (closing Request.Body, etc.).
func (cs *clientStream) doRequest(req *http.Request, streamf func(*clientStream)) {
	cs.cc.t.markNewGoroutine()
	err := cs.writeRequest(req, streamf)
	cs.cleanupWriteRequest(err)
}

var errExtendedConnectNotSupported = errors.New("net/http: extended connect not supported by peer")

// writeRequest sends a request.
//
// It returns nil after the request is written, the response read,
// and the request stream is half-closed by the peer.
//
// It returns non-nil if the request ends otherwise.
// If the returned error is StreamError, the error Code may be used in resetting the stream.
func (cs *clientStream) writeRequest(req *http.Request, streamf func(*clientStream)) (err error) {
	cc := cs.cc
	ctx := cs.ctx

	// For extended CONNECT, wait for the peer's initial SETTINGS frame
	// before sending the request. A server can change this value later,
	// but we just wait for the first SETTINGS frame.
	var isExtendedConnect bool
	if req.Method == "CONNECT" && req.Header.Get(":protocol") != "" {
		isExtendedConnect = true
	}

	// Acquire the new-request lock by writing to reqHeaderMu.
	// This lock guards the critical section covering allocating a new stream ID
	// (requires mu) and creating the stream (requires wmu).
	if cc.reqHeaderMu == nil {
		panic("RoundTrip on uninitialized ClientConn") // for tests
	}
	if isExtendedConnect {
		select {
		case <-cs.reqCancel:
			return errRequestCanceled
		case <-ctx.Done():
			return ctx.Err()
		case <-cc.seenSettingsChan:
			if !cc.extendedConnectAllowed {
				return errExtendedConnectNotSupported
			}
		}
	}
	select {
	case cc.reqHeaderMu <- struct{}{}:
	case <-cs.reqCancel:
		return errRequestCanceled
	case <-ctx.Done():
		return ctx.Err()
	}

	cc.mu.Lock()
	if cc.idleTimer != nil {
		cc.idleTimer.Stop()
	}
	cc.decrStreamReservationsLocked()
	if err := cc.awaitOpenSlotForStreamLocked(cs); err != nil {
		cc.mu.Unlock()
		<-cc.reqHeaderMu
		return err
	}
	cc.addStreamLocked(cs) // assigns stream ID
	if isConnectionCloseRequest(req) {
		cc.doNotReuse = true
	}
	cc.mu.Unlock()

	if streamf != nil {
		streamf(cs)
	}

	continueTimeout := cc.t.expectContinueTimeout()
	if continueTimeout != 0 {
		if !httpguts.HeaderValuesContainsToken(req.Header["Expect"], "100-continue") {
			continueTimeout = 0
		} else {
			cs.on100 = make(chan struct{}, 1)
		}
	}

	// Past this point (where we send request headers), it is possible for
	// RoundTrip to return successfully. Since the RoundTrip contract permits
	// the caller to "mutate or reuse" the Request after closing the Response's Body,
	// we must take care when referencing the Request from here on.
	err = cs.encodeAndWriteHeaders(req)
	<-cc.reqHeaderMu
	if err != nil {
		return err
	}

	hasBody := cs.reqBodyContentLength != 0
	if !hasBody {
		cs.sentEndStream = true
	} else {
		if continueTimeout != 0 {
			traceWait100Continue(cs.trace)
			timer := time.NewTimer(continueTimeout)
			select {
			case <-timer.C:
				err = nil
			case <-cs.on100:
				err = nil
			case <-cs.abort:
				err = cs.abortErr
			case <-ctx.Done():
				err = ctx.Err()
			case <-cs.reqCancel:
				err = errRequestCanceled
			}
			timer.Stop()
			if err != nil {
				traceWroteRequest(cs.trace, err)
				return err
			}
		}

		if err = cs.writeRequestBody(req); err != nil {
			if err != errStopReqBodyWrite {
				traceWroteRequest(cs.trace, err)
				return err
			}
		} else {
			cs.sentEndStream = true
		}
	}

	traceWroteRequest(cs.trace, err)

	var respHeaderTimer <-chan time.Time
	var respHeaderRecv chan struct{}
	if d := cc.responseHeaderTimeout(); d != 0 {
		timer := cc.t.newTimer(d)
		defer timer.Stop()
		respHeaderTimer = timer.C()
		respHeaderRecv = cs.respHeaderRecv
	}
	// Wait until the peer half-closes its end of the stream,
	// or until the request is aborted (via context, error, or otherwise),
	// whichever comes first.
	for {
		select {
		case <-cs.peerClosed:
			return nil
		case <-respHeaderTimer:
			return errTimeout
		case <-respHeaderRecv:
			respHeaderRecv = nil
			respHeaderTimer = nil // keep waiting for END_STREAM
		case <-cs.abort:
			return cs.abortErr
		case <-ctx.Done():
			return ctx.Err()
		case <-cs.reqCancel:
			return errRequestCanceled
		}
	}
}

func (cs *clientStream) encodeAndWriteHeaders(req *http.Request) error {
	cc := cs.cc
	ctx := cs.ctx

	cc.wmu.Lock()
	defer cc.wmu.Unlock()

	// If the request was canceled while waiting for cc.mu, just quit.
	select {
	case <-cs.abort:
		return cs.abortErr
	case <-ctx.Done():
		return ctx.Err()
	case <-cs.reqCancel:
		return errRequestCanceled
	default:
	}

	// Encode headers.
	//
	// We send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is
	// sent by writeRequestBody below, along with any Trailers,
	// again in form HEADERS{1}, CONTINUATION{0,})
	cc.hbuf.Reset()
	res, err := encodeRequestHeaders(req, cs.requestedGzip, cc.peerMaxHeaderListSize, func(name, value string) {
		cc.writeHeader(name, value)
	})
	if err != nil {
		return fmt.Errorf("http2: %w", err)
	}
	hdrs := cc.hbuf.Bytes()

	// Write the request.
	endStream := !res.HasBody && !res.HasTrailers
	cs.sentHeaders = true
	err = cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs)
	traceWroteHeaders(cs.trace)
	return err
}

func encodeRequestHeaders(req *http.Request, addGzipHeader bool, peerMaxHeaderListSize uint64, headerf func(name, value string)) (httpcommon.EncodeHeadersResult, error) {
	return httpcommon.EncodeHeaders(req.Context(), httpcommon.EncodeHeadersParam{
		Request: httpcommon.Request{
			Header:              req.Header,
			Trailer:             req.Trailer,
			URL:                 req.URL,
			Host:                req.Host,
			Method:              req.Method,
			ActualContentLength: actualContentLength(req),
		},
		AddGzipHeader:         addGzipHeader,
		PeerMaxHeaderListSize: peerMaxHeaderListSize,
		DefaultUserAgent:      defaultUserAgent,
	}, headerf)
}

// cleanupWriteRequest performs post-request tasks.
//
// If err (the result of writeRequest) is non-nil and the stream is not closed,
// cleanupWriteRequest will send a reset to the peer.
func (cs *clientStream) cleanupWriteRequest(err error) {
	cc := cs.cc

	if cs.ID == 0 {
		// We were canceled before creating the stream, so return our reservation.
		cc.decrStreamReservations()
	}

	// TODO: write h12Compare test showing whether
	// Request.Body is closed by the Transport,
	// and in multiple cases: server replies <=299 and >299
	// while still writing request body
	cc.mu.Lock()
	mustCloseBody := false
	if cs.reqBody != nil && cs.reqBodyClosed == nil {
		mustCloseBody = true
		cs.reqBodyClosed = make(chan struct{})
	}
	bodyClosed := cs.reqBodyClosed
	closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
	cc.mu.Unlock()
	if mustCloseBody {
		cs.reqBody.Close()
		close(bodyClosed)
	}
	if bodyClosed != nil {
		<-bodyClosed
	}

	if err != nil && cs.sentEndStream {
		// If the connection is closed immediately after the response is read,
		// we may be aborted before finishing up here. If the stream was closed
		// cleanly on both sides, there is no error.
		select {
		case <-cs.peerClosed:
			err = nil
		default:
		}
	}
	if err != nil {
		cs.abortStream(err) // possibly redundant, but harmless
		if cs.sentHeaders {
			if se, ok := err.(StreamError); ok {
				if se.Cause != errFromPeer {
					cc.writeStreamReset(cs.ID, se.Code, false, err)
				}
			} else {
				// We're cancelling an in-flight request.
				//
				// This could be due to the server becoming unresponsive.
				// To avoid sending too many requests on a dead connection,
				// we let the request continue to consume a concurrency slot
				// until we can confirm the server is still responding.
				// We do this by sending a PING frame along with the RST_STREAM
				// (unless a ping is already in flight).
				//
				// For simplicity, we don't bother tracking the PING payload:
				// We reset cc.pendingResets any time we receive a PING ACK.
				//
				// We skip this if the conn is going to be closed on idle,
				// because it's short lived and will probably be closed before
				// we get the ping response.
				ping := false
				if !closeOnIdle {
					cc.mu.Lock()
					// rstStreamPingsBlocked works around a gRPC behavior:
					// see comment on the field for details.
					if !cc.rstStreamPingsBlocked {
						if cc.pendingResets == 0 {
							ping = true
						}
						cc.pendingResets++
					}
					cc.mu.Unlock()
				}
				cc.writeStreamReset(cs.ID, ErrCodeCancel, ping, err)
			}
		}
		cs.bufPipe.CloseWithError(err) // no-op if already closed
	} else {
		if cs.sentHeaders && !cs.sentEndStream {
			cc.writeStreamReset(cs.ID, ErrCodeNo, false, nil)
		}
		cs.bufPipe.CloseWithError(errRequestCanceled)
	}
	if cs.ID != 0 {
		cc.forgetStreamID(cs.ID)
	}

	cc.wmu.Lock()
	werr := cc.werr
	cc.wmu.Unlock()
	if werr != nil {
		cc.Close()
	}

	close(cs.donec)
}
1746
// awaitOpenSlotForStreamLocked waits until the number of in-use
// concurrency slots (currentRequestCountLocked) is below maxConcurrentStreams.
// Must hold cc.mu.
1749func (cc *ClientConn) awaitOpenSlotForStreamLocked(cs *clientStream) error {
1750 for {
1751 if cc.closed && cc.nextStreamID == 1 && cc.streamsReserved == 0 {
1752 // This is the very first request sent to this connection.
1753 // Return a fatal error which aborts the retry loop.
1754 return errClientConnNotEstablished
1755 }
1756 cc.lastActive = cc.t.now()
1757 if cc.closed || !cc.canTakeNewRequestLocked() {
1758 return errClientConnUnusable
1759 }
1760 cc.lastIdle = time.Time{}
1761 if cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams) {
1762 return nil
1763 }
1764 cc.pendingRequests++
1765 cc.cond.Wait()
1766 cc.pendingRequests--
1767 select {
1768 case <-cs.abort:
1769 return cs.abortErr
1770 default:
1771 }
1772 }
1773}
1774
1775// requires cc.wmu be held
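//
// The encoded block is split across one HEADERS frame plus zero or more
// CONTINUATION frames. As an illustrative sketch (sizes assumed, not taken
// from a real peer): a 40 KB block with a 16 KB max frame size is written
// as HEADERS (16 KB), CONTINUATION (16 KB), CONTINUATION (8 KB), with
// END_HEADERS set only on the last frame.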
1776func (cc *ClientConn) writeHeaders(streamID uint32, endStream bool, maxFrameSize int, hdrs []byte) error {
1777 first := true // first frame written (HEADERS is first, then CONTINUATION)
1778 for len(hdrs) > 0 && cc.werr == nil {
1779 chunk := hdrs
1780 if len(chunk) > maxFrameSize {
1781 chunk = chunk[:maxFrameSize]
1782 }
1783 hdrs = hdrs[len(chunk):]
1784 endHeaders := len(hdrs) == 0
1785 if first {
1786 cc.fr.WriteHeaders(HeadersFrameParam{
1787 StreamID: streamID,
1788 BlockFragment: chunk,
1789 EndStream: endStream,
1790 EndHeaders: endHeaders,
1791 })
1792 first = false
1793 } else {
1794 cc.fr.WriteContinuation(streamID, endHeaders, chunk)
1795 }
1796 }
1797 cc.bw.Flush()
1798 return cc.werr
1799}
1800
1801// internal error values; they don't escape to callers
1802var (
1803 // abort request body write; don't send cancel
1804 errStopReqBodyWrite = errors.New("http2: aborting request body write")
1805
	// abort request body write, but send a stream reset (CANCEL)
1807 errStopReqBodyWriteAndCancel = errors.New("http2: canceling request")
1808
1809 errReqBodyTooLong = errors.New("http2: request body larger than specified content length")
1810)
1811
// frameScratchBufferLen returns the length of the scratch buffer used to
// read from an outgoing request body and write DATA frames.
1814//
1815// It returns max(1, min(peer's advertised max frame size,
1816// Request.ContentLength+1, 512KB)).
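//
// For example (illustrative values): with a peer max frame size of 16384
// and a declared Content-Length of 10, it returns 11, the body length plus
// one extra byte so an overlong body can be detected on the first read.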
1817func (cs *clientStream) frameScratchBufferLen(maxFrameSize int) int {
1818 const max = 512 << 10
1819 n := int64(maxFrameSize)
1820 if n > max {
1821 n = max
1822 }
1823 if cl := cs.reqBodyContentLength; cl != -1 && cl+1 < n {
1824 // Add an extra byte past the declared content-length to
1825 // give the caller's Request.Body io.Reader a chance to
1826 // give us more bytes than they declared, so we can catch it
1827 // early.
1828 n = cl + 1
1829 }
1830 if n < 1 {
1831 return 1
1832 }
1833 return int(n) // doesn't truncate; max is 512K
1834}
1835
1836// Seven bufPools manage different frame sizes. This helps to avoid scenarios where long-running
1837// streaming requests using small frame sizes occupy large buffers initially allocated for prior
1838// requests needing big buffers. The size ranges are as follows:
// (0 KB, 16 KB], (16 KB, 32 KB], (32 KB, 64 KB], (64 KB, 128 KB], (128 KB, 256 KB],
// (256 KB, 512 KB], (512 KB, infinity)
1841// In practice, the maximum scratch buffer size should not exceed 512 KB due to
1842// frameScratchBufferLen(maxFrameSize), thus the "infinity pool" should never be used.
1843// It exists mainly as a safety measure, for potential future increases in max buffer size.
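//
// For example (worked from the ranges above): bufPoolIndex maps a 16 KB
// buffer to index 0, a 16 KB + 1 byte buffer to index 1, and a 512 KB
// buffer to index 5.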
1844var bufPools [7]sync.Pool // of *[]byte
1845func bufPoolIndex(size int) int {
1846 if size <= 16384 {
1847 return 0
1848 }
	size--
	index := bits.Len(uint(size)) - 14
1852 if index >= len(bufPools) {
1853 return len(bufPools) - 1
1854 }
1855 return index
1856}
1857
1858func (cs *clientStream) writeRequestBody(req *http.Request) (err error) {
1859 cc := cs.cc
1860 body := cs.reqBody
1861 sentEnd := false // whether we sent the final DATA frame w/ END_STREAM
1862
1863 hasTrailers := req.Trailer != nil
1864 remainLen := cs.reqBodyContentLength
1865 hasContentLen := remainLen != -1
1866
1867 cc.mu.Lock()
1868 maxFrameSize := int(cc.maxFrameSize)
1869 cc.mu.Unlock()
1870
1871 // Scratch buffer for reading into & writing from.
1872 scratchLen := cs.frameScratchBufferLen(maxFrameSize)
1873 var buf []byte
1874 index := bufPoolIndex(scratchLen)
1875 if bp, ok := bufPools[index].Get().(*[]byte); ok && len(*bp) >= scratchLen {
1876 defer bufPools[index].Put(bp)
1877 buf = *bp
1878 } else {
1879 buf = make([]byte, scratchLen)
1880 defer bufPools[index].Put(&buf)
1881 }
1882
1883 var sawEOF bool
1884 for !sawEOF {
1885 n, err := body.Read(buf)
1886 if hasContentLen {
1887 remainLen -= int64(n)
1888 if remainLen == 0 && err == nil {
1889 // The request body's Content-Length was predeclared and
1890 // we just finished reading it all, but the underlying io.Reader
1891 // returned the final chunk with a nil error (which is one of
1892 // the two valid things a Reader can do at EOF). Because we'd prefer
1893 // to send the END_STREAM bit early, double-check that we're actually
1894 // at EOF. Subsequent reads should return (0, EOF) at this point.
1895 // If either value is different, we return an error in one of two ways below.
1896 var scratch [1]byte
1897 var n1 int
1898 n1, err = body.Read(scratch[:])
1899 remainLen -= int64(n1)
1900 }
1901 if remainLen < 0 {
1902 err = errReqBodyTooLong
1903 return err
1904 }
1905 }
1906 if err != nil {
1907 cc.mu.Lock()
1908 bodyClosed := cs.reqBodyClosed != nil
1909 cc.mu.Unlock()
1910 switch {
1911 case bodyClosed:
1912 return errStopReqBodyWrite
1913 case err == io.EOF:
1914 sawEOF = true
1915 err = nil
1916 default:
1917 return err
1918 }
1919 }
1920
1921 remain := buf[:n]
1922 for len(remain) > 0 && err == nil {
1923 var allowed int32
1924 allowed, err = cs.awaitFlowControl(len(remain))
1925 if err != nil {
1926 return err
1927 }
1928 cc.wmu.Lock()
1929 data := remain[:allowed]
1930 remain = remain[allowed:]
1931 sentEnd = sawEOF && len(remain) == 0 && !hasTrailers
1932 err = cc.fr.WriteData(cs.ID, sentEnd, data)
1933 if err == nil {
1934 // TODO(bradfitz): this flush is for latency, not bandwidth.
1935 // Most requests won't need this. Make this opt-in or
				// opt-out? Use some heuristic on the body type? Nagle-like
1937 // timers? Based on 'n'? Only last chunk of this for loop,
1938 // unless flow control tokens are low? For now, always.
1939 // If we change this, see comment below.
1940 err = cc.bw.Flush()
1941 }
1942 cc.wmu.Unlock()
1943 }
1944 if err != nil {
1945 return err
1946 }
1947 }
1948
1949 if sentEnd {
1950 // Already sent END_STREAM (which implies we have no
1951 // trailers) and flushed, because currently all
1952 // WriteData frames above get a flush. So we're done.
1953 return nil
1954 }
1955
1956 // Since the RoundTrip contract permits the caller to "mutate or reuse"
1957 // a request after the Response's Body is closed, verify that this hasn't
1958 // happened before accessing the trailers.
1959 cc.mu.Lock()
1960 trailer := req.Trailer
1961 err = cs.abortErr
1962 cc.mu.Unlock()
1963 if err != nil {
1964 return err
1965 }
1966
1967 cc.wmu.Lock()
1968 defer cc.wmu.Unlock()
1969 var trls []byte
1970 if len(trailer) > 0 {
1971 trls, err = cc.encodeTrailers(trailer)
1972 if err != nil {
1973 return err
1974 }
1975 }
1976
1977 // Two ways to send END_STREAM: either with trailers, or
1978 // with an empty DATA frame.
1979 if len(trls) > 0 {
1980 err = cc.writeHeaders(cs.ID, true, maxFrameSize, trls)
1981 } else {
1982 err = cc.fr.WriteData(cs.ID, true, nil)
1983 }
1984 if ferr := cc.bw.Flush(); ferr != nil && err == nil {
1985 err = ferr
1986 }
1987 return err
1988}
1989
// awaitFlowControl waits for [1, min(maxBytes, cc.maxFrameSize)] flow
// control tokens from the server.
1992// It returns either the non-zero number of tokens taken or an error
1993// if the stream is dead.
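//
// For example (illustrative): with 100 tokens available on the stream,
// maxBytes=50, and a peer max frame size of 16384, it takes and returns 50.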
1994func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
1995 cc := cs.cc
1996 ctx := cs.ctx
1997 cc.mu.Lock()
1998 defer cc.mu.Unlock()
1999 for {
2000 if cc.closed {
2001 return 0, errClientConnClosed
2002 }
2003 if cs.reqBodyClosed != nil {
2004 return 0, errStopReqBodyWrite
2005 }
2006 select {
2007 case <-cs.abort:
2008 return 0, cs.abortErr
2009 case <-ctx.Done():
2010 return 0, ctx.Err()
2011 case <-cs.reqCancel:
2012 return 0, errRequestCanceled
2013 default:
2014 }
2015 if a := cs.flow.available(); a > 0 {
2016 take := a
2017 if int(take) > maxBytes {
				take = int32(maxBytes) // no truncation: maxBytes < int(take), so it fits in int32
2020 }
2021 if take > int32(cc.maxFrameSize) {
2022 take = int32(cc.maxFrameSize)
2023 }
2024 cs.flow.take(take)
2025 return take, nil
2026 }
2027 cc.cond.Wait()
2028 }
2029}
2030
2031// requires cc.wmu be held.
2032func (cc *ClientConn) encodeTrailers(trailer http.Header) ([]byte, error) {
2033 cc.hbuf.Reset()
2034
2035 hlSize := uint64(0)
2036 for k, vv := range trailer {
2037 for _, v := range vv {
2038 hf := hpack.HeaderField{Name: k, Value: v}
2039 hlSize += uint64(hf.Size())
2040 }
2041 }
2042 if hlSize > cc.peerMaxHeaderListSize {
2043 return nil, errRequestHeaderListSize
2044 }
2045
2046 for k, vv := range trailer {
2047 lowKey, ascii := httpcommon.LowerHeader(k)
2048 if !ascii {
2049 // Skip writing invalid headers. Per RFC 7540, Section 8.1.2, header
2050 // field names have to be ASCII characters (just as in HTTP/1.x).
2051 continue
2052 }
		// Transfer-Encoding, etc., have already been filtered at the
		// start of RoundTrip.
2055 for _, v := range vv {
2056 cc.writeHeader(lowKey, v)
2057 }
2058 }
2059 return cc.hbuf.Bytes(), nil
2060}
2061
2062func (cc *ClientConn) writeHeader(name, value string) {
2063 if VerboseLogs {
2064 log.Printf("http2: Transport encoding header %q = %q", name, value)
2065 }
2066 cc.henc.WriteField(hpack.HeaderField{Name: name, Value: value})
2067}
2068
2069type resAndError struct {
2070 _ incomparable
2071 res *http.Response
2072 err error
2073}
2074
2075// requires cc.mu be held.
2076func (cc *ClientConn) addStreamLocked(cs *clientStream) {
2077 cs.flow.add(int32(cc.initialWindowSize))
2078 cs.flow.setConnFlow(&cc.flow)
2079 cs.inflow.init(cc.initialStreamRecvWindowSize)
2080 cs.ID = cc.nextStreamID
2081 cc.nextStreamID += 2
2082 cc.streams[cs.ID] = cs
2083 if cs.ID == 0 {
2084 panic("assigned stream ID 0")
2085 }
2086}
2087
2088func (cc *ClientConn) forgetStreamID(id uint32) {
2089 cc.mu.Lock()
2090 slen := len(cc.streams)
2091 delete(cc.streams, id)
2092 if len(cc.streams) != slen-1 {
2093 panic("forgetting unknown stream id")
2094 }
2095 cc.lastActive = cc.t.now()
2096 if len(cc.streams) == 0 && cc.idleTimer != nil {
2097 cc.idleTimer.Reset(cc.idleTimeout)
2098 cc.lastIdle = cc.t.now()
2099 }
2100 // Wake up writeRequestBody via clientStream.awaitFlowControl and
2101 // wake up RoundTrip if there is a pending request.
2102 cc.cond.Broadcast()
2103
2104 closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
2105 if closeOnIdle && cc.streamsReserved == 0 && len(cc.streams) == 0 {
2106 if VerboseLogs {
2107 cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, cc.nextStreamID-2)
2108 }
2109 cc.closed = true
2110 defer cc.closeConn()
2111 }
2112
2113 cc.mu.Unlock()
2114}
2115
2116// clientConnReadLoop is the state owned by the clientConn's frame-reading readLoop.
2117type clientConnReadLoop struct {
2118 _ incomparable
2119 cc *ClientConn
2120}
2121
2122// readLoop runs in its own goroutine and reads and dispatches frames.
2123func (cc *ClientConn) readLoop() {
2124 cc.t.markNewGoroutine()
2125 rl := &clientConnReadLoop{cc: cc}
2126 defer rl.cleanup()
2127 cc.readerErr = rl.run()
2128 if ce, ok := cc.readerErr.(ConnectionError); ok {
2129 cc.wmu.Lock()
2130 cc.fr.WriteGoAway(0, ErrCode(ce), nil)
2131 cc.wmu.Unlock()
2132 }
2133}
2134
2135// GoAwayError is returned by the Transport when the server closes the
2136// TCP connection after sending a GOAWAY frame.
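//
// Callers can detect it with errors.As. A minimal sketch, where err is
// assumed to come from a RoundTrip call through this transport:
//
//	var ga GoAwayError
//	if errors.As(err, &ga) {
//		log.Printf("GOAWAY: last stream %v, code %v", ga.LastStreamID, ga.ErrCode)
//	}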
2137type GoAwayError struct {
2138 LastStreamID uint32
2139 ErrCode ErrCode
2140 DebugData string
2141}
2142
2143func (e GoAwayError) Error() string {
2144 return fmt.Sprintf("http2: server sent GOAWAY and closed the connection; LastStreamID=%v, ErrCode=%v, debug=%q",
2145 e.LastStreamID, e.ErrCode, e.DebugData)
2146}
2147
2148func isEOFOrNetReadError(err error) bool {
2149 if err == io.EOF {
2150 return true
2151 }
2152 ne, ok := err.(*net.OpError)
2153 return ok && ne.Op == "read"
2154}
2155
2156func (rl *clientConnReadLoop) cleanup() {
2157 cc := rl.cc
2158 defer cc.closeConn()
2159 defer close(cc.readerDone)
2160
2161 if cc.idleTimer != nil {
2162 cc.idleTimer.Stop()
2163 }
2164
2165 // Close any response bodies if the server closes prematurely.
2166 // TODO: also do this if we've written the headers but not
2167 // gotten a response yet.
2168 err := cc.readerErr
2169 cc.mu.Lock()
2170 if cc.goAway != nil && isEOFOrNetReadError(err) {
2171 err = GoAwayError{
2172 LastStreamID: cc.goAway.LastStreamID,
2173 ErrCode: cc.goAway.ErrCode,
2174 DebugData: cc.goAwayDebug,
2175 }
2176 } else if err == io.EOF {
2177 err = io.ErrUnexpectedEOF
2178 }
2179 cc.closed = true
2180
2181 // If the connection has never been used, and has been open for only a short time,
2182 // leave it in the connection pool for a little while.
2183 //
2184 // This avoids a situation where new connections are constantly created,
2185 // added to the pool, fail, and are removed from the pool, without any error
2186 // being surfaced to the user.
2187 unusedWaitTime := 5 * time.Second
2188 if cc.idleTimeout > 0 && unusedWaitTime > cc.idleTimeout {
2189 unusedWaitTime = cc.idleTimeout
2190 }
2191 idleTime := cc.t.now().Sub(cc.lastActive)
2192 if atomic.LoadUint32(&cc.atomicReused) == 0 && idleTime < unusedWaitTime && !cc.closedOnIdle {
2193 cc.idleTimer = cc.t.afterFunc(unusedWaitTime-idleTime, func() {
2194 cc.t.connPool().MarkDead(cc)
2195 })
2196 } else {
2197 cc.mu.Unlock() // avoid any deadlocks in MarkDead
2198 cc.t.connPool().MarkDead(cc)
2199 cc.mu.Lock()
2200 }
2201
2202 for _, cs := range cc.streams {
2203 select {
2204 case <-cs.peerClosed:
2205 // The server closed the stream before closing the conn,
2206 // so no need to interrupt it.
2207 default:
2208 cs.abortStreamLocked(err)
2209 }
2210 }
2211 cc.cond.Broadcast()
2212 cc.mu.Unlock()
2213
2214 if !cc.seenSettings {
2215 // If we have a pending request that wants extended CONNECT,
2216 // let it continue and fail with the connection error.
2217 cc.extendedConnectAllowed = true
2218 close(cc.seenSettingsChan)
2219 }
2220}
2221
2222// countReadFrameError calls Transport.CountError with a string
2223// representing err.
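//
// As a hedged sketch, a caller could aggregate these tokens by setting
// Transport.CountError (metricsCounter is a hypothetical sink, not part
// of this package):
//
//	t := &Transport{
//		CountError: func(errType string) {
//			metricsCounter("http2_" + errType) // hypothetical metrics sink
//		},
//	}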
2224func (cc *ClientConn) countReadFrameError(err error) {
2225 f := cc.t.CountError
2226 if f == nil || err == nil {
2227 return
2228 }
2229 if ce, ok := err.(ConnectionError); ok {
2230 errCode := ErrCode(ce)
2231 f(fmt.Sprintf("read_frame_conn_error_%s", errCode.stringToken()))
2232 return
2233 }
2234 if errors.Is(err, io.EOF) {
2235 f("read_frame_eof")
2236 return
2237 }
2238 if errors.Is(err, io.ErrUnexpectedEOF) {
2239 f("read_frame_unexpected_eof")
2240 return
2241 }
2242 if errors.Is(err, ErrFrameTooLarge) {
2243 f("read_frame_too_large")
2244 return
2245 }
2246 f("read_frame_other")
2247}
2248
2249func (rl *clientConnReadLoop) run() error {
2250 cc := rl.cc
2251 gotSettings := false
2252 readIdleTimeout := cc.readIdleTimeout
2253 var t timer
2254 if readIdleTimeout != 0 {
2255 t = cc.t.afterFunc(readIdleTimeout, cc.healthCheck)
2256 }
2257 for {
2258 f, err := cc.fr.ReadFrame()
2259 if t != nil {
2260 t.Reset(readIdleTimeout)
2261 }
2262 if err != nil {
2263 cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
2264 }
2265 if se, ok := err.(StreamError); ok {
2266 if cs := rl.streamByID(se.StreamID, notHeaderOrDataFrame); cs != nil {
2267 if se.Cause == nil {
2268 se.Cause = cc.fr.errDetail
2269 }
2270 rl.endStreamError(cs, se)
2271 }
2272 continue
2273 } else if err != nil {
2274 cc.countReadFrameError(err)
2275 return err
2276 }
2277 if VerboseLogs {
2278 cc.vlogf("http2: Transport received %s", summarizeFrame(f))
2279 }
2280 if !gotSettings {
2281 if _, ok := f.(*SettingsFrame); !ok {
2282 cc.logf("protocol error: received %T before a SETTINGS frame", f)
2283 return ConnectionError(ErrCodeProtocol)
2284 }
2285 gotSettings = true
2286 }
2287
2288 switch f := f.(type) {
2289 case *MetaHeadersFrame:
2290 err = rl.processHeaders(f)
2291 case *DataFrame:
2292 err = rl.processData(f)
2293 case *GoAwayFrame:
2294 err = rl.processGoAway(f)
2295 case *RSTStreamFrame:
2296 err = rl.processResetStream(f)
2297 case *SettingsFrame:
2298 err = rl.processSettings(f)
2299 case *PushPromiseFrame:
2300 err = rl.processPushPromise(f)
2301 case *WindowUpdateFrame:
2302 err = rl.processWindowUpdate(f)
2303 case *PingFrame:
2304 err = rl.processPing(f)
2305 default:
2306 cc.logf("Transport: unhandled response frame type %T", f)
2307 }
2308 if err != nil {
2309 if VerboseLogs {
2310 cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, summarizeFrame(f), err)
2311 }
2312 return err
2313 }
2314 }
2315}
2316
2317func (rl *clientConnReadLoop) processHeaders(f *MetaHeadersFrame) error {
2318 cs := rl.streamByID(f.StreamID, headerOrDataFrame)
2319 if cs == nil {
2320 // We'd get here if we canceled a request while the
2321 // server had its response still in flight. So if this
2322 // was just something we canceled, ignore it.
2323 return nil
2324 }
2325 if cs.readClosed {
2326 rl.endStreamError(cs, StreamError{
2327 StreamID: f.StreamID,
2328 Code: ErrCodeProtocol,
2329 Cause: errors.New("protocol error: headers after END_STREAM"),
2330 })
2331 return nil
2332 }
2333 if !cs.firstByte {
2334 if cs.trace != nil {
2335 // TODO(bradfitz): move first response byte earlier,
2336 // when we first read the 9 byte header, not waiting
2337 // until all the HEADERS+CONTINUATION frames have been
2338 // merged. This works for now.
2339 traceFirstResponseByte(cs.trace)
2340 }
2341 cs.firstByte = true
2342 }
2343 if !cs.pastHeaders {
2344 cs.pastHeaders = true
2345 } else {
2346 return rl.processTrailers(cs, f)
2347 }
2348
2349 res, err := rl.handleResponse(cs, f)
2350 if err != nil {
2351 if _, ok := err.(ConnectionError); ok {
2352 return err
2353 }
2354 // Any other error type is a stream error.
2355 rl.endStreamError(cs, StreamError{
2356 StreamID: f.StreamID,
2357 Code: ErrCodeProtocol,
2358 Cause: err,
2359 })
2360 return nil // return nil from process* funcs to keep conn alive
2361 }
2362 if res == nil {
2363 // (nil, nil) special case. See handleResponse docs.
2364 return nil
2365 }
2366 cs.resTrailer = &res.Trailer
2367 cs.res = res
2368 close(cs.respHeaderRecv)
2369 if f.StreamEnded() {
2370 rl.endStream(cs)
2371 }
2372 return nil
2373}
2374
// handleResponse may return a nil error or a ConnectionError. Any other
// error value is treated as a StreamError of type ErrCodeProtocol, with
// the returned error as the detail.
2378//
2379// As a special case, handleResponse may return (nil, nil) to skip the
2380// frame (currently only used for 1xx responses).
2381func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFrame) (*http.Response, error) {
2382 if f.Truncated {
2383 return nil, errResponseHeaderListSize
2384 }
2385
2386 status := f.PseudoValue("status")
2387 if status == "" {
2388 return nil, errors.New("malformed response from server: missing status pseudo header")
2389 }
2390 statusCode, err := strconv.Atoi(status)
2391 if err != nil {
2392 return nil, errors.New("malformed response from server: malformed non-numeric status pseudo header")
2393 }
2394
2395 regularFields := f.RegularFields()
2396 strs := make([]string, len(regularFields))
2397 header := make(http.Header, len(regularFields))
2398 res := &http.Response{
2399 Proto: "HTTP/2.0",
2400 ProtoMajor: 2,
2401 Header: header,
2402 StatusCode: statusCode,
2403 Status: status + " " + http.StatusText(statusCode),
2404 }
2405 for _, hf := range regularFields {
2406 key := httpcommon.CanonicalHeader(hf.Name)
2407 if key == "Trailer" {
2408 t := res.Trailer
2409 if t == nil {
2410 t = make(http.Header)
2411 res.Trailer = t
2412 }
2413 foreachHeaderElement(hf.Value, func(v string) {
2414 t[httpcommon.CanonicalHeader(v)] = nil
2415 })
2416 } else {
2417 vv := header[key]
2418 if vv == nil && len(strs) > 0 {
2419 // More than likely this will be a single-element key.
2420 // Most headers aren't multi-valued.
2421 // Set the capacity on strs[0] to 1, so any future append
2422 // won't extend the slice into the other strings.
2423 vv, strs = strs[:1:1], strs[1:]
2424 vv[0] = hf.Value
2425 header[key] = vv
2426 } else {
2427 header[key] = append(vv, hf.Value)
2428 }
2429 }
2430 }
2431
2432 if statusCode >= 100 && statusCode <= 199 {
2433 if f.StreamEnded() {
2434 return nil, errors.New("1xx informational response with END_STREAM flag")
2435 }
2436 if fn := cs.get1xxTraceFunc(); fn != nil {
2437 // If the 1xx response is being delivered to the user,
2438 // then they're responsible for limiting the number
2439 // of responses.
2440 if err := fn(statusCode, textproto.MIMEHeader(header)); err != nil {
2441 return nil, err
2442 }
2443 } else {
2444 // If the user didn't examine the 1xx response, then we
2445 // limit the size of all 1xx headers.
2446 //
2447 // This differs a bit from the HTTP/1 implementation, which
2448 // limits the size of all 1xx headers plus the final response.
2449 // Use the larger limit of MaxHeaderListSize and
2450 // net/http.Transport.MaxResponseHeaderBytes.
2451 limit := int64(cs.cc.t.maxHeaderListSize())
2452 if t1 := cs.cc.t.t1; t1 != nil && t1.MaxResponseHeaderBytes > limit {
2453 limit = t1.MaxResponseHeaderBytes
2454 }
2455 for _, h := range f.Fields {
2456 cs.totalHeaderSize += int64(h.Size())
2457 }
2458 if cs.totalHeaderSize > limit {
2459 if VerboseLogs {
2460 log.Printf("http2: 1xx informational responses too large")
2461 }
2462 return nil, errors.New("header list too large")
2463 }
2464 }
2465 if statusCode == 100 {
2466 traceGot100Continue(cs.trace)
2467 select {
2468 case cs.on100 <- struct{}{}:
2469 default:
2470 }
2471 }
2472 cs.pastHeaders = false // do it all again
2473 return nil, nil
2474 }
2475
2476 res.ContentLength = -1
2477 if clens := res.Header["Content-Length"]; len(clens) == 1 {
2478 if cl, err := strconv.ParseUint(clens[0], 10, 63); err == nil {
2479 res.ContentLength = int64(cl)
2480 } else {
2481 // TODO: care? unlike http/1, it won't mess up our framing, so it's
2482 // more safe smuggling-wise to ignore.
2483 }
2484 } else if len(clens) > 1 {
2485 // TODO: care? unlike http/1, it won't mess up our framing, so it's
2486 // more safe smuggling-wise to ignore.
2487 } else if f.StreamEnded() && !cs.isHead {
2488 res.ContentLength = 0
2489 }
2490
2491 if cs.isHead {
2492 res.Body = noBody
2493 return res, nil
2494 }
2495
2496 if f.StreamEnded() {
2497 if res.ContentLength > 0 {
2498 res.Body = missingBody{}
2499 } else {
2500 res.Body = noBody
2501 }
2502 return res, nil
2503 }
2504
2505 cs.bufPipe.setBuffer(&dataBuffer{expected: res.ContentLength})
2506 cs.bytesRemain = res.ContentLength
2507 res.Body = transportResponseBody{cs}
2508
2509 if cs.requestedGzip && asciiEqualFold(res.Header.Get("Content-Encoding"), "gzip") {
2510 res.Header.Del("Content-Encoding")
2511 res.Header.Del("Content-Length")
2512 res.ContentLength = -1
2513 res.Body = &gzipReader{body: res.Body}
2514 res.Uncompressed = true
2515 }
2516 return res, nil
2517}
2518
2519func (rl *clientConnReadLoop) processTrailers(cs *clientStream, f *MetaHeadersFrame) error {
2520 if cs.pastTrailers {
2521 // Too many HEADERS frames for this stream.
2522 return ConnectionError(ErrCodeProtocol)
2523 }
2524 cs.pastTrailers = true
2525 if !f.StreamEnded() {
		// We expect that any HEADERS frame carrying trailers
		// also has END_STREAM set.
2528 return ConnectionError(ErrCodeProtocol)
2529 }
2530 if len(f.PseudoFields()) > 0 {
2531 // No pseudo header fields are defined for trailers.
2532 // TODO: ConnectionError might be overly harsh? Check.
2533 return ConnectionError(ErrCodeProtocol)
2534 }
2535
2536 trailer := make(http.Header)
2537 for _, hf := range f.RegularFields() {
2538 key := httpcommon.CanonicalHeader(hf.Name)
2539 trailer[key] = append(trailer[key], hf.Value)
2540 }
2541 cs.trailer = trailer
2542
2543 rl.endStream(cs)
2544 return nil
2545}
2546
2547// transportResponseBody is the concrete type of Transport.RoundTrip's
2548// Response.Body. It is an io.ReadCloser.
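//
// A typical consumer (illustrative sketch; client and req are assumed)
// reads the body to EOF and closes it, which returns flow-control tokens
// to the peer:
//
//	resp, err := client.Do(req)
//	if err != nil {
//		return err
//	}
//	defer resp.Body.Close()
//	_, err = io.Copy(io.Discard, resp.Body)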
2549type transportResponseBody struct {
2550 cs *clientStream
2551}
2552
2553func (b transportResponseBody) Read(p []byte) (n int, err error) {
2554 cs := b.cs
2555 cc := cs.cc
2556
2557 if cs.readErr != nil {
2558 return 0, cs.readErr
2559 }
2560 n, err = b.cs.bufPipe.Read(p)
2561 if cs.bytesRemain != -1 {
2562 if int64(n) > cs.bytesRemain {
2563 n = int(cs.bytesRemain)
2564 if err == nil {
2565 err = errors.New("net/http: server replied with more than declared Content-Length; truncated")
2566 cs.abortStream(err)
2567 }
2568 cs.readErr = err
2569 return int(cs.bytesRemain), err
2570 }
2571 cs.bytesRemain -= int64(n)
2572 if err == io.EOF && cs.bytesRemain > 0 {
2573 err = io.ErrUnexpectedEOF
2574 cs.readErr = err
2575 return n, err
2576 }
2577 }
2578 if n == 0 {
2579 // No flow control tokens to send back.
2580 return
2581 }
2582
2583 cc.mu.Lock()
2584 connAdd := cc.inflow.add(n)
2585 var streamAdd int32
2586 if err == nil { // No need to refresh if the stream is over or failed.
2587 streamAdd = cs.inflow.add(n)
2588 }
2589 cc.mu.Unlock()
2590
2591 if connAdd != 0 || streamAdd != 0 {
2592 cc.wmu.Lock()
2593 defer cc.wmu.Unlock()
2594 if connAdd != 0 {
2595 cc.fr.WriteWindowUpdate(0, mustUint31(connAdd))
2596 }
2597 if streamAdd != 0 {
2598 cc.fr.WriteWindowUpdate(cs.ID, mustUint31(streamAdd))
2599 }
2600 cc.bw.Flush()
2601 }
2602 return
2603}
2604
2605var errClosedResponseBody = errors.New("http2: response body closed")
2606
2607func (b transportResponseBody) Close() error {
2608 cs := b.cs
2609 cc := cs.cc
2610
2611 cs.bufPipe.BreakWithError(errClosedResponseBody)
2612 cs.abortStream(errClosedResponseBody)
2613
2614 unread := cs.bufPipe.Len()
2615 if unread > 0 {
2616 cc.mu.Lock()
2617 // Return connection-level flow control.
2618 connAdd := cc.inflow.add(unread)
2619 cc.mu.Unlock()
2620
2621 // TODO(dneil): Acquiring this mutex can block indefinitely.
2622 // Move flow control return to a goroutine?
2623 cc.wmu.Lock()
2624 // Return connection-level flow control.
2625 if connAdd > 0 {
2626 cc.fr.WriteWindowUpdate(0, uint32(connAdd))
2627 }
2628 cc.bw.Flush()
2629 cc.wmu.Unlock()
2630 }
2631
2632 select {
2633 case <-cs.donec:
2634 case <-cs.ctx.Done():
2635 // See golang/go#49366: The net/http package can cancel the
2636 // request context after the response body is fully read.
2637 // Don't treat this as an error.
2638 return nil
2639 case <-cs.reqCancel:
2640 return errRequestCanceled
2641 }
2642 return nil
2643}
2644
2645func (rl *clientConnReadLoop) processData(f *DataFrame) error {
2646 cc := rl.cc
2647 cs := rl.streamByID(f.StreamID, headerOrDataFrame)
2648 data := f.Data()
2649 if cs == nil {
2650 cc.mu.Lock()
2651 neverSent := cc.nextStreamID
2652 cc.mu.Unlock()
2653 if f.StreamID >= neverSent {
2654 // We never asked for this.
2655 cc.logf("http2: Transport received unsolicited DATA frame; closing connection")
2656 return ConnectionError(ErrCodeProtocol)
2657 }
2658 // We probably did ask for this, but canceled. Just ignore it.
2659 // TODO: be stricter here? only silently ignore things which
2660 // we canceled, but not things which were closed normally
2661 // by the peer? Tough without accumulating too much state.
2662
2663 // But at least return their flow control:
2664 if f.Length > 0 {
2665 cc.mu.Lock()
2666 ok := cc.inflow.take(f.Length)
2667 connAdd := cc.inflow.add(int(f.Length))
2668 cc.mu.Unlock()
2669 if !ok {
2670 return ConnectionError(ErrCodeFlowControl)
2671 }
2672 if connAdd > 0 {
2673 cc.wmu.Lock()
2674 cc.fr.WriteWindowUpdate(0, uint32(connAdd))
2675 cc.bw.Flush()
2676 cc.wmu.Unlock()
2677 }
2678 }
2679 return nil
2680 }
2681 if cs.readClosed {
2682 cc.logf("protocol error: received DATA after END_STREAM")
2683 rl.endStreamError(cs, StreamError{
2684 StreamID: f.StreamID,
2685 Code: ErrCodeProtocol,
2686 })
2687 return nil
2688 }
2689 if !cs.pastHeaders {
2690 cc.logf("protocol error: received DATA before a HEADERS frame")
2691 rl.endStreamError(cs, StreamError{
2692 StreamID: f.StreamID,
2693 Code: ErrCodeProtocol,
2694 })
2695 return nil
2696 }
2697 if f.Length > 0 {
2698 if cs.isHead && len(data) > 0 {
2699 cc.logf("protocol error: received DATA on a HEAD request")
2700 rl.endStreamError(cs, StreamError{
2701 StreamID: f.StreamID,
2702 Code: ErrCodeProtocol,
2703 })
2704 return nil
2705 }
2706 // Check connection-level flow control.
2707 cc.mu.Lock()
2708 if !takeInflows(&cc.inflow, &cs.inflow, f.Length) {
2709 cc.mu.Unlock()
2710 return ConnectionError(ErrCodeFlowControl)
2711 }
2712 // Return any padded flow control now, since we won't
2713 // refund it later on body reads.
2714 var refund int
2715 if pad := int(f.Length) - len(data); pad > 0 {
2716 refund += pad
2717 }
2718
2719 didReset := false
2720 var err error
2721 if len(data) > 0 {
2722 if _, err = cs.bufPipe.Write(data); err != nil {
2723 // Return len(data) now if the stream is already closed,
2724 // since data will never be read.
2725 didReset = true
2726 refund += len(data)
2727 }
2728 }
2729
2730 sendConn := cc.inflow.add(refund)
2731 var sendStream int32
2732 if !didReset {
2733 sendStream = cs.inflow.add(refund)
2734 }
2735 cc.mu.Unlock()
2736
2737 if sendConn > 0 || sendStream > 0 {
2738 cc.wmu.Lock()
2739 if sendConn > 0 {
2740 cc.fr.WriteWindowUpdate(0, uint32(sendConn))
2741 }
2742 if sendStream > 0 {
2743 cc.fr.WriteWindowUpdate(cs.ID, uint32(sendStream))
2744 }
2745 cc.bw.Flush()
2746 cc.wmu.Unlock()
2747 }
2748
2749 if err != nil {
2750 rl.endStreamError(cs, err)
2751 return nil
2752 }
2753 }
2754
2755 if f.StreamEnded() {
2756 rl.endStream(cs)
2757 }
2758 return nil
2759}
2760
2761func (rl *clientConnReadLoop) endStream(cs *clientStream) {
2762 // TODO: check that any declared content-length matches, like
2763 // server.go's (*stream).endStream method.
2764 if !cs.readClosed {
2765 cs.readClosed = true
2766 // Close cs.bufPipe and cs.peerClosed with cc.mu held to avoid a
2767 // race condition: The caller can read io.EOF from Response.Body
2768 // and close the body before we close cs.peerClosed, causing
2769 // cleanupWriteRequest to send a RST_STREAM.
2770 rl.cc.mu.Lock()
2771 defer rl.cc.mu.Unlock()
2772 cs.bufPipe.closeWithErrorAndCode(io.EOF, cs.copyTrailers)
2773 close(cs.peerClosed)
2774 }
2775}
2776
2777func (rl *clientConnReadLoop) endStreamError(cs *clientStream, err error) {
2778 cs.readAborted = true
2779 cs.abortStream(err)
2780}
2781
2782// Constants passed to streamByID for documentation purposes.
2783const (
2784 headerOrDataFrame = true
2785 notHeaderOrDataFrame = false
2786)
2787
// streamByID returns the stream with the given id, or nil if no stream has that id.
// If headerOrData is true, it clears cc.rstStreamPingsBlocked.
2790func (rl *clientConnReadLoop) streamByID(id uint32, headerOrData bool) *clientStream {
2791 rl.cc.mu.Lock()
2792 defer rl.cc.mu.Unlock()
2793 if headerOrData {
2794 // Work around an unfortunate gRPC behavior.
2795 // See comment on ClientConn.rstStreamPingsBlocked for details.
2796 rl.cc.rstStreamPingsBlocked = false
2797 }
2798 cs := rl.cc.streams[id]
2799 if cs != nil && !cs.readAborted {
2800 return cs
2801 }
2802 return nil
2803}
2804
2805func (cs *clientStream) copyTrailers() {
2806 for k, vv := range cs.trailer {
2807 t := cs.resTrailer
2808 if *t == nil {
2809 *t = make(http.Header)
2810 }
2811 (*t)[k] = vv
2812 }
2813}
2814
2815func (rl *clientConnReadLoop) processGoAway(f *GoAwayFrame) error {
2816 cc := rl.cc
2817 cc.t.connPool().MarkDead(cc)
2818 if f.ErrCode != 0 {
		// TODO: deal with GOAWAY more thoroughly, particularly the error code.
2820 cc.vlogf("transport got GOAWAY with error code = %v", f.ErrCode)
2821 if fn := cc.t.CountError; fn != nil {
2822 fn("recv_goaway_" + f.ErrCode.stringToken())
2823 }
2824 }
2825 cc.setGoAway(f)
2826 return nil
2827}
2828
2829func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error {
2830 cc := rl.cc
2831 // Locking both mu and wmu here allows frame encoding to read settings with only wmu held.
2832 // Acquiring wmu when f.IsAck() is unnecessary, but convenient and mostly harmless.
2833 cc.wmu.Lock()
2834 defer cc.wmu.Unlock()
2835
2836 if err := rl.processSettingsNoWrite(f); err != nil {
2837 return err
2838 }
2839 if !f.IsAck() {
2840 cc.fr.WriteSettingsAck()
2841 cc.bw.Flush()
2842 }
2843 return nil
2844}
2845
2846func (rl *clientConnReadLoop) processSettingsNoWrite(f *SettingsFrame) error {
2847 cc := rl.cc
2848 cc.mu.Lock()
2849 defer cc.mu.Unlock()
2850
2851 if f.IsAck() {
2852 if cc.wantSettingsAck {
2853 cc.wantSettingsAck = false
2854 return nil
2855 }
2856 return ConnectionError(ErrCodeProtocol)
2857 }
2858
2859 var seenMaxConcurrentStreams bool
2860 err := f.ForeachSetting(func(s Setting) error {
2861 switch s.ID {
2862 case SettingMaxFrameSize:
2863 cc.maxFrameSize = s.Val
2864 case SettingMaxConcurrentStreams:
2865 cc.maxConcurrentStreams = s.Val
2866 seenMaxConcurrentStreams = true
2867 case SettingMaxHeaderListSize:
2868 cc.peerMaxHeaderListSize = uint64(s.Val)
2869 case SettingInitialWindowSize:
2870 // Values above the maximum flow-control
2871 // window size of 2^31-1 MUST be treated as a
2872 // connection error (Section 5.4.1) of type
2873 // FLOW_CONTROL_ERROR.
2874 if s.Val > math.MaxInt32 {
2875 return ConnectionError(ErrCodeFlowControl)
2876 }
2877
2878 // Adjust flow control of currently-open
2879 // frames by the difference of the old initial
2880 // window size and this one.
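			// For example (illustrative): if the old initial window was
			// 65535 and the peer now advertises 131070, each open
			// stream's send window grows by 65535. A smaller new value
			// shrinks the windows, possibly below zero, which
			// RFC 7540, Section 6.9.2 permits.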
2881 delta := int32(s.Val) - int32(cc.initialWindowSize)
2882 for _, cs := range cc.streams {
2883 cs.flow.add(delta)
2884 }
2885 cc.cond.Broadcast()
2886
2887 cc.initialWindowSize = s.Val
2888 case SettingHeaderTableSize:
2889 cc.henc.SetMaxDynamicTableSize(s.Val)
2890 cc.peerMaxHeaderTableSize = s.Val
2891 case SettingEnableConnectProtocol:
2892 if err := s.Valid(); err != nil {
2893 return err
2894 }
2895 // If the peer wants to send us SETTINGS_ENABLE_CONNECT_PROTOCOL,
2896 // we require that it do so in the first SETTINGS frame.
2897 //
2898 // When we attempt to use extended CONNECT, we wait for the first
2899 // SETTINGS frame to see if the server supports it. If we let the
2900 // server enable the feature with a later SETTINGS frame, then
2901 // users will see inconsistent results depending on whether we've
2902 // seen that frame or not.
2903 if !cc.seenSettings {
2904 cc.extendedConnectAllowed = s.Val == 1
2905 }
2906 default:
2907 cc.vlogf("Unhandled Setting: %v", s)
2908 }
2909 return nil
2910 })
2911 if err != nil {
2912 return err
2913 }
2914
2915 if !cc.seenSettings {
2916 if !seenMaxConcurrentStreams {
			// This was the server's initial SETTINGS frame, and it
			// didn't contain a MAX_CONCURRENT_STREAMS field, so
			// raise the number of concurrent streams this
			// connection can establish to our default.
2921 cc.maxConcurrentStreams = defaultMaxConcurrentStreams
2922 }
2923 close(cc.seenSettingsChan)
2924 cc.seenSettings = true
2925 }
2926
2927 return nil
2928}
2929
2930func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error {
2931 cc := rl.cc
2932 cs := rl.streamByID(f.StreamID, notHeaderOrDataFrame)
2933 if f.StreamID != 0 && cs == nil {
2934 return nil
2935 }
2936
2937 cc.mu.Lock()
2938 defer cc.mu.Unlock()
2939
2940 fl := &cc.flow
2941 if cs != nil {
2942 fl = &cs.flow
2943 }
2944 if !fl.add(int32(f.Increment)) {
		// For a stream-level flow-control error, the sender sends
		// RST_STREAM with an error code of FLOW_CONTROL_ERROR.
2946 if cs != nil {
2947 rl.endStreamError(cs, StreamError{
2948 StreamID: f.StreamID,
2949 Code: ErrCodeFlowControl,
2950 })
2951 return nil
2952 }
2953
2954 return ConnectionError(ErrCodeFlowControl)
2955 }
2956 cc.cond.Broadcast()
2957 return nil
2958}
2959
2960func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error {
2961 cs := rl.streamByID(f.StreamID, notHeaderOrDataFrame)
2962 if cs == nil {
2963 // TODO: return error if server tries to RST_STREAM an idle stream
2964 return nil
2965 }
2966 serr := streamError(cs.ID, f.ErrCode)
2967 serr.Cause = errFromPeer
2968 if f.ErrCode == ErrCodeProtocol {
2969 rl.cc.SetDoNotReuse()
2970 }
2971 if fn := cs.cc.t.CountError; fn != nil {
2972 fn("recv_rststream_" + f.ErrCode.stringToken())
2973 }
2974 cs.abortStream(serr)
2975
2976 cs.bufPipe.CloseWithError(serr)
2977 return nil
2978}
2979
2980// Ping sends a PING frame to the server and waits for the ack.
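//
// A minimal usage sketch (the one-second deadline is illustrative):
//
//	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
//	defer cancel()
//	if err := cc.Ping(ctx); err != nil {
//		// Treat the connection as unhealthy and stop reusing it.
//	}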
2981func (cc *ClientConn) Ping(ctx context.Context) error {
2982 c := make(chan struct{})
2983 // Generate a random payload
2984 var p [8]byte
2985 for {
2986 if _, err := rand.Read(p[:]); err != nil {
2987 return err
2988 }
2989 cc.mu.Lock()
		// Check for a duplicate payload before inserting.
2991 if _, found := cc.pings[p]; !found {
2992 cc.pings[p] = c
2993 cc.mu.Unlock()
2994 break
2995 }
2996 cc.mu.Unlock()
2997 }
2998 var pingError error
2999 errc := make(chan struct{})
3000 go func() {
3001 cc.t.markNewGoroutine()
3002 cc.wmu.Lock()
3003 defer cc.wmu.Unlock()
3004 if pingError = cc.fr.WritePing(false, p); pingError != nil {
3005 close(errc)
3006 return
3007 }
3008 if pingError = cc.bw.Flush(); pingError != nil {
3009 close(errc)
3010 return
3011 }
3012 }()
3013 select {
3014 case <-c:
3015 return nil
3016 case <-errc:
3017 return pingError
3018 case <-ctx.Done():
3019 return ctx.Err()
3020 case <-cc.readerDone:
3021 // connection closed
3022 return cc.readerErr
3023 }
3024}
3025
3026func (rl *clientConnReadLoop) processPing(f *PingFrame) error {
3027 if f.IsAck() {
3028 cc := rl.cc
3029 cc.mu.Lock()
3030 defer cc.mu.Unlock()
3031 // If ack, notify listener if any
3032 if c, ok := cc.pings[f.Data]; ok {
3033 close(c)
3034 delete(cc.pings, f.Data)
3035 }
3036 if cc.pendingResets > 0 {
3037 // See clientStream.cleanupWriteRequest.
3038 cc.pendingResets = 0
3039 cc.rstStreamPingsBlocked = true
3040 cc.cond.Broadcast()
3041 }
3042 return nil
3043 }
3044 cc := rl.cc
3045 cc.wmu.Lock()
3046 defer cc.wmu.Unlock()
3047 if err := cc.fr.WritePing(true, f.Data); err != nil {
3048 return err
3049 }
3050 return cc.bw.Flush()
3051}
3052
3053func (rl *clientConnReadLoop) processPushPromise(f *PushPromiseFrame) error {
3054 // We told the peer we don't want them.
3055 // Spec says:
3056 // "PUSH_PROMISE MUST NOT be sent if the SETTINGS_ENABLE_PUSH
3057 // setting of the peer endpoint is set to 0. An endpoint that
3058 // has set this setting and has received acknowledgement MUST
3059 // treat the receipt of a PUSH_PROMISE frame as a connection
3060 // error (Section 5.4.1) of type PROTOCOL_ERROR."
3061 return ConnectionError(ErrCodeProtocol)
3062}
3063
3064// writeStreamReset sends a RST_STREAM frame.
3065// When ping is true, it also sends a PING frame with a random payload.
3066func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, ping bool, err error) {
3067 // TODO: map err to more interesting error codes, once the
3068 // HTTP community comes up with some. But currently for
3069 // RST_STREAM there's no equivalent to GOAWAY frame's debug
3070 // data, and the error codes are all pretty vague ("cancel").
3071 cc.wmu.Lock()
3072 cc.fr.WriteRSTStream(streamID, code)
3073 if ping {
3074 var payload [8]byte
3075 rand.Read(payload[:])
3076 cc.fr.WritePing(false, payload)
3077 }
3078 cc.bw.Flush()
3079 cc.wmu.Unlock()
3080}
3081
3082var (
3083 errResponseHeaderListSize = errors.New("http2: response header list larger than advertised limit")
3084 errRequestHeaderListSize = httpcommon.ErrRequestHeaderListSize
3085)
3086
3087func (cc *ClientConn) logf(format string, args ...interface{}) {
3088 cc.t.logf(format, args...)
3089}
3090
3091func (cc *ClientConn) vlogf(format string, args ...interface{}) {
3092 cc.t.vlogf(format, args...)
3093}
3094
3095func (t *Transport) vlogf(format string, args ...interface{}) {
3096 if VerboseLogs {
3097 t.logf(format, args...)
3098 }
3099}
3100
3101func (t *Transport) logf(format string, args ...interface{}) {
3102 log.Printf(format, args...)
3103}
3104
3105var noBody io.ReadCloser = noBodyReader{}
3106
3107type noBodyReader struct{}
3108
3109func (noBodyReader) Close() error { return nil }
3110func (noBodyReader) Read([]byte) (int, error) { return 0, io.EOF }
3111
3112type missingBody struct{}
3113
3114func (missingBody) Close() error { return nil }
3115func (missingBody) Read([]byte) (int, error) { return 0, io.ErrUnexpectedEOF }
3116
3117func strSliceContains(ss []string, s string) bool {
3118 for _, v := range ss {
3119 if v == s {
3120 return true
3121 }
3122 }
3123 return false
3124}
3125
3126type erringRoundTripper struct{ err error }
3127
3128func (rt erringRoundTripper) RoundTripErr() error { return rt.err }
3129func (rt erringRoundTripper) RoundTrip(*http.Request) (*http.Response, error) { return nil, rt.err }
3130
// gzipReader wraps a response body so it can lazily
// call gzip.NewReader on the first call to Read.
3133type gzipReader struct {
3134 _ incomparable
3135 body io.ReadCloser // underlying Response.Body
3136 zr *gzip.Reader // lazily-initialized gzip reader
3137 zerr error // sticky error
3138}
3139
3140func (gz *gzipReader) Read(p []byte) (n int, err error) {
3141 if gz.zerr != nil {
3142 return 0, gz.zerr
3143 }
3144 if gz.zr == nil {
3145 gz.zr, err = gzip.NewReader(gz.body)
3146 if err != nil {
3147 gz.zerr = err
3148 return 0, err
3149 }
3150 }
3151 return gz.zr.Read(p)
3152}
3153
3154func (gz *gzipReader) Close() error {
3155 if err := gz.body.Close(); err != nil {
3156 return err
3157 }
3158 gz.zerr = fs.ErrClosed
3159 return nil
3160}
3161
3162type errorReader struct{ err error }
3163
3164func (r errorReader) Read(p []byte) (int, error) { return 0, r.err }
3165
3166// isConnectionCloseRequest reports whether req should use its own
3167// connection for a single request and then close the connection.
3168func isConnectionCloseRequest(req *http.Request) bool {
3169 return req.Close || httpguts.HeaderValuesContainsToken(req.Header["Connection"], "close")
3170}
3171
// registerHTTPSProtocol calls Transport.RegisterProtocol but
// converts panics into errors.
3174func registerHTTPSProtocol(t *http.Transport, rt noDialH2RoundTripper) (err error) {
3175 defer func() {
3176 if e := recover(); e != nil {
3177 err = fmt.Errorf("%v", e)
3178 }
3179 }()
3180 t.RegisterProtocol("https", rt)
3181 return nil
3182}
3183
// noDialH2RoundTripper is a RoundTripper which only tries to complete the request
// if there is already a cached connection to the host.
3186// (The field is exported so it can be accessed via reflect from net/http; tested
3187// by TestNoDialH2RoundTripperType)
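//
// A minimal wiring sketch (simplified relative to what ConfigureTransport
// actually does):
//
//	t1 := &http.Transport{}
//	t2 := &Transport{}
//	if err := registerHTTPSProtocol(t1, noDialH2RoundTripper{t2}); err != nil {
//		// t1 already had an "https" protocol registered.
//	}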
3188type noDialH2RoundTripper struct{ *Transport }
3189
3190func (rt noDialH2RoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
3191 res, err := rt.Transport.RoundTrip(req)
3192 if isNoCachedConnError(err) {
3193 return nil, http.ErrSkipAltProtocol
3194 }
3195 return res, err
3196}
3197
3198func (t *Transport) idleConnTimeout() time.Duration {
	// To keep things backwards compatible, prefer a non-zero
	// IdleConnTimeout on this Transport, then the IdleConnTimeout of
	// the underlying http1 transport, then 0.
3202 if t.IdleConnTimeout != 0 {
3203 return t.IdleConnTimeout
3204 }
3205
3206 if t.t1 != nil {
3207 return t.t1.IdleConnTimeout
3208 }
3209
3210 return 0
3211}
3212
3213func traceGetConn(req *http.Request, hostPort string) {
3214 trace := httptrace.ContextClientTrace(req.Context())
3215 if trace == nil || trace.GetConn == nil {
3216 return
3217 }
3218 trace.GetConn(hostPort)
3219}
3220
3221func traceGotConn(req *http.Request, cc *ClientConn, reused bool) {
3222 trace := httptrace.ContextClientTrace(req.Context())
3223 if trace == nil || trace.GotConn == nil {
3224 return
3225 }
3226 ci := httptrace.GotConnInfo{Conn: cc.tconn}
3227 ci.Reused = reused
3228 cc.mu.Lock()
3229 ci.WasIdle = len(cc.streams) == 0 && reused
3230 if ci.WasIdle && !cc.lastActive.IsZero() {
3231 ci.IdleTime = cc.t.timeSince(cc.lastActive)
3232 }
3233 cc.mu.Unlock()
3234
3235 trace.GotConn(ci)
3236}
3237
3238func traceWroteHeaders(trace *httptrace.ClientTrace) {
3239 if trace != nil && trace.WroteHeaders != nil {
3240 trace.WroteHeaders()
3241 }
3242}
3243
3244func traceGot100Continue(trace *httptrace.ClientTrace) {
3245 if trace != nil && trace.Got100Continue != nil {
3246 trace.Got100Continue()
3247 }
3248}
3249
3250func traceWait100Continue(trace *httptrace.ClientTrace) {
3251 if trace != nil && trace.Wait100Continue != nil {
3252 trace.Wait100Continue()
3253 }
3254}
3255
3256func traceWroteRequest(trace *httptrace.ClientTrace, err error) {
3257 if trace != nil && trace.WroteRequest != nil {
3258 trace.WroteRequest(httptrace.WroteRequestInfo{Err: err})
3259 }
3260}
3261
3262func traceFirstResponseByte(trace *httptrace.ClientTrace) {
3263 if trace != nil && trace.GotFirstResponseByte != nil {
3264 trace.GotFirstResponseByte()
3265 }
3266}
3267
3268func traceGot1xxResponseFunc(trace *httptrace.ClientTrace) func(int, textproto.MIMEHeader) error {
3269 if trace != nil {
3270 return trace.Got1xxResponse
3271 }
3272 return nil
3273}
3274
3275// dialTLSWithContext uses tls.Dialer, added in Go 1.15, to open a TLS
3276// connection.
3277func (t *Transport) dialTLSWithContext(ctx context.Context, network, addr string, cfg *tls.Config) (*tls.Conn, error) {
3278 dialer := &tls.Dialer{
3279 Config: cfg,
3280 }
3281 cn, err := dialer.DialContext(ctx, network, addr)
3282 if err != nil {
3283 return nil, err
3284 }
3285 tlsCn := cn.(*tls.Conn) // DialContext comment promises this will always succeed
3286 return tlsCn, nil
3287}