client_conn_pool.go

  1// Copyright 2015 The Go Authors. All rights reserved.
  2// Use of this source code is governed by a BSD-style
  3// license that can be found in the LICENSE file.
  4
  5// Transport code's client connection pooling.
  6
  7package http2
  8
  9import (
 10	"context"
 11	"errors"
 12	"net"
 13	"net/http"
 14	"sync"
 15)
 16
 17// ClientConnPool manages a pool of HTTP/2 client connections.
 18type ClientConnPool interface {
 19	// GetClientConn returns a specific HTTP/2 connection (usually
 20	// a TLS-TCP connection) to an HTTP/2 server. On success, the
 21	// returned ClientConn accounts for the upcoming RoundTrip
 22	// call, so the caller should not omit it. If the caller needs
 23	// to, ClientConn.RoundTrip can be called with a bogus
 24	// new(http.Request) to release the stream reservation.
 25	GetClientConn(req *http.Request, addr string) (*ClientConn, error)
 26	MarkDead(*ClientConn)
 27}
 28
 29// clientConnPoolIdleCloser is the interface implemented by ClientConnPool
 30// implementations which can close their idle connections.
 31type clientConnPoolIdleCloser interface {
 32	ClientConnPool
 33	closeIdleConnections()
 34}
 35
 36var (
 37	_ clientConnPoolIdleCloser = (*clientConnPool)(nil)
 38	_ clientConnPoolIdleCloser = noDialClientConnPool{}
 39)
 40
 41// TODO: use singleflight for dialing and addConnCalls?
 42type clientConnPool struct {
 43	t *Transport
 44
 45	mu sync.Mutex // TODO: maybe switch to RWMutex
 46	// TODO: add support for sharing conns based on cert names
 47	// (e.g. share conn for googleapis.com and appspot.com)
 48	conns        map[string][]*ClientConn // key is host:port
 49	dialing      map[string]*dialCall     // currently in-flight dials
 50	keys         map[*ClientConn][]string
 51	addConnCalls map[string]*addConnCall // in-flight addConnIfNeeded calls
 52}
 53
 54func (p *clientConnPool) GetClientConn(req *http.Request, addr string) (*ClientConn, error) {
 55	return p.getClientConn(req, addr, dialOnMiss)
 56}
 57
 58const (
 59	dialOnMiss   = true
 60	noDialOnMiss = false
 61)
 62
 63func (p *clientConnPool) getClientConn(req *http.Request, addr string, dialOnMiss bool) (*ClientConn, error) {
 64	// TODO(dneil): Dial a new connection when t.DisableKeepAlives is set?
 65	if isConnectionCloseRequest(req) && dialOnMiss {
 66		// It gets its own connection.
 67		traceGetConn(req, addr)
 68		const singleUse = true
 69		cc, err := p.t.dialClientConn(req.Context(), addr, singleUse)
 70		if err != nil {
 71			return nil, err
 72		}
 73		return cc, nil
 74	}
 75	for {
 76		p.mu.Lock()
 77		for _, cc := range p.conns[addr] {
 78			if cc.ReserveNewRequest() {
 79				// When a connection is presented to us by the net/http package,
 80				// the GetConn hook has already been called.
 81				// Don't call it a second time here.
 82				if !cc.getConnCalled {
 83					traceGetConn(req, addr)
 84				}
 85				cc.getConnCalled = false
 86				p.mu.Unlock()
 87				return cc, nil
 88			}
 89		}
 90		if !dialOnMiss {
 91			p.mu.Unlock()
 92			return nil, ErrNoCachedConn
 93		}
 94		traceGetConn(req, addr)
 95		call := p.getStartDialLocked(req.Context(), addr)
 96		p.mu.Unlock()
 97		<-call.done
 98		if shouldRetryDial(call, req) {
 99			continue
100		}
101		cc, err := call.res, call.err
102		if err != nil {
103			return nil, err
104		}
105		if cc.ReserveNewRequest() {
106			return cc, nil
107		}
108	}
109}
110
111// dialCall is an in-flight Transport dial call to a host.
112type dialCall struct {
113	_ incomparable
114	p *clientConnPool
115	// the context associated with the request
116	// that created this dialCall
117	ctx  context.Context
118	done chan struct{} // closed when done
119	res  *ClientConn   // valid after done is closed
120	err  error         // valid after done is closed
121}
122
123// requires p.mu is held.
124func (p *clientConnPool) getStartDialLocked(ctx context.Context, addr string) *dialCall {
125	if call, ok := p.dialing[addr]; ok {
126		// A dial is already in-flight. Don't start another.
127		return call
128	}
129	call := &dialCall{p: p, done: make(chan struct{}), ctx: ctx}
130	if p.dialing == nil {
131		p.dialing = make(map[string]*dialCall)
132	}
133	p.dialing[addr] = call
134	go call.dial(call.ctx, addr)
135	return call
136}
137
138// run in its own goroutine.
139func (c *dialCall) dial(ctx context.Context, addr string) {
140	const singleUse = false // shared conn
141	c.res, c.err = c.p.t.dialClientConn(ctx, addr, singleUse)
142
143	c.p.mu.Lock()
144	delete(c.p.dialing, addr)
145	if c.err == nil {
146		c.p.addConnLocked(addr, c.res)
147	}
148	c.p.mu.Unlock()
149
150	close(c.done)
151}
152
153// addConnIfNeeded makes a NewClientConn out of c if a connection for key doesn't
154// already exist. It coalesces concurrent calls with the same key.
155// This is used by the http1 Transport code when it creates a new connection. Because
156// the http1 Transport doesn't de-dup TCP dials to outbound hosts (because it doesn't know
157// the protocol), it can get into a situation where it has multiple TLS connections.
158// This code decides which ones live or die.
159// The return value used is whether c was used.
160// c is never closed.
161func (p *clientConnPool) addConnIfNeeded(key string, t *Transport, c net.Conn) (used bool, err error) {
162	p.mu.Lock()
163	for _, cc := range p.conns[key] {
164		if cc.CanTakeNewRequest() {
165			p.mu.Unlock()
166			return false, nil
167		}
168	}
169	call, dup := p.addConnCalls[key]
170	if !dup {
171		if p.addConnCalls == nil {
172			p.addConnCalls = make(map[string]*addConnCall)
173		}
174		call = &addConnCall{
175			p:    p,
176			done: make(chan struct{}),
177		}
178		p.addConnCalls[key] = call
179		go call.run(t, key, c)
180	}
181	p.mu.Unlock()
182
183	<-call.done
184	if call.err != nil {
185		return false, call.err
186	}
187	return !dup, nil
188}
189
190type addConnCall struct {
191	_    incomparable
192	p    *clientConnPool
193	done chan struct{} // closed when done
194	err  error
195}
196
197func (c *addConnCall) run(t *Transport, key string, nc net.Conn) {
198	cc, err := t.NewClientConn(nc)
199
200	p := c.p
201	p.mu.Lock()
202	if err != nil {
203		c.err = err
204	} else {
205		cc.getConnCalled = true // already called by the net/http package
206		p.addConnLocked(key, cc)
207	}
208	delete(p.addConnCalls, key)
209	p.mu.Unlock()
210	close(c.done)
211}
212
213// p.mu must be held
214func (p *clientConnPool) addConnLocked(key string, cc *ClientConn) {
215	for _, v := range p.conns[key] {
216		if v == cc {
217			return
218		}
219	}
220	if p.conns == nil {
221		p.conns = make(map[string][]*ClientConn)
222	}
223	if p.keys == nil {
224		p.keys = make(map[*ClientConn][]string)
225	}
226	p.conns[key] = append(p.conns[key], cc)
227	p.keys[cc] = append(p.keys[cc], key)
228}
229
230func (p *clientConnPool) MarkDead(cc *ClientConn) {
231	p.mu.Lock()
232	defer p.mu.Unlock()
233	for _, key := range p.keys[cc] {
234		vv, ok := p.conns[key]
235		if !ok {
236			continue
237		}
238		newList := filterOutClientConn(vv, cc)
239		if len(newList) > 0 {
240			p.conns[key] = newList
241		} else {
242			delete(p.conns, key)
243		}
244	}
245	delete(p.keys, cc)
246}
247
248func (p *clientConnPool) closeIdleConnections() {
249	p.mu.Lock()
250	defer p.mu.Unlock()
251	// TODO: don't close a cc if it was just added to the pool
252	// milliseconds ago and has never been used. There's currently
253	// a small race window with the HTTP/1 Transport's integration
254	// where it can add an idle conn just before using it, and
255	// somebody else can concurrently call CloseIdleConns and
256	// break some caller's RoundTrip.
257	for _, vv := range p.conns {
258		for _, cc := range vv {
259			cc.closeIfIdle()
260		}
261	}
262}
263
264func filterOutClientConn(in []*ClientConn, exclude *ClientConn) []*ClientConn {
265	out := in[:0]
266	for _, v := range in {
267		if v != exclude {
268			out = append(out, v)
269		}
270	}
271	// If we filtered it out, zero out the last item to prevent
272	// the GC from seeing it.
273	if len(in) != len(out) {
274		in[len(in)-1] = nil
275	}
276	return out
277}
278
279// noDialClientConnPool is an implementation of http2.ClientConnPool
280// which never dials. We let the HTTP/1.1 client dial and use its TLS
281// connection instead.
282type noDialClientConnPool struct{ *clientConnPool }
283
284func (p noDialClientConnPool) GetClientConn(req *http.Request, addr string) (*ClientConn, error) {
285	return p.getClientConn(req, addr, noDialOnMiss)
286}
287
288// shouldRetryDial reports whether the current request should
289// retry dialing after the call finished unsuccessfully, for example
290// if the dial was canceled because of a context cancellation or
291// deadline expiry.
292func shouldRetryDial(call *dialCall, req *http.Request) bool {
293	if call.err == nil {
294		// No error, no need to retry
295		return false
296	}
297	if call.ctx == req.Context() {
298		// If the call has the same context as the request, the dial
299		// should not be retried, since any cancellation will have come
300		// from this request.
301		return false
302	}
303	if !errors.Is(call.err, context.Canceled) && !errors.Is(call.err, context.DeadlineExceeded) {
304		// If the call error is not because of a context cancellation or a deadline expiry,
305		// the dial should not be retried.
306		return false
307	}
308	// Only retry if the error is a context cancellation error or deadline expiry
309	// and the context associated with the call was canceled or expired.
310	return call.ctx.Err() != nil
311}