1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
23 "errors"
24 "fmt"
25 "math"
26 "net/url"
27 "slices"
28 "strings"
29 "sync"
30 "sync/atomic"
31 "time"
32
33 "google.golang.org/grpc/balancer"
34 "google.golang.org/grpc/balancer/base"
35 "google.golang.org/grpc/balancer/pickfirst"
36 "google.golang.org/grpc/codes"
37 "google.golang.org/grpc/connectivity"
38 "google.golang.org/grpc/internal"
39 "google.golang.org/grpc/internal/channelz"
40 "google.golang.org/grpc/internal/grpcsync"
41 "google.golang.org/grpc/internal/idle"
42 iresolver "google.golang.org/grpc/internal/resolver"
43 "google.golang.org/grpc/internal/stats"
44 "google.golang.org/grpc/internal/transport"
45 "google.golang.org/grpc/keepalive"
46 "google.golang.org/grpc/resolver"
47 "google.golang.org/grpc/serviceconfig"
48 "google.golang.org/grpc/status"
49
50 _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
51 _ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
52 _ "google.golang.org/grpc/internal/resolver/unix" // To register unix resolver.
53 _ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
54)
55
// minConnectTimeout is the lower bound on how long a connection attempt is
// given to complete.
const minConnectTimeout = 20 * time.Second
60
61var (
62 // ErrClientConnClosing indicates that the operation is illegal because
63 // the ClientConn is closing.
64 //
65 // Deprecated: this error should not be relied upon by users; use the status
66 // code of Canceled instead.
67 ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
68 // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
69 errConnDrain = errors.New("grpc: the connection is drained")
70 // errConnClosing indicates that the connection is closing.
71 errConnClosing = errors.New("grpc: the connection is closing")
72 // errConnIdling indicates the connection is being closed as the channel
73 // is moving to an idle mode due to inactivity.
74 errConnIdling = errors.New("grpc: the connection is closing due to channel idleness")
75 // invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
76 // service config.
77 invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
78 // PickFirstBalancerName is the name of the pick_first balancer.
79 PickFirstBalancerName = pickfirst.Name
80)
81
// The following errors are returned from NewClient (and therefore also from
// Dial and DialContext) when the configured credentials fail validation.
var (
	// errNoTransportSecurity indicates that no transport security has been
	// configured for the ClientConn. Users must either configure credentials
	// or explicitly opt out of transport security with
	// grpc.WithTransportCredentials(insecure.NewCredentials()).
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithTransportCredentials(insecure.NewCredentials()) explicitly or set credentials)")
	// errTransportCredsAndBundle indicates that a credentials bundle is used
	// together with individual transport credentials.
	errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
	// errNoTransportCredsInBundle indicates that the configured credentials
	// bundle returned nil transport credentials.
	errNoTransportCredsInBundle = errors.New("grpc: credentials.Bundle must return non-nil transport credentials")
	// errTransportCredentialsMissing indicates that per-RPC credentials which
	// require transport-level security (e.g. an OAuth2 token) are configured
	// alongside insecure transport credentials.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
)
99
const (
	// defaultClientMaxReceiveMessageSize is the default limit on the size of
	// a message the client will receive (1024 * 1024 * 4, i.e. 4 MiB assuming
	// the unit is bytes).
	defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
	// defaultClientMaxSendMessageSize is the default limit on the size of a
	// message the client will send.
	defaultClientMaxSendMessageSize = math.MaxInt32
	// defaultWriteBufSize and defaultReadBufSize are the default write and read
	// buffer sizes (32 KiB each); presumably they size the transport's frame
	// I/O buffers — confirm at the dial-option use sites.
	defaultWriteBufSize = 32 * 1024
	defaultReadBufSize  = 32 * 1024
)
107
108type defaultConfigSelector struct {
109 sc *ServiceConfig
110}
111
112func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) {
113 return &iresolver.RPCConfig{
114 Context: rpcInfo.Context,
115 MethodConfig: getMethodConfig(dcs.sc, rpcInfo.Method),
116 }, nil
117}
118
// NewClient creates a new gRPC "channel" for the target URI provided. No I/O
// is performed. Use of the ClientConn for RPCs will automatically cause it to
// connect. The Connect method may be called to manually create a connection,
// but for most users this should be unnecessary.
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md. E.g. to use the dns
// name resolver, a "dns:///" prefix may be applied to the target. The default
// name resolver will be used if no scheme is detected, or if the parsed scheme
// is not a registered name resolver. The default resolver is "dns" but can be
// overridden using the resolver package's SetDefaultScheme.
//
// Examples:
//
//   - "foo.googleapis.com:8080"
//   - "dns:///foo.googleapis.com:8080"
//   - "dns:///foo.googleapis.com"
//   - "dns:///10.0.0.213:8080"
//   - "dns:///%5B2001:db8:85a3:8d3:1319:8a2e:370:7348%5D:443"
//   - "dns://8.8.8.8/foo.googleapis.com:8080"
//   - "dns://8.8.8.8/foo.googleapis.com"
//   - "zookeeper://zk.example.com:9900/example_service"
//
// Dial options are applied in this order:
//   - the global dial options, which are skipped entirely if opts contains
//     the option that disables them;
//   - opts;
//   - the global per-target dial options, which need the parsed target.
//
// The DialOptions returned by WithBlock, WithTimeout,
// WithReturnConnectionError, and FailOnNonTempDialError are ignored by this
// function.
//
// An error is returned if the target cannot be turned into a parsed target
// and resolver builder (initParsedTargetAndResolverBuilder), if the transport
// credentials are invalid, if the default service config fails to parse, or if
// initAuthority fails. On success the channel starts in idle mode: its resolver
// and balancer wrappers exist, but the resolver is only started when the
// channel exits idle mode.
func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target: target,
		conns:  make(map[*addrConn]struct{}),
		dopts:  defaultDialOptions(),
	}

	// Seed the atomically-accessed state with empty values: a typed-nil retry
	// throttler and a config selector backed by no service config.
	cc.retryThrottler.Store((*retryThrottler)(nil))
	cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
	// cc.ctx is the parent of every addrConn context and is canceled on close.
	cc.ctx, cc.cancel = context.WithCancel(context.Background())

	// Apply dial options. A disableGlobalDialOptions option anywhere in opts
	// suppresses globalDialOptions entirely.
	disableGlobalOpts := false
	for _, opt := range opts {
		if _, ok := opt.(*disableGlobalDialOptions); ok {
			disableGlobalOpts = true
			break
		}
	}

	// Global options are applied before opts, so a user option that touches
	// the same field is applied last.
	if !disableGlobalOpts {
		for _, opt := range globalDialOptions {
			opt.apply(&cc.dopts)
		}
	}

	for _, opt := range opts {
		opt.apply(&cc.dopts)
	}

	// Determine the resolver to use.
	if err := cc.initParsedTargetAndResolverBuilder(); err != nil {
		return nil, err
	}

	// Per-target global options depend on the parsed target URL, so they can
	// only be applied once the target has been parsed.
	for _, opt := range globalPerTargetDialOptions {
		opt.DialOptionForTarget(cc.parsedTarget.URL).apply(&cc.dopts)
	}

	// Collapse the configured interceptors into a single unary and a single
	// stream interceptor stored back into cc.dopts.
	chainUnaryClientInterceptors(cc)
	chainStreamClientInterceptors(cc)

	if err := cc.validateTransportCredentials(); err != nil {
		return nil, err
	}

	// Parse the default service config up front so that an invalid one fails
	// channel creation instead of surfacing later.
	if cc.dopts.defaultServiceConfigRawJSON != nil {
		scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON, cc.dopts.maxCallAttempts)
		if scpr.Err != nil {
			return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
		}
		// The comma-ok form leaves defaultServiceConfig nil if Config is not a
		// *ServiceConfig.
		cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
	}
	cc.keepaliveParams = cc.dopts.copts.KeepaliveParams

	if err = cc.initAuthority(); err != nil {
		return nil, err
	}

	// Register ClientConn with channelz. Note that this is only done after
	// channel creation cannot fail.
	cc.channelzRegistration(target)
	channelz.Infof(logger, cc.channelz, "parsed dial target is: %#v", cc.parsedTarget)
	channelz.Infof(logger, cc.channelz, "Channel authority set to %q", cc.authority)

	cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelz)
	cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers)

	cc.metricsRecorderList = stats.NewMetricsRecorderList(cc.dopts.copts.StatsHandlers)

	cc.initIdleStateLocked() // Safe to call without the lock, since nothing else has a reference to cc.
	cc.idlenessMgr = idle.NewManager((*idler)(cc), cc.dopts.idleTimeout)

	return cc, nil
}
220
221// Dial calls DialContext(context.Background(), target, opts...).
222//
223// Deprecated: use NewClient instead. Will be supported throughout 1.x.
224func Dial(target string, opts ...DialOption) (*ClientConn, error) {
225 return DialContext(context.Background(), target, opts...)
226}
227
// DialContext calls NewClient and then exits idle mode. If WithBlock(true) is
// used, it calls Connect and WaitForStateChange until either the context
// expires or the state of the ClientConn is Ready.
//
// One subtle difference between NewClient and Dial and DialContext is that the
// former uses "dns" as the default name resolver, while the latter use
// "passthrough" for backward compatibility. This distinction should not matter
// to most users, but could matter to legacy users that specify a custom dialer
// and expect it to receive the target string directly.
//
// For blocking dials, ctx is additionally bounded by the dial-option timeout
// (cc.dopts.timeout, when positive). If ctx is done by the time a blocking
// dial returns, the result is always a nil ClientConn and an error derived
// from ctx.Err(); when cc.dopts.returnLastError is set and another error was
// recorded, that error is appended as "<ctx err>: <err>".
//
// Whenever an error is returned after NewClient has succeeded, the ClientConn
// is closed before DialContext returns.
//
// Deprecated: use NewClient instead. Will be supported throughout 1.x.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	// The legacy defaults are prepended so that any explicit user option,
	// applied after them, takes precedence.
	//
	// WithLocalDNSResolution dial option in `grpc.Dial` ensures that it
	// preserves behavior: when default scheme passthrough is used, skip
	// hostname resolution, when "dns" is used for resolution, perform
	// resolution on the client.
	opts = append([]DialOption{withDefaultScheme("passthrough"), WithLocalDNSResolution()}, opts...)
	cc, err := NewClient(target, opts...)
	if err != nil {
		return nil, err
	}

	// Close the channel if anything below fails. err is the named result, so
	// this observes every error return, including errors rewritten by the
	// ctx-done defer further down (defers run in LIFO order, so that one runs
	// first).
	defer func() {
		if err != nil {
			cc.Close()
		}
	}()

	// We start the channel off in idle mode, but kick it out of idle now,
	// instead of waiting for the first RPC. This is the legacy behavior of
	// Dial. Exiting idle mode starts the name resolver (see exitIdleMode).
	if err := cc.idlenessMgr.ExitIdleMode(); err != nil {
		return nil, err
	}

	// Return now for non-blocking dials.
	if !cc.dopts.block {
		return cc, nil
	}

	if cc.dopts.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
		defer cancel()
	}
	// If ctx is done on the way out, never hand back a ClientConn, and make
	// sure the returned error reflects the context error.
	defer func() {
		select {
		case <-ctx.Done():
			switch {
			case ctx.Err() == err:
				conn = nil
			case err == nil || !cc.dopts.returnLastError:
				conn, err = nil, ctx.Err()
			default:
				conn, err = nil, fmt.Errorf("%v: %v", ctx.Err(), err)
			}
		default:
		}
	}()

	// A blocking dial blocks until the clientConn is ready.
	for {
		s := cc.GetState()
		if s == connectivity.Idle {
			cc.Connect()
		}
		if s == connectivity.Ready {
			return cc, nil
		} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
			// Give up early on a connection error that reports itself as
			// non-temporary through a Temporary() method.
			if err = cc.connectionError(); err != nil {
				terr, ok := err.(interface {
					Temporary() bool
				})
				if ok && !terr.Temporary() {
					return nil, err
				}
			}
		}
		if !cc.WaitForStateChange(ctx, s) {
			// ctx got timeout or canceled.
			if err = cc.connectionError(); err != nil && cc.dopts.returnLastError {
				return nil, err
			}
			return nil, ctx.Err()
		}
	}
}
319
320// addTraceEvent is a helper method to add a trace event on the channel. If the
321// channel is a nested one, the same event is also added on the parent channel.
322func (cc *ClientConn) addTraceEvent(msg string) {
323 ted := &channelz.TraceEvent{
324 Desc: fmt.Sprintf("Channel %s", msg),
325 Severity: channelz.CtInfo,
326 }
327 if cc.dopts.channelzParent != nil {
328 ted.Parent = &channelz.TraceEvent{
329 Desc: fmt.Sprintf("Nested channel(id:%d) %s", cc.channelz.ID, msg),
330 Severity: channelz.CtInfo,
331 }
332 }
333 channelz.AddTraceEvent(logger, cc.channelz, 0, ted)
334}
335
336type idler ClientConn
337
338func (i *idler) EnterIdleMode() {
339 (*ClientConn)(i).enterIdleMode()
340}
341
342func (i *idler) ExitIdleMode() error {
343 return (*ClientConn)(i).exitIdleMode()
344}
345
346// exitIdleMode moves the channel out of idle mode by recreating the name
347// resolver and load balancer. This should never be called directly; use
348// cc.idlenessMgr.ExitIdleMode instead.
349func (cc *ClientConn) exitIdleMode() (err error) {
350 cc.mu.Lock()
351 if cc.conns == nil {
352 cc.mu.Unlock()
353 return errConnClosing
354 }
355 cc.mu.Unlock()
356
357 // This needs to be called without cc.mu because this builds a new resolver
358 // which might update state or report error inline, which would then need to
359 // acquire cc.mu.
360 if err := cc.resolverWrapper.start(); err != nil {
361 return err
362 }
363
364 cc.addTraceEvent("exiting idle mode")
365 return nil
366}
367
// initIdleStateLocked initializes common state to how it should be while idle:
// fresh resolver and balancer wrappers (the resolver wrapper is only started
// later, by exitIdleMode), a new firstResolveEvent that has not fired yet, and
// an empty conns map.
//
// The caller must hold cc.mu, except during construction in NewClient, when
// no other goroutine can reference cc yet.
func (cc *ClientConn) initIdleStateLocked() {
	// NOTE(review): both wrapper constructors receive cc; keep this order
	// unless it is confirmed that neither reads fields set by the other.
	cc.resolverWrapper = newCCResolverWrapper(cc)
	cc.balancerWrapper = newCCBalancerWrapper(cc)
	cc.firstResolveEvent = grpcsync.NewEvent()
	// cc.conns == nil is a proxy for the ClientConn being closed. So, instead
	// of setting it to nil here, we recreate the map. This also means that we
	// don't have to do this when exiting idle mode.
	cc.conns = make(map[*addrConn]struct{})
}
378
// enterIdleMode puts the channel in idle mode, and as part of it shuts down the
// name resolver, load balancer, and any subchannels. This should never be
// called directly; use cc.idlenessMgr.EnterIdleMode instead.
//
// It is a no-op if the ClientConn has already been closed (cc.conns == nil).
// While holding cc.mu, it closes the current wrappers, resets the picker, moves
// the channel to Idle, and installs fresh idle state. Waiting for the old
// wrappers to finish and tearing down the old subchannels happen afterwards,
// with cc.mu released.
func (cc *ClientConn) enterIdleMode() {
	cc.mu.Lock()

	if cc.conns == nil {
		cc.mu.Unlock()
		return
	}

	// Keep a handle on the current subchannels: initIdleStateLocked below
	// replaces cc.conns with a new, empty map.
	conns := cc.conns

	rWrapper := cc.resolverWrapper
	rWrapper.close()
	cc.pickerWrapper.reset()
	bWrapper := cc.balancerWrapper
	bWrapper.close()
	cc.csMgr.updateState(connectivity.Idle)
	cc.addTraceEvent("entering idle mode")

	cc.initIdleStateLocked()

	cc.mu.Unlock()

	// Block until the name resolver and LB policy are closed.
	<-rWrapper.serializer.Done()
	<-bWrapper.serializer.Done()

	// Close all subchannels after the LB policy is closed.
	for ac := range conns {
		ac.tearDown(errConnIdling)
	}
}
413
414// validateTransportCredentials performs a series of checks on the configured
415// transport credentials. It returns a non-nil error if any of these conditions
416// are met:
417// - no transport creds and no creds bundle is configured
418// - both transport creds and creds bundle are configured
419// - creds bundle is configured, but it lacks a transport credentials
420// - insecure transport creds configured alongside call creds that require
421// transport level security
422//
423// If none of the above conditions are met, the configured credentials are
424// deemed valid and a nil error is returned.
425func (cc *ClientConn) validateTransportCredentials() error {
426 if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
427 return errNoTransportSecurity
428 }
429 if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
430 return errTransportCredsAndBundle
431 }
432 if cc.dopts.copts.CredsBundle != nil && cc.dopts.copts.CredsBundle.TransportCredentials() == nil {
433 return errNoTransportCredsInBundle
434 }
435 transportCreds := cc.dopts.copts.TransportCredentials
436 if transportCreds == nil {
437 transportCreds = cc.dopts.copts.CredsBundle.TransportCredentials()
438 }
439 if transportCreds.Info().SecurityProtocol == "insecure" {
440 for _, cd := range cc.dopts.copts.PerRPCCredentials {
441 if cd.RequireTransportSecurity() {
442 return errTransportCredentialsMissing
443 }
444 }
445 }
446 return nil
447}
448
449// channelzRegistration registers the newly created ClientConn with channelz and
450// stores the returned identifier in `cc.channelz`. A channelz trace event is
451// emitted for ClientConn creation. If the newly created ClientConn is a nested
452// one, i.e a valid parent ClientConn ID is specified via a dial option, the
453// trace event is also added to the parent.
454//
455// Doesn't grab cc.mu as this method is expected to be called only at Dial time.
456func (cc *ClientConn) channelzRegistration(target string) {
457 parentChannel, _ := cc.dopts.channelzParent.(*channelz.Channel)
458 cc.channelz = channelz.RegisterChannel(parentChannel, target)
459 cc.addTraceEvent("created")
460}
461
462// chainUnaryClientInterceptors chains all unary client interceptors into one.
463func chainUnaryClientInterceptors(cc *ClientConn) {
464 interceptors := cc.dopts.chainUnaryInts
465 // Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
466 // be executed before any other chained interceptors.
467 if cc.dopts.unaryInt != nil {
468 interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
469 }
470 var chainedInt UnaryClientInterceptor
471 if len(interceptors) == 0 {
472 chainedInt = nil
473 } else if len(interceptors) == 1 {
474 chainedInt = interceptors[0]
475 } else {
476 chainedInt = func(ctx context.Context, method string, req, reply any, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
477 return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
478 }
479 }
480 cc.dopts.unaryInt = chainedInt
481}
482
483// getChainUnaryInvoker recursively generate the chained unary invoker.
484func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
485 if curr == len(interceptors)-1 {
486 return finalInvoker
487 }
488 return func(ctx context.Context, method string, req, reply any, cc *ClientConn, opts ...CallOption) error {
489 return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
490 }
491}
492
493// chainStreamClientInterceptors chains all stream client interceptors into one.
494func chainStreamClientInterceptors(cc *ClientConn) {
495 interceptors := cc.dopts.chainStreamInts
496 // Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will
497 // be executed before any other chained interceptors.
498 if cc.dopts.streamInt != nil {
499 interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...)
500 }
501 var chainedInt StreamClientInterceptor
502 if len(interceptors) == 0 {
503 chainedInt = nil
504 } else if len(interceptors) == 1 {
505 chainedInt = interceptors[0]
506 } else {
507 chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
508 return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...)
509 }
510 }
511 cc.dopts.streamInt = chainedInt
512}
513
514// getChainStreamer recursively generate the chained client stream constructor.
515func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {
516 if curr == len(interceptors)-1 {
517 return finalStreamer
518 }
519 return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
520 return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...)
521 }
522}
523
524// newConnectivityStateManager creates an connectivityStateManager with
525// the specified channel.
526func newConnectivityStateManager(ctx context.Context, channel *channelz.Channel) *connectivityStateManager {
527 return &connectivityStateManager{
528 channelz: channel,
529 pubSub: grpcsync.NewPubSub(ctx),
530 }
531}
532
// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
//
// Every method takes mu, so callers do not need cc.mu. A transition to
// connectivity.Shutdown is final: updateState ignores all later updates.
//
// TODO: If possible, get rid of the `connectivityStateManager` type, and
// provide this functionality using the `PubSub`, to avoid keeping track of
// the connectivity state at two places.
type connectivityStateManager struct {
	mu         sync.Mutex         // Guards state and notifyChan.
	state      connectivity.State // Current state of the channel.
	notifyChan chan struct{}      // Created lazily by getNotifyChan; closed and cleared on each state change.
	channelz   *channelz.Channel  // Channelz entity whose State metric mirrors state.
	pubSub     *grpcsync.PubSub   // Every accepted state change is published here.
}
546
547// updateState updates the connectivity.State of ClientConn.
548// If there's a change it notifies goroutines waiting on state change to
549// happen.
550func (csm *connectivityStateManager) updateState(state connectivity.State) {
551 csm.mu.Lock()
552 defer csm.mu.Unlock()
553 if csm.state == connectivity.Shutdown {
554 return
555 }
556 if csm.state == state {
557 return
558 }
559 csm.state = state
560 csm.channelz.ChannelMetrics.State.Store(&state)
561 csm.pubSub.Publish(state)
562
563 channelz.Infof(logger, csm.channelz, "Channel Connectivity change to %v", state)
564 if csm.notifyChan != nil {
565 // There are other goroutines waiting on this channel.
566 close(csm.notifyChan)
567 csm.notifyChan = nil
568 }
569}
570
571func (csm *connectivityStateManager) getState() connectivity.State {
572 csm.mu.Lock()
573 defer csm.mu.Unlock()
574 return csm.state
575}
576
577func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
578 csm.mu.Lock()
579 defer csm.mu.Unlock()
580 if csm.notifyChan == nil {
581 csm.notifyChan = make(chan struct{})
582 }
583 return csm.notifyChan
584}
585
586// ClientConnInterface defines the functions clients need to perform unary and
587// streaming RPCs. It is implemented by *ClientConn, and is only intended to
588// be referenced by generated code.
589type ClientConnInterface interface {
590 // Invoke performs a unary RPC and returns after the response is received
591 // into reply.
592 Invoke(ctx context.Context, method string, args any, reply any, opts ...CallOption) error
593 // NewStream begins a streaming RPC.
594 NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
595}
596
597// Assert *ClientConn implements ClientConnInterface.
598var _ ClientConnInterface = (*ClientConn)(nil)
599
// ClientConn represents a virtual connection to a conceptual endpoint, to
// perform RPCs.
//
// A ClientConn is free to have zero or more actual connections to the endpoint
// based on configuration, load, etc. It is also free to determine which actual
// endpoints to use and may change it every RPC, permitting client-side load
// balancing.
//
// A ClientConn encapsulates a range of functionality including name
// resolution, TCP connection establishment (with retries and backoff) and TLS
// handshakes. It also handles errors on established connections by
// re-resolving the name and reconnecting.
//
// Create a ClientConn with NewClient. Its fields are grouped below by how
// access to them is synchronized.
type ClientConn struct {
	ctx    context.Context    // Initialized using the background context at dial time.
	cancel context.CancelFunc // Cancelled on close.

	// The following are initialized at dial time, and are read-only after that.
	target              string            // User's dial target.
	parsedTarget        resolver.Target   // See initParsedTargetAndResolverBuilder().
	authority           string            // See initAuthority().
	dopts               dialOptions       // Default and user specified dial options.
	channelz            *channelz.Channel // Channelz object.
	resolverBuilder     resolver.Builder  // See initParsedTargetAndResolverBuilder().
	idlenessMgr         *idle.Manager
	metricsRecorderList *stats.MetricsRecorderList

	// The following provide their own synchronization, and therefore don't
	// require cc.mu to be held to access them.
	csMgr              *connectivityStateManager
	pickerWrapper      *pickerWrapper
	safeConfigSelector iresolver.SafeConfigSelector
	retryThrottler     atomic.Value // Updated from service config.

	// mu protects the following fields.
	// TODO: split mu so the same mutex isn't used for everything.
	mu              sync.RWMutex
	resolverWrapper *ccResolverWrapper         // Always recreated whenever entering idle to simplify Close.
	balancerWrapper *ccBalancerWrapper         // Always recreated whenever entering idle to simplify Close.
	sc              *ServiceConfig             // Latest service config received from the resolver.
	conns           map[*addrConn]struct{}     // Set to nil on close.
	keepaliveParams keepalive.ClientParameters // May be updated upon receipt of a GoAway.
	// firstResolveEvent is used to track whether the name resolver sent us at
	// least one update. RPCs block on this event. May be accessed without mu
	// if we know we cannot be asked to enter idle mode while accessing it (e.g.
	// when the idle manager has already been closed, or if we are already
	// entering idle mode).
	firstResolveEvent *grpcsync.Event

	lceMu               sync.Mutex // protects lastConnectionError
	lastConnectionError error
}
651
652// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
653// ctx expires. A true value is returned in former case and false in latter.
654func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
655 ch := cc.csMgr.getNotifyChan()
656 if cc.csMgr.getState() != sourceState {
657 return true
658 }
659 select {
660 case <-ctx.Done():
661 return false
662 case <-ch:
663 return true
664 }
665}
666
667// GetState returns the connectivity.State of ClientConn.
668func (cc *ClientConn) GetState() connectivity.State {
669 return cc.csMgr.getState()
670}
671
672// Connect causes all subchannels in the ClientConn to attempt to connect if
673// the channel is idle. Does not wait for the connection attempts to begin
674// before returning.
675//
676// # Experimental
677//
678// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
679// release.
680func (cc *ClientConn) Connect() {
681 if err := cc.idlenessMgr.ExitIdleMode(); err != nil {
682 cc.addTraceEvent(err.Error())
683 return
684 }
685 // If the ClientConn was not in idle mode, we need to call ExitIdle on the
686 // LB policy so that connections can be created.
687 cc.mu.Lock()
688 cc.balancerWrapper.exitIdle()
689 cc.mu.Unlock()
690}
691
692// waitForResolvedAddrs blocks until the resolver has provided addresses or the
693// context expires. Returns nil unless the context expires first; otherwise
694// returns a status error based on the context.
695func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
696 // This is on the RPC path, so we use a fast path to avoid the
697 // more-expensive "select" below after the resolver has returned once.
698 if cc.firstResolveEvent.HasFired() {
699 return nil
700 }
701 select {
702 case <-cc.firstResolveEvent.Done():
703 return nil
704 case <-ctx.Done():
705 return status.FromContextError(ctx.Err()).Err()
706 case <-cc.ctx.Done():
707 return ErrClientConnClosing
708 }
709}
710
711var emptyServiceConfig *ServiceConfig
712
713func init() {
714 cfg := parseServiceConfig("{}", defaultMaxCallAttempts)
715 if cfg.Err != nil {
716 panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
717 }
718 emptyServiceConfig = cfg.Config.(*ServiceConfig)
719
720 internal.SubscribeToConnectivityStateChanges = func(cc *ClientConn, s grpcsync.Subscriber) func() {
721 return cc.csMgr.pubSub.Subscribe(s)
722 }
723 internal.EnterIdleModeForTesting = func(cc *ClientConn) {
724 cc.idlenessMgr.EnterIdleModeForTesting()
725 }
726 internal.ExitIdleModeForTesting = func(cc *ClientConn) error {
727 return cc.idlenessMgr.ExitIdleMode()
728 }
729}
730
731func (cc *ClientConn) maybeApplyDefaultServiceConfig() {
732 if cc.sc != nil {
733 cc.applyServiceConfigAndBalancer(cc.sc, nil)
734 return
735 }
736 if cc.dopts.defaultServiceConfig != nil {
737 cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, &defaultConfigSelector{cc.dopts.defaultServiceConfig})
738 } else {
739 cc.applyServiceConfigAndBalancer(emptyServiceConfig, &defaultConfigSelector{emptyServiceConfig})
740 }
741}
742
743func (cc *ClientConn) updateResolverStateAndUnlock(s resolver.State, err error) error {
744 defer cc.firstResolveEvent.Fire()
745 // Check if the ClientConn is already closed. Some fields (e.g.
746 // balancerWrapper) are set to nil when closing the ClientConn, and could
747 // cause nil pointer panic if we don't have this check.
748 if cc.conns == nil {
749 cc.mu.Unlock()
750 return nil
751 }
752
753 if err != nil {
754 // May need to apply the initial service config in case the resolver
755 // doesn't support service configs, or doesn't provide a service config
756 // with the new addresses.
757 cc.maybeApplyDefaultServiceConfig()
758
759 cc.balancerWrapper.resolverError(err)
760
761 // No addresses are valid with err set; return early.
762 cc.mu.Unlock()
763 return balancer.ErrBadResolverState
764 }
765
766 var ret error
767 if cc.dopts.disableServiceConfig {
768 channelz.Infof(logger, cc.channelz, "ignoring service config from resolver (%v) and applying the default because service config is disabled", s.ServiceConfig)
769 cc.maybeApplyDefaultServiceConfig()
770 } else if s.ServiceConfig == nil {
771 cc.maybeApplyDefaultServiceConfig()
772 // TODO: do we need to apply a failing LB policy if there is no
773 // default, per the error handling design?
774 } else {
775 if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
776 configSelector := iresolver.GetConfigSelector(s)
777 if configSelector != nil {
778 if len(s.ServiceConfig.Config.(*ServiceConfig).Methods) != 0 {
779 channelz.Infof(logger, cc.channelz, "method configs in service config will be ignored due to presence of config selector")
780 }
781 } else {
782 configSelector = &defaultConfigSelector{sc}
783 }
784 cc.applyServiceConfigAndBalancer(sc, configSelector)
785 } else {
786 ret = balancer.ErrBadResolverState
787 if cc.sc == nil {
788 // Apply the failing LB only if we haven't received valid service config
789 // from the name resolver in the past.
790 cc.applyFailingLBLocked(s.ServiceConfig)
791 cc.mu.Unlock()
792 return ret
793 }
794 }
795 }
796
797 balCfg := cc.sc.lbConfig
798 bw := cc.balancerWrapper
799 cc.mu.Unlock()
800
801 uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
802 if ret == nil {
803 ret = uccsErr // prefer ErrBadResolver state since any other error is
804 // currently meaningless to the caller.
805 }
806 return ret
807}
808
809// applyFailingLBLocked is akin to configuring an LB policy on the channel which
810// always fails RPCs. Here, an actual LB policy is not configured, but an always
811// erroring picker is configured, which returns errors with information about
812// what was invalid in the received service config. A config selector with no
813// service config is configured, and the connectivity state of the channel is
814// set to TransientFailure.
815func (cc *ClientConn) applyFailingLBLocked(sc *serviceconfig.ParseResult) {
816 var err error
817 if sc.Err != nil {
818 err = status.Errorf(codes.Unavailable, "error parsing service config: %v", sc.Err)
819 } else {
820 err = status.Errorf(codes.Unavailable, "illegal service config type: %T", sc.Config)
821 }
822 cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
823 cc.pickerWrapper.updatePicker(base.NewErrPicker(err))
824 cc.csMgr.updateState(connectivity.TransientFailure)
825}
826
827// Makes a copy of the input addresses slice. Addresses are passed during
828// subconn creation and address update operations.
829func copyAddresses(in []resolver.Address) []resolver.Address {
830 out := make([]resolver.Address, len(in))
831 copy(out, in)
832 return out
833}
834
835// newAddrConnLocked creates an addrConn for addrs and adds it to cc.conns.
836//
837// Caller needs to make sure len(addrs) > 0.
838func (cc *ClientConn) newAddrConnLocked(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
839 if cc.conns == nil {
840 return nil, ErrClientConnClosing
841 }
842
843 ac := &addrConn{
844 state: connectivity.Idle,
845 cc: cc,
846 addrs: copyAddresses(addrs),
847 scopts: opts,
848 dopts: cc.dopts,
849 channelz: channelz.RegisterSubChannel(cc.channelz, ""),
850 resetBackoff: make(chan struct{}),
851 }
852 ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
853 // Start with our address set to the first address; this may be updated if
854 // we connect to different addresses.
855 ac.channelz.ChannelMetrics.Target.Store(&addrs[0].Addr)
856
857 channelz.AddTraceEvent(logger, ac.channelz, 0, &channelz.TraceEvent{
858 Desc: "Subchannel created",
859 Severity: channelz.CtInfo,
860 Parent: &channelz.TraceEvent{
861 Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelz.ID),
862 Severity: channelz.CtInfo,
863 },
864 })
865
866 // Track ac in cc. This needs to be done before any getTransport(...) is called.
867 cc.conns[ac] = struct{}{}
868 return ac, nil
869}
870
871// removeAddrConn removes the addrConn in the subConn from clientConn.
872// It also tears down the ac with the given error.
873func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
874 cc.mu.Lock()
875 if cc.conns == nil {
876 cc.mu.Unlock()
877 return
878 }
879 delete(cc.conns, ac)
880 cc.mu.Unlock()
881 ac.tearDown(err)
882}
883
884// Target returns the target string of the ClientConn.
885func (cc *ClientConn) Target() string {
886 return cc.target
887}
888
889// CanonicalTarget returns the canonical target string used when creating cc.
890//
891// This always has the form "<scheme>://[authority]/<endpoint>". For example:
892//
893// - "dns:///example.com:42"
894// - "dns://8.8.8.8/example.com:42"
895// - "unix:///path/to/socket"
896func (cc *ClientConn) CanonicalTarget() string {
897 return cc.parsedTarget.String()
898}
899
900func (cc *ClientConn) incrCallsStarted() {
901 cc.channelz.ChannelMetrics.CallsStarted.Add(1)
902 cc.channelz.ChannelMetrics.LastCallStartedTimestamp.Store(time.Now().UnixNano())
903}
904
905func (cc *ClientConn) incrCallsSucceeded() {
906 cc.channelz.ChannelMetrics.CallsSucceeded.Add(1)
907}
908
909func (cc *ClientConn) incrCallsFailed() {
910 cc.channelz.ChannelMetrics.CallsFailed.Add(1)
911}
912
913// connect starts creating a transport.
914// It does nothing if the ac is not IDLE.
915// TODO(bar) Move this to the addrConn section.
916func (ac *addrConn) connect() error {
917 ac.mu.Lock()
918 if ac.state == connectivity.Shutdown {
919 if logger.V(2) {
920 logger.Infof("connect called on shutdown addrConn; ignoring.")
921 }
922 ac.mu.Unlock()
923 return errConnClosing
924 }
925 if ac.state != connectivity.Idle {
926 if logger.V(2) {
927 logger.Infof("connect called on addrConn in non-idle state (%v); ignoring.", ac.state)
928 }
929 ac.mu.Unlock()
930 return nil
931 }
932
933 ac.resetTransportAndUnlock()
934 return nil
935}
936
937// equalAddressIgnoringBalAttributes returns true is a and b are considered equal.
938// This is different from the Equal method on the resolver.Address type which
939// considers all fields to determine equality. Here, we only consider fields
940// that are meaningful to the subConn.
941func equalAddressIgnoringBalAttributes(a, b *resolver.Address) bool {
942 return a.Addr == b.Addr && a.ServerName == b.ServerName &&
943 a.Attributes.Equal(b.Attributes) &&
944 a.Metadata == b.Metadata
945}
946
947func equalAddressesIgnoringBalAttributes(a, b []resolver.Address) bool {
948 return slices.EqualFunc(a, b, func(a, b resolver.Address) bool { return equalAddressIgnoringBalAttributes(&a, &b) })
949}
950
// updateAddrs updates ac.addrs with the new addresses list and handles active
// connections or connection attempts.
//
// addrs is copied first, so ac never shares the caller's slice. Then, under
// ac.mu:
//   - If the new list equals the current one (per
//     equalAddressesIgnoringBalAttributes), nothing changes.
//   - In Shutdown, TransientFailure or Idle, only the stored list is replaced.
//   - In Ready, if the connected address (ac.curAddr) is still in the new
//     list, only the stored list is replaced.
//   - Otherwise (Connecting, or Ready on an address that is no longer
//     listed), the current attempt is canceled, any current transport is
//     closed gracefully, and a new attempt is started in a new goroutine.
func (ac *addrConn) updateAddrs(addrs []resolver.Address) {
	addrs = copyAddresses(addrs)
	// Log at most the first five addresses to bound the log line.
	limit := len(addrs)
	if limit > 5 {
		limit = 5
	}
	channelz.Infof(logger, ac.channelz, "addrConn: updateAddrs addrs (%d of %d): %v", limit, len(addrs), addrs[:limit])

	ac.mu.Lock()
	if equalAddressesIgnoringBalAttributes(ac.addrs, addrs) {
		ac.mu.Unlock()
		return
	}

	ac.addrs = addrs

	if ac.state == connectivity.Shutdown ||
		ac.state == connectivity.TransientFailure ||
		ac.state == connectivity.Idle {
		// We were not connecting, so do nothing but update the addresses.
		ac.mu.Unlock()
		return
	}

	if ac.state == connectivity.Ready {
		// Try to find the connected address.
		for _, a := range addrs {
			// ac.curAddr was stored by createTransport with ServerName already
			// resolved through getServerName, so resolve the candidate the same
			// way before comparing. a is a copy; addrs is not modified.
			a.ServerName = ac.cc.getServerName(a)
			if equalAddressIgnoringBalAttributes(&a, &ac.curAddr) {
				// We are connected to a valid address, so do nothing but
				// update the addresses.
				ac.mu.Unlock()
				return
			}
		}
	}

	// We are either connected to the wrong address or currently connecting.
	// Stop the current iteration and restart.

	// Canceling ac.ctx aborts the in-flight attempt, which captured it (see
	// resetTransportAndUnlock/tryAllAddrs). A fresh context is installed for
	// the new attempt.
	ac.cancel()
	ac.ctx, ac.cancel = context.WithCancel(ac.cc.ctx)

	// We have to defer here because GracefulClose => onClose, which requires
	// locking ac.mu.
	// The receiver of the deferred call is evaluated now, so the old transport
	// is the one closed even though ac.transport is cleared on the next line.
	if ac.transport != nil {
		defer ac.transport.GracefulClose()
		ac.transport = nil
	}

	if len(addrs) == 0 {
		// NOTE(review): the goroutine started below immediately moves ac to
		// Connecting, and tryAllAddrs returns nil for an empty list, so this
		// Idle update appears to be superseded right away. Confirm intended.
		ac.updateConnectivityState(connectivity.Idle, nil)
	}

	// Since we were connecting/connected, we should start a new connection
	// attempt.
	// ac.mu is still held here. Ownership of the lock passes to the new
	// goroutine, and resetTransportAndUnlock releases it.
	go ac.resetTransportAndUnlock()
}
1011
1012// getServerName determines the serverName to be used in the connection
1013// handshake. The default value for the serverName is the authority on the
1014// ClientConn, which either comes from the user's dial target or through an
1015// authority override specified using the WithAuthority dial option. Name
1016// resolvers can specify a per-address override for the serverName through the
1017// resolver.Address.ServerName field which is used only if the WithAuthority
1018// dial option was not used. The rationale is that per-address authority
1019// overrides specified by the name resolver can represent a security risk, while
1020// an override specified by the user is more dependable since they probably know
1021// what they are doing.
1022func (cc *ClientConn) getServerName(addr resolver.Address) string {
1023 if cc.dopts.authority != "" {
1024 return cc.dopts.authority
1025 }
1026 if addr.ServerName != "" {
1027 return addr.ServerName
1028 }
1029 return cc.authority
1030}
1031
1032func getMethodConfig(sc *ServiceConfig, method string) MethodConfig {
1033 if sc == nil {
1034 return MethodConfig{}
1035 }
1036 if m, ok := sc.Methods[method]; ok {
1037 return m
1038 }
1039 i := strings.LastIndex(method, "/")
1040 if m, ok := sc.Methods[method[:i+1]]; ok {
1041 return m
1042 }
1043 return sc.Methods[""]
1044}
1045
1046// GetMethodConfig gets the method config of the input method.
1047// If there's an exact match for input method (i.e. /service/method), we return
1048// the corresponding MethodConfig.
1049// If there isn't an exact match for the input method, we look for the service's default
1050// config under the service (i.e /service/) and then for the default for all services (empty string).
1051//
1052// If there is a default MethodConfig for the service, we return it.
1053// Otherwise, we return an empty MethodConfig.
1054func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
1055 // TODO: Avoid the locking here.
1056 cc.mu.RLock()
1057 defer cc.mu.RUnlock()
1058 return getMethodConfig(cc.sc, method)
1059}
1060
1061func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
1062 cc.mu.RLock()
1063 defer cc.mu.RUnlock()
1064 if cc.sc == nil {
1065 return nil
1066 }
1067 return cc.sc.healthCheckConfig
1068}
1069
1070func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, balancer.PickResult, error) {
1071 return cc.pickerWrapper.pick(ctx, failfast, balancer.PickInfo{
1072 Ctx: ctx,
1073 FullMethodName: method,
1074 })
1075}
1076
1077func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector) {
1078 if sc == nil {
1079 // should never reach here.
1080 return
1081 }
1082 cc.sc = sc
1083 if configSelector != nil {
1084 cc.safeConfigSelector.UpdateConfigSelector(configSelector)
1085 }
1086
1087 if cc.sc.retryThrottling != nil {
1088 newThrottler := &retryThrottler{
1089 tokens: cc.sc.retryThrottling.MaxTokens,
1090 max: cc.sc.retryThrottling.MaxTokens,
1091 thresh: cc.sc.retryThrottling.MaxTokens / 2,
1092 ratio: cc.sc.retryThrottling.TokenRatio,
1093 }
1094 cc.retryThrottler.Store(newThrottler)
1095 } else {
1096 cc.retryThrottler.Store((*retryThrottler)(nil))
1097 }
1098}
1099
1100func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
1101 cc.mu.RLock()
1102 cc.resolverWrapper.resolveNow(o)
1103 cc.mu.RUnlock()
1104}
1105
1106func (cc *ClientConn) resolveNowLocked(o resolver.ResolveNowOptions) {
1107 cc.resolverWrapper.resolveNow(o)
1108}
1109
1110// ResetConnectBackoff wakes up all subchannels in transient failure and causes
1111// them to attempt another connection immediately. It also resets the backoff
1112// times used for subsequent attempts regardless of the current state.
1113//
1114// In general, this function should not be used. Typical service or network
1115// outages result in a reasonable client reconnection strategy by default.
1116// However, if a previously unavailable network becomes available, this may be
1117// used to trigger an immediate reconnect.
1118//
1119// # Experimental
1120//
1121// Notice: This API is EXPERIMENTAL and may be changed or removed in a
1122// later release.
1123func (cc *ClientConn) ResetConnectBackoff() {
1124 cc.mu.Lock()
1125 conns := cc.conns
1126 cc.mu.Unlock()
1127 for ac := range conns {
1128 ac.resetConnectBackoff()
1129 }
1130}
1131
// Close tears down the ClientConn and all underlying connections.
//
// It returns ErrClientConnClosing if the ClientConn was already closed. The
// teardown order is:
//  1. close the idleness manager;
//  2. under cc.mu, detach cc.conns (setting it to nil) and move the channel
//     to Shutdown;
//  3. close the resolver, picker and balancer wrappers, then wait on their
//     serializers' Done channels;
//  4. tear down every detached addrConn concurrently and wait for them all;
//  5. remove the channel from channelz.
//
// On every return path (including the already-closed one), cc.cancel is
// called and Close waits for the connectivity-state pubsub to report Done.
func (cc *ClientConn) Close() error {
	// Deferred so it also runs when Close returns ErrClientConnClosing.
	defer func() {
		cc.cancel()
		<-cc.csMgr.pubSub.Done()
	}()

	// Prevent calls to enter/exit idle immediately, and ensure we are not
	// currently entering/exiting idle mode.
	cc.idlenessMgr.Close()

	cc.mu.Lock()
	if cc.conns == nil {
		// A previous Close already ran.
		cc.mu.Unlock()
		return ErrClientConnClosing
	}

	conns := cc.conns
	cc.conns = nil
	cc.csMgr.updateState(connectivity.Shutdown)

	// We can safely unlock and continue to access all fields now as
	// cc.conns==nil, preventing any further operations on cc.
	cc.mu.Unlock()

	cc.resolverWrapper.close()
	// The order of closing matters here since the balancer wrapper assumes the
	// picker is closed before it is closed.
	cc.pickerWrapper.close()
	cc.balancerWrapper.close()

	// Block until both wrappers' serializers report Done (presumably: no more
	// queued callbacks will run) before tearing down the addrConns.
	<-cc.resolverWrapper.serializer.Done()
	<-cc.balancerWrapper.serializer.Done()
	// conns was detached under the lock above, so iterating it here does not
	// race with other users of cc.conns.
	var wg sync.WaitGroup
	for ac := range conns {
		wg.Add(1)
		go func(ac *addrConn) {
			defer wg.Done()
			ac.tearDown(ErrClientConnClosing)
		}(ac)
	}
	wg.Wait()
	cc.addTraceEvent("deleted")
	// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add
	// trace reference to the entity being deleted, and thus prevent it from being
	// deleted right away.
	channelz.RemoveEntry(cc.channelz.ID)

	return nil
}
1182
// addrConn is a network connection to a given address.
//
// An addrConn backs one subchannel. It tries its addresses in order (see
// tryAllAddrs), tracks its own connectivity state, and reports state changes
// to the balancer through acbw.
type addrConn struct {
	// ctx scopes the current connection attempt and everything derived from
	// it (transport dial, health checking). tearDown cancels it, and
	// updateAddrs cancels and replaces it to restart a connection attempt.
	ctx    context.Context
	cancel context.CancelFunc

	cc *ClientConn
	// dopts is a copy of the channel's dial options. dopts.copts.KeepaliveParams
	// is refreshed from cc.keepaliveParams under mu before each attempt.
	dopts dialOptions
	// acbw receives connectivity updates from updateConnectivityState.
	acbw *acBalancerWrapper
	// scopts are the options passed when the subconn was created. CredsBundle
	// and HealthCheckEnabled are consulted when connecting.
	scopts balancer.NewSubConnOptions

	// transport is set when there's a viable transport (note: ac state may not be READY as LB channel
	// health checking may require server to report healthy to set ac to READY), and is reset
	// to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
	// is received, transport is closed, ac has been torn down).
	// It is read and written while holding mu.
	transport transport.ClientTransport // The current transport.

	// This mutex is used on the RPC path, so its usage should be minimized as
	// much as possible.
	// TODO: Find a lock-free way to retrieve the transport and state from the
	// addrConn.
	mu      sync.Mutex
	curAddr resolver.Address   // The current address.
	addrs   []resolver.Address // All addresses that the resolver resolved to.

	// Use updateConnectivityState for updating addrConn's connectivity state.
	state connectivity.State

	backoffIdx int // Needs to be stateful for resetConnectBackoff.
	// resetBackoff is closed (and replaced) by resetConnectBackoff to cut a
	// backoff sleep in resetTransportAndUnlock short.
	resetBackoff chan struct{}

	// channelz is this subchannel's channelz entry (metrics and traces).
	channelz *channelz.SubChannel
}
1215
1216// Note: this requires a lock on ac.mu.
1217func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {
1218 if ac.state == s {
1219 return
1220 }
1221 ac.state = s
1222 ac.channelz.ChannelMetrics.State.Store(&s)
1223 if lastErr == nil {
1224 channelz.Infof(logger, ac.channelz, "Subchannel Connectivity change to %v", s)
1225 } else {
1226 channelz.Infof(logger, ac.channelz, "Subchannel Connectivity change to %v, last error: %s", s, lastErr)
1227 }
1228 ac.acbw.updateState(s, ac.curAddr, lastErr)
1229}
1230
1231// adjustParams updates parameters used to create transports upon
1232// receiving a GoAway.
1233func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
1234 switch r {
1235 case transport.GoAwayTooManyPings:
1236 v := 2 * ac.dopts.copts.KeepaliveParams.Time
1237 ac.cc.mu.Lock()
1238 if v > ac.cc.keepaliveParams.Time {
1239 ac.cc.keepaliveParams.Time = v
1240 }
1241 ac.cc.mu.Unlock()
1242 }
1243}
1244
// resetTransportAndUnlock unconditionally connects the addrConn.
//
// ac.mu must be held by the caller, and this function will guarantee it is released.
//
// If ac.ctx is already canceled, it only unlocks. Otherwise it snapshots
// ac.ctx and ac.addrs, moves ac to Connecting, and (without the lock) tries
// every address through tryAllAddrs:
//   - on success, the backoff index is reset to 0;
//   - on failure, it requests re-resolution and moves ac to TransientFailure.
//     It then sleeps for the current backoff duration; the sleep is cut short
//     by resetConnectBackoff or aborted by cancellation of the captured
//     context. Finally it moves ac to Idle, unless the context was canceled.
func (ac *addrConn) resetTransportAndUnlock() {
	// The context captured here identifies this attempt. updateAddrs and
	// tearDown cancel it, and every later check uses acCtx rather than
	// re-reading ac.ctx, which updateAddrs may have replaced.
	acCtx := ac.ctx
	if acCtx.Err() != nil {
		ac.mu.Unlock()
		return
	}

	addrs := ac.addrs
	backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
	// This will be the duration that dial gets to finish.
	dialDuration := minConnectTimeout
	if ac.dopts.minConnectTimeout != nil {
		dialDuration = ac.dopts.minConnectTimeout()
	}

	if dialDuration < backoffFor {
		// Give dial more time as we keep failing to connect.
		dialDuration = backoffFor
	}
	// We can potentially spend all the time trying the first address, and
	// if the server accepts the connection and then hangs, the following
	// addresses will never be tried.
	//
	// The spec doesn't mention what should be done for multiple addresses.
	// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
	// A single deadline is shared by every address tried in this pass.
	connectDeadline := time.Now().Add(dialDuration)

	ac.updateConnectivityState(connectivity.Connecting, nil)
	ac.mu.Unlock()

	if err := ac.tryAllAddrs(acCtx, addrs, connectDeadline); err != nil {
		// TODO: #7534 - Move re-resolution requests into the pick_first LB policy
		// to ensure one resolution request per pass instead of per subconn failure.
		ac.cc.resolveNow(resolver.ResolveNowOptions{})
		ac.mu.Lock()
		if acCtx.Err() != nil {
			// addrConn was torn down.
			ac.mu.Unlock()
			return
		}
		// After exhausting all addresses, the addrConn enters
		// TRANSIENT_FAILURE.
		ac.updateConnectivityState(connectivity.TransientFailure, err)

		// Backoff.
		// Capture the channel under the lock: resetConnectBackoff closes and
		// replaces ac.resetBackoff.
		b := ac.resetBackoff
		ac.mu.Unlock()

		timer := time.NewTimer(backoffFor)
		select {
		case <-timer.C:
			// The full backoff elapsed; grow the next backoff.
			ac.mu.Lock()
			ac.backoffIdx++
			ac.mu.Unlock()
		case <-b:
			// resetConnectBackoff already reset backoffIdx to 0.
			timer.Stop()
		case <-acCtx.Done():
			timer.Stop()
			return
		}

		ac.mu.Lock()
		if acCtx.Err() == nil {
			ac.updateConnectivityState(connectivity.Idle, err)
		}
		ac.mu.Unlock()
		return
	}
	// Success; reset backoff.
	ac.mu.Lock()
	ac.backoffIdx = 0
	ac.mu.Unlock()
}
1321
1322// tryAllAddrs tries to create a connection to the addresses, and stop when at
1323// the first successful one. It returns an error if no address was successfully
1324// connected, or updates ac appropriately with the new transport.
1325func (ac *addrConn) tryAllAddrs(ctx context.Context, addrs []resolver.Address, connectDeadline time.Time) error {
1326 var firstConnErr error
1327 for _, addr := range addrs {
1328 ac.channelz.ChannelMetrics.Target.Store(&addr.Addr)
1329 if ctx.Err() != nil {
1330 return errConnClosing
1331 }
1332 ac.mu.Lock()
1333
1334 ac.cc.mu.RLock()
1335 ac.dopts.copts.KeepaliveParams = ac.cc.keepaliveParams
1336 ac.cc.mu.RUnlock()
1337
1338 copts := ac.dopts.copts
1339 if ac.scopts.CredsBundle != nil {
1340 copts.CredsBundle = ac.scopts.CredsBundle
1341 }
1342 ac.mu.Unlock()
1343
1344 channelz.Infof(logger, ac.channelz, "Subchannel picks a new address %q to connect", addr.Addr)
1345
1346 err := ac.createTransport(ctx, addr, copts, connectDeadline)
1347 if err == nil {
1348 return nil
1349 }
1350 if firstConnErr == nil {
1351 firstConnErr = err
1352 }
1353 ac.cc.updateConnectionError(err)
1354 }
1355
1356 // Couldn't connect to any address.
1357 return firstConnErr
1358}
1359
// createTransport creates a connection to addr. It returns an error if the
// address was not successfully connected, or updates ac appropriately with the
// new transport.
//
// A nil return means the dial itself succeeded. It does not guarantee that ac
// now uses the new transport:
//   - if ctx was canceled meanwhile, the transport is closed asynchronously;
//   - if the connection already closed (onClose ran), ac is moved to Idle.
//
// In the remaining case ac.curAddr and ac.transport are set and health
// checking is started, which moves ac to Ready when appropriate.
func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
	addr.ServerName = ac.cc.getServerName(addr)
	// hctx scopes the health check for this particular transport. It is
	// canceled when the transport closes (onClose) or when the dial fails.
	hctx, hcancel := context.WithCancel(ctx)

	// onClose is passed to the transport and invoked when the connection is
	// closed, including inline from Close().
	onClose := func(r transport.GoAwayReason) {
		ac.mu.Lock()
		defer ac.mu.Unlock()
		// adjust params based on GoAwayReason
		ac.adjustParams(r)
		if ctx.Err() != nil {
			// Already shut down or connection attempt canceled. tearDown() or
			// updateAddrs() already cleared the transport and canceled hctx
			// via ac.ctx, and we expected this connection to be closed, so do
			// nothing here.
			return
		}
		hcancel()
		if ac.transport == nil {
			// We're still connecting to this address, which could error. Do
			// not update the connectivity state or resolve; these will happen
			// at the end of the tryAllAddrs connection loop in the event of an
			// error.
			return
		}
		ac.transport = nil
		// Refresh the name resolver on any connection loss.
		ac.cc.resolveNow(resolver.ResolveNowOptions{})
		// Always go idle and wait for the LB policy to initiate a new
		// connection attempt.
		ac.updateConnectivityState(connectivity.Idle, nil)
	}

	// connectCtx bounds only the dial: it expires at connectDeadline or when
	// ctx is canceled.
	connectCtx, cancel := context.WithDeadline(ctx, connectDeadline)
	defer cancel()
	copts.ChannelzParent = ac.channelz

	newTr, err := transport.NewHTTP2Client(connectCtx, ac.cc.ctx, addr, copts, onClose)
	if err != nil {
		if logger.V(2) {
			logger.Infof("Creating new client transport to %q: %v", addr, err)
		}
		// newTr is either nil, or closed.
		hcancel()
		channelz.Warningf(logger, ac.channelz, "grpc: addrConn.createTransport failed to connect to %s. Err: %v", addr, err)
		return err
	}

	ac.mu.Lock()
	defer ac.mu.Unlock()
	if ctx.Err() != nil {
		// This can happen if the subConn was removed while in `Connecting`
		// state. tearDown() would have set the state to `Shutdown`, but
		// would not have closed the transport since ac.transport would not
		// have been set at that point.
		//
		// We run this in a goroutine because newTr.Close() calls onClose()
		// inline, which requires locking ac.mu.
		//
		// The error we pass to Close() is immaterial since there are no open
		// streams at this point, so no trailers with error details will be sent
		// out. We just need to pass a non-nil error.
		//
		// This can also happen when updateAddrs is called during a connection
		// attempt.
		go newTr.Close(transport.ErrConnClosing)
		return nil
	}
	if hctx.Err() != nil {
		// onClose was already called for this connection, but the connection
		// was successfully established first. Consider it a success and set
		// the new state to Idle.
		ac.updateConnectivityState(connectivity.Idle, nil)
		return nil
	}
	ac.curAddr = addr
	ac.transport = newTr
	ac.startHealthCheck(hctx) // Will set state to READY if appropriate.
	return nil
}
1442
// startHealthCheck starts the health checking stream (RPC) to watch the health
// stats of this connection if health checking is requested and configured.
//
// LB channel health checking is enabled when all requirements below are met:
//  1. it is not disabled by the user with the WithDisableHealthCheck DialOption
//  2. internal.HealthCheckFunc is set by importing the grpc/health package
//  3. a service config with non-empty healthCheckConfig field is provided
//  4. the load balancer requests it
//
// It sets addrConn to READY if the health checking stream is not started.
//
// ctx bounds the health-check goroutine and the streams it opens.
// createTransport passes a context that is canceled when this transport
// closes.
//
// Caller must hold ac.mu. Note that ac.cc.healthCheckConfig() acquires cc.mu
// for reading while ac.mu is held.
func (ac *addrConn) startHealthCheck(ctx context.Context) {
	// healthcheckManagingState becomes true once the health-check goroutine is
	// about to own ac's connectivity state. Until then, every return path
	// falls back to marking ac READY through this deferred func.
	var healthcheckManagingState bool
	defer func() {
		if !healthcheckManagingState {
			ac.updateConnectivityState(connectivity.Ready, nil)
		}
	}()

	if ac.cc.dopts.disableHealthCheck {
		return
	}
	healthCheckConfig := ac.cc.healthCheckConfig()
	if healthCheckConfig == nil {
		return
	}
	if !ac.scopts.HealthCheckEnabled {
		return
	}
	healthCheckFunc := internal.HealthCheckFunc
	if healthCheckFunc == nil {
		// The health package is not imported to set health check function.
		//
		// TODO: add a link to the health check doc in the error message.
		channelz.Error(logger, ac.channelz, "Health check is requested but health check function is not set.")
		return
	}

	healthcheckManagingState = true

	// Set up the health check helper functions.
	// Both helpers are bound to the transport that is current right now. Once
	// ac.transport changes (replaced or cleared), newStream fails with
	// Canceled and setConnectivityState does nothing.
	currentTr := ac.transport
	newStream := func(method string) (any, error) {
		ac.mu.Lock()
		if ac.transport != currentTr {
			ac.mu.Unlock()
			return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
		}
		ac.mu.Unlock()
		return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)
	}
	setConnectivityState := func(s connectivity.State, lastErr error) {
		ac.mu.Lock()
		defer ac.mu.Unlock()
		if ac.transport != currentTr {
			return
		}
		ac.updateConnectivityState(s, lastErr)
	}
	// Start the health checking stream.
	// The goroutine runs until healthCheckFunc returns; errors are only
	// logged to channelz.
	go func() {
		err := healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
		if err != nil {
			if status.Code(err) == codes.Unimplemented {
				channelz.Error(logger, ac.channelz, "Subchannel health check is unimplemented at server side, thus health check is disabled")
			} else {
				channelz.Errorf(logger, ac.channelz, "Health checking failed: %v", err)
			}
		}
	}()
}
1515
1516func (ac *addrConn) resetConnectBackoff() {
1517 ac.mu.Lock()
1518 close(ac.resetBackoff)
1519 ac.backoffIdx = 0
1520 ac.resetBackoff = make(chan struct{})
1521 ac.mu.Unlock()
1522}
1523
1524// getReadyTransport returns the transport if ac's state is READY or nil if not.
1525func (ac *addrConn) getReadyTransport() transport.ClientTransport {
1526 ac.mu.Lock()
1527 defer ac.mu.Unlock()
1528 if ac.state == connectivity.Ready {
1529 return ac.transport
1530 }
1531 return nil
1532}
1533
// tearDown starts to tear down the addrConn.
//
// Note that tearDown doesn't remove ac from ac.cc.conns, so the addrConn struct
// will leak. In most cases, call cc.removeAddrConn() instead.
//
// It is a no-op if ac is already Shutdown. Otherwise, under ac.mu, it:
//   - detaches the current transport;
//   - moves ac to Shutdown;
//   - cancels ac.ctx, aborting any in-flight connection attempt and the
//     health check derived from it;
//   - clears curAddr and removes the subchannel from channelz.
//
// After releasing the lock it closes the detached transport, gracefully if
// err is errConnDrain and with Close(err) otherwise.
func (ac *addrConn) tearDown(err error) {
	ac.mu.Lock()
	if ac.state == connectivity.Shutdown {
		ac.mu.Unlock()
		return
	}
	curTr := ac.transport
	ac.transport = nil
	// We have to set the state to Shutdown before anything else to prevent races
	// between setting the state and logic that waits on context cancellation / etc.
	ac.updateConnectivityState(connectivity.Shutdown, nil)
	ac.cancel()
	ac.curAddr = resolver.Address{}

	channelz.AddTraceEvent(logger, ac.channelz, 0, &channelz.TraceEvent{
		Desc:     "Subchannel deleted",
		Severity: channelz.CtInfo,
		Parent: &channelz.TraceEvent{
			Desc:     fmt.Sprintf("Subchannel(id:%d) deleted", ac.channelz.ID),
			Severity: channelz.CtInfo,
		},
	})
	// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add
	// trace reference to the entity being deleted, and thus prevent it from
	// being deleted right away.
	channelz.RemoveEntry(ac.channelz.ID)
	ac.mu.Unlock()

	// We have to release the lock before the call to GracefulClose/Close here
	// because both of them call onClose(), which requires locking ac.mu.
	if curTr != nil {
		if err == errConnDrain {
			// Close the transport gracefully when the subConn is being shutdown.
			//
			// GracefulClose() may be executed multiple times if:
			// - multiple GoAway frames are received from the server
			// - there are concurrent name resolver or balancer triggered
			//   address removal and GoAway
			curTr.GracefulClose()
		} else {
			// Hard close the transport when the channel is entering idle or is
			// being shutdown. In the case where the channel is being shutdown,
			// closing of transports is also taken care of by cancellation of cc.ctx.
			// But in the case where the channel is entering idle, we need to
			// explicitly close the transports here. Instead of distinguishing
			// between these two cases, it is simpler to close the transport
			// unconditionally here.
			curTr.Close(err)
		}
	}
}
1589
// retryThrottler implements the service config's retry throttling policy as a
// token bucket:
//   - throttle removes one token (never going below 0) and reports whether
//     retries should be throttled, i.e. whether tokens <= thresh;
//   - successfulRPC adds ratio tokens, capped at max.
//
// applyServiceConfigAndBalancer builds it with tokens = max = MaxTokens and
// thresh = MaxTokens/2. A nil *retryThrottler never throttles.
type retryThrottler struct {
	max    float64 // Upper bound on tokens.
	thresh float64 // Retries are throttled while tokens <= thresh.
	ratio  float64 // Tokens added per successful RPC.

	mu     sync.Mutex
	tokens float64 // TODO(dfawley): replace with atomic and remove lock.
}
1598
1599// throttle subtracts a retry token from the pool and returns whether a retry
1600// should be throttled (disallowed) based upon the retry throttling policy in
1601// the service config.
1602func (rt *retryThrottler) throttle() bool {
1603 if rt == nil {
1604 return false
1605 }
1606 rt.mu.Lock()
1607 defer rt.mu.Unlock()
1608 rt.tokens--
1609 if rt.tokens < 0 {
1610 rt.tokens = 0
1611 }
1612 return rt.tokens <= rt.thresh
1613}
1614
1615func (rt *retryThrottler) successfulRPC() {
1616 if rt == nil {
1617 return
1618 }
1619 rt.mu.Lock()
1620 defer rt.mu.Unlock()
1621 rt.tokens += rt.ratio
1622 if rt.tokens > rt.max {
1623 rt.tokens = rt.max
1624 }
1625}
1626
1627func (ac *addrConn) incrCallsStarted() {
1628 ac.channelz.ChannelMetrics.CallsStarted.Add(1)
1629 ac.channelz.ChannelMetrics.LastCallStartedTimestamp.Store(time.Now().UnixNano())
1630}
1631
1632func (ac *addrConn) incrCallsSucceeded() {
1633 ac.channelz.ChannelMetrics.CallsSucceeded.Add(1)
1634}
1635
1636func (ac *addrConn) incrCallsFailed() {
1637 ac.channelz.ChannelMetrics.CallsFailed.Add(1)
1638}
1639
// ErrClientConnTimeout indicates that the ClientConn cannot establish the
// underlying connections within the specified timeout.
//
// Deprecated: This error is never returned by grpc and should not be
// referenced by users.
var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
1646
1647// getResolver finds the scheme in the cc's resolvers or the global registry.
1648// scheme should always be lowercase (typically by virtue of url.Parse()
1649// performing proper RFC3986 behavior).
1650func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
1651 for _, rb := range cc.dopts.resolvers {
1652 if scheme == rb.Scheme() {
1653 return rb
1654 }
1655 }
1656 return resolver.Get(scheme)
1657}
1658
1659func (cc *ClientConn) updateConnectionError(err error) {
1660 cc.lceMu.Lock()
1661 cc.lastConnectionError = err
1662 cc.lceMu.Unlock()
1663}
1664
1665func (cc *ClientConn) connectionError() error {
1666 cc.lceMu.Lock()
1667 defer cc.lceMu.Unlock()
1668 return cc.lastConnectionError
1669}
1670
1671// initParsedTargetAndResolverBuilder parses the user's dial target and stores
1672// the parsed target in `cc.parsedTarget`.
1673//
1674// The resolver to use is determined based on the scheme in the parsed target
1675// and the same is stored in `cc.resolverBuilder`.
1676//
1677// Doesn't grab cc.mu as this method is expected to be called only at Dial time.
1678func (cc *ClientConn) initParsedTargetAndResolverBuilder() error {
1679 logger.Infof("original dial target is: %q", cc.target)
1680
1681 var rb resolver.Builder
1682 parsedTarget, err := parseTarget(cc.target)
1683 if err == nil {
1684 rb = cc.getResolver(parsedTarget.URL.Scheme)
1685 if rb != nil {
1686 cc.parsedTarget = parsedTarget
1687 cc.resolverBuilder = rb
1688 return nil
1689 }
1690 }
1691
1692 // We are here because the user's dial target did not contain a scheme or
1693 // specified an unregistered scheme. We should fallback to the default
1694 // scheme, except when a custom dialer is specified in which case, we should
1695 // always use passthrough scheme. For either case, we need to respect any overridden
1696 // global defaults set by the user.
1697 defScheme := cc.dopts.defaultScheme
1698 if internal.UserSetDefaultScheme {
1699 defScheme = resolver.GetDefaultScheme()
1700 }
1701
1702 canonicalTarget := defScheme + ":///" + cc.target
1703
1704 parsedTarget, err = parseTarget(canonicalTarget)
1705 if err != nil {
1706 return err
1707 }
1708 rb = cc.getResolver(parsedTarget.URL.Scheme)
1709 if rb == nil {
1710 return fmt.Errorf("could not get resolver for default scheme: %q", parsedTarget.URL.Scheme)
1711 }
1712 cc.parsedTarget = parsedTarget
1713 cc.resolverBuilder = rb
1714 return nil
1715}
1716
1717// parseTarget uses RFC 3986 semantics to parse the given target into a
1718// resolver.Target struct containing url. Query params are stripped from the
1719// endpoint.
1720func parseTarget(target string) (resolver.Target, error) {
1721 u, err := url.Parse(target)
1722 if err != nil {
1723 return resolver.Target{}, err
1724 }
1725
1726 return resolver.Target{URL: *u}, nil
1727}
1728
// encodeAuthority percent-encodes every byte of authority that RFC 3986
// section 3.2 does not allow verbatim in an authority component
// (https://datatracker.ietf.org/doc/html/rfc3986#section-3.2).
//
// Kept as-is: ASCII letters and digits, the unreserved marks "-_.~", the
// sub-delimiters "!$&'()*+,;=", and the authority delimiters ":[]@". Every
// other byte, including each byte of a multi-byte UTF-8 sequence, becomes
// %XX with uppercase hex digits. The input is returned unchanged if nothing
// needs escaping.
func encodeAuthority(authority string) string {
	const upperhex = "0123456789ABCDEF"

	needsEscape := func(c byte) bool {
		switch {
		case 'a' <= c && c <= 'z', 'A' <= c && c <= 'Z', '0' <= c && c <= '9':
			return false
		}
		switch c {
		case '-', '_', '.', '~', // unreserved
			'!', '$', '&', '\'', '(', ')', '*', '+', ',', ';', '=', // sub-delims
			':', '[', ']', '@': // authority delimiters
			return false
		}
		return true
	}

	escapes := 0
	for _, c := range []byte(authority) {
		if needsEscape(c) {
			escapes++
		}
	}
	if escapes == 0 {
		return authority
	}

	// Each escaped byte grows from 1 to 3 bytes.
	out := make([]byte, 0, len(authority)+2*escapes)
	for _, c := range []byte(authority) {
		if needsEscape(c) {
			out = append(out, '%', upperhex[c>>4], upperhex[c&0x0F])
			continue
		}
		out = append(out, c)
	}
	return string(out)
}
1785
// initAuthority determines the channel authority and stores it in
// cc.authority. The order of precedence is as follows:
//   - user specified authority override using the `WithAuthority` dial option
//   - creds' notion of server name for the authentication handshake
//   - the authority returned by the resolver builder, if it implements
//     resolver.AuthorityOverrider
//   - "localhost" + endpoint, when the dial target's endpoint starts with ':'
//     (only a port was given)
//   - endpoint from dial target of the form "scheme://[authority]/endpoint",
//     percent-encoded with encodeAuthority
//
// Returns a non-nil error if the authority returned by the transport
// credentials does not match the authority configured through the dial option.
//
// Doesn't grab cc.mu as this method is expected to be called only at Dial time.
func (cc *ClientConn) initAuthority() error {
	dopts := cc.dopts
	// Historically, we had two options for users to specify the serverName or
	// authority for a channel. One was through the transport credentials
	// (either in its constructor, or through the OverrideServerName() method).
	// The other option (for cases where WithInsecure() dial option was used)
	// was to use the WithAuthority() dial option.
	//
	// A few things have changed since:
	// - `insecure` package with an implementation of the `TransportCredentials`
	//   interface for the insecure case
	// - WithAuthority() dial option support for secure credentials
	authorityFromCreds := ""
	if creds := dopts.copts.TransportCredentials; creds != nil {
		// An empty ServerName leaves authorityFromCreds empty, i.e. "unset".
		authorityFromCreds = creds.Info().ServerName
	}
	authorityFromDialOption := dopts.authority
	// Both sources may be set only if they agree; otherwise the
	// configuration is ambiguous and rejected.
	if authorityFromCreds != "" && authorityFromDialOption != "" && authorityFromCreds != authorityFromDialOption {
		return fmt.Errorf("ClientConn's authority from transport creds %q and dial option %q don't match", authorityFromCreds, authorityFromDialOption)
	}

	overrider, hasOverrider := cc.resolverBuilder.(resolver.AuthorityOverrider)
	endpoint := cc.parsedTarget.Endpoint()
	switch {
	case authorityFromDialOption != "":
		cc.authority = authorityFromDialOption
	case authorityFromCreds != "":
		cc.authority = authorityFromCreds
	case hasOverrider:
		cc.authority = overrider.OverrideAuthority(cc.parsedTarget)
	case strings.HasPrefix(endpoint, ":"):
		// A port-only endpoint such as ":8080" gets an explicit host.
		cc.authority = "localhost" + endpoint
	default:
		cc.authority = encodeAuthority(endpoint)
	}
	return nil
}