pickfirstleaf.go

  1/*
  2 *
  3 * Copyright 2024 gRPC authors.
  4 *
  5 * Licensed under the Apache License, Version 2.0 (the "License");
  6 * you may not use this file except in compliance with the License.
  7 * You may obtain a copy of the License at
  8 *
  9 *     http://www.apache.org/licenses/LICENSE-2.0
 10 *
 11 * Unless required by applicable law or agreed to in writing, software
 12 * distributed under the License is distributed on an "AS IS" BASIS,
 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14 * See the License for the specific language governing permissions and
 15 * limitations under the License.
 16 *
 17 */
 18
 19// Package pickfirstleaf contains the pick_first load balancing policy which
 20// will be the universal leaf policy after dualstack changes are implemented.
 21//
 22// # Experimental
 23//
 24// Notice: This package is EXPERIMENTAL and may be changed or removed in a
 25// later release.
 26package pickfirstleaf
 27
 28import (
 29	"encoding/json"
 30	"errors"
 31	"fmt"
 32	"net"
 33	"net/netip"
 34	"sync"
 35	"time"
 36
 37	"google.golang.org/grpc/balancer"
 38	"google.golang.org/grpc/balancer/pickfirst/internal"
 39	"google.golang.org/grpc/connectivity"
 40	expstats "google.golang.org/grpc/experimental/stats"
 41	"google.golang.org/grpc/grpclog"
 42	"google.golang.org/grpc/internal/envconfig"
 43	internalgrpclog "google.golang.org/grpc/internal/grpclog"
 44	"google.golang.org/grpc/internal/pretty"
 45	"google.golang.org/grpc/resolver"
 46	"google.golang.org/grpc/serviceconfig"
 47)
 48
 49func init() {
 50	if envconfig.NewPickFirstEnabled {
 51		// Register as the default pick_first balancer.
 52		Name = "pick_first"
 53	}
 54	balancer.Register(pickfirstBuilder{})
 55}
 56
 57type (
 58	// enableHealthListenerKeyType is a unique key type used in resolver
 59	// attributes to indicate whether the health listener usage is enabled.
 60	enableHealthListenerKeyType struct{}
 61	// managedByPickfirstKeyType is an attribute key type to inform Outlier
 62	// Detection that the generic health listener is being used.
 63	// TODO: https://github.com/grpc/grpc-go/issues/7915 - Remove this when
 64	// implementing the dualstack design. This is a hack. Once Dualstack is
 65	// completed, outlier detection will stop sending ejection updates through
 66	// the connectivity listener.
 67	managedByPickfirstKeyType struct{}
 68)
 69
var (
	// logger is the component logger; each balancer instance wraps it in a
	// prefix logger (see Build).
	logger = grpclog.Component("pick-first-leaf-lb")
	// Name is the name of the pick_first_leaf balancer.
	// It is changed to "pick_first" in init() if this balancer is to be
	// registered as the default pickfirst.
	Name = "pick_first_leaf"
	// disconnectionsMetric counts how often the connected SubConn is lost:
	// it leaves READY, or goes CONNECTING -> IDLE because of a known race
	// (see updateSubConnState). Labelled with the channel target.
	disconnectionsMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{
		Name:        "grpc.lb.pick_first.disconnections",
		Description: "EXPERIMENTAL. Number of times the selected subchannel becomes disconnected.",
		Unit:        "disconnection",
		Labels:      []string{"grpc.target"},
		Default:     false,
	})
	// connectionAttemptsSucceededMetric counts connection attempts that
	// reached READY. Labelled with the channel target.
	connectionAttemptsSucceededMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{
		Name:        "grpc.lb.pick_first.connection_attempts_succeeded",
		Description: "EXPERIMENTAL. Number of successful connection attempts.",
		Unit:        "attempt",
		Labels:      []string{"grpc.target"},
		Default:     false,
	})
	// connectionAttemptsFailedMetric counts connection attempts that ended
	// in TRANSIENT_FAILURE. Labelled with the channel target.
	connectionAttemptsFailedMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{
		Name:        "grpc.lb.pick_first.connection_attempts_failed",
		Description: "EXPERIMENTAL. Number of failed connection attempts.",
		Unit:        "attempt",
		Labels:      []string{"grpc.target"},
		Default:     false,
	})
)
 98
const (
	// logPrefix is prepended to every log line of a balancer instance; the %p
	// verb is filled with the balancer's address in Build.
	// TODO: change to pick-first when this becomes the default pick_first policy.
	logPrefix = "[pick-first-leaf-lb %p] "
	// connectionDelayInterval is how long the happy eyeballs pass waits for a
	// connection attempt before starting an attempt to the next address.
	connectionDelayInterval = 250 * time.Millisecond
)
106
// ipAddrFamily identifies the IP address family of an address string, as
// classified by addressFamily. It is used to interleave address families.
type ipAddrFamily int

const (
	// ipAddrFamilyUnknown represents strings that can't be parsed as an IP
	// address.
	ipAddrFamilyUnknown ipAddrFamily = iota
	// ipAddrFamilyV4 represents IPv4 addresses, including IPv4-mapped IPv6
	// addresses.
	ipAddrFamilyV4
	// ipAddrFamilyV6 represents IPv6 addresses.
	ipAddrFamilyV6
)
116
// pickfirstBuilder builds pickfirstBalancer instances and parses their LB
// policy config. It is registered with the balancer registry in init().
type pickfirstBuilder struct{}
118
119func (pickfirstBuilder) Build(cc balancer.ClientConn, bo balancer.BuildOptions) balancer.Balancer {
120	b := &pickfirstBalancer{
121		cc:              cc,
122		target:          bo.Target.String(),
123		metricsRecorder: cc.MetricsRecorder(),
124
125		subConns:              resolver.NewAddressMap(),
126		state:                 connectivity.Connecting,
127		cancelConnectionTimer: func() {},
128	}
129	b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b))
130	return b
131}
132
133func (b pickfirstBuilder) Name() string {
134	return Name
135}
136
137func (pickfirstBuilder) ParseConfig(js json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
138	var cfg pfConfig
139	if err := json.Unmarshal(js, &cfg); err != nil {
140		return nil, fmt.Errorf("pickfirst: unable to unmarshal LB policy config: %s, error: %v", string(js), err)
141	}
142	return cfg, nil
143}
144
145// EnableHealthListener updates the state to configure pickfirst for using a
146// generic health listener.
147func EnableHealthListener(state resolver.State) resolver.State {
148	state.Attributes = state.Attributes.WithValue(enableHealthListenerKeyType{}, true)
149	return state
150}
151
152// IsManagedByPickfirst returns whether an address belongs to a SubConn
153// managed by the pickfirst LB policy.
154// TODO: https://github.com/grpc/grpc-go/issues/7915 - This is a hack to disable
155// outlier_detection via the with connectivity listener when using pick_first.
156// Once Dualstack changes are complete, all SubConns will be created by
157// pick_first and outlier detection will only use the health listener for
158// ejection. This hack can then be removed.
159func IsManagedByPickfirst(addr resolver.Address) bool {
160	return addr.BalancerAttributes.Value(managedByPickfirstKeyType{}) != nil
161}
162
// pfConfig is the pick_first LB policy configuration produced by ParseConfig.
type pfConfig struct {
	// Embedded so that pfConfig satisfies serviceconfig.LoadBalancingConfig;
	// it is excluded from JSON decoding.
	serviceconfig.LoadBalancingConfig `json:"-"`

	// If set to true, instructs the LB policy to shuffle the order of the list
	// of endpoints (or, when no endpoints are provided, addresses) received
	// from the name resolver before attempting to connect to them.
	ShuffleAddressList bool `json:"shuffleAddressList"`
}
171
// scData keeps track of the current state of the subConn.
// It is not safe for concurrent access; the balancer accesses it while
// holding pickfirstBalancer.mu.
type scData struct {
	// The following fields are set up by newSCData and read-only after that.
	// subConn is the SubConn created for this entry; addr is the address it
	// was created with, tagged with the managedByPickfirst balancer
	// attribute.
	subConn balancer.SubConn
	addr    resolver.Address

	// rawConnectivityState is the last state delivered to the SubConn's
	// StateListener.
	rawConnectivityState connectivity.State
	// The effective connectivity state based on raw connectivity, health state
	// and after following sticky TransientFailure behaviour defined in A62.
	// lastErr is the connection error that came with the most recent
	// TRANSIENT_FAILURE. connectionFailedInFirstPass records whether the
	// SubConn has failed during the current first pass; startFirstPassLocked
	// clears it.
	effectiveState              connectivity.State
	lastErr                     error
	connectionFailedInFirstPass bool
}
187
188func (b *pickfirstBalancer) newSCData(addr resolver.Address) (*scData, error) {
189	addr.BalancerAttributes = addr.BalancerAttributes.WithValue(managedByPickfirstKeyType{}, true)
190	sd := &scData{
191		rawConnectivityState: connectivity.Idle,
192		effectiveState:       connectivity.Idle,
193		addr:                 addr,
194	}
195	sc, err := b.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{
196		StateListener: func(state balancer.SubConnState) {
197			b.updateSubConnState(sd, state)
198		},
199	})
200	if err != nil {
201		return nil, err
202	}
203	sd.subConn = sc
204	return sd, nil
205}
206
// pickfirstBalancer implements the pick_first LB policy. It tries the
// resolved addresses in order, using a happy eyeballs timer between
// attempts. It keeps the first SubConn that becomes READY and shuts down all
// the others.
type pickfirstBalancer struct {
	// The following fields are initialized at build time and read-only after
	// that and therefore do not need to be guarded by a mutex.
	logger          *internalgrpclog.PrefixLogger
	cc              balancer.ClientConn
	target          string
	metricsRecorder expstats.MetricsRecorder // guaranteed to be non nil

	// The mutex is used to ensure synchronization of updates triggered
	// from the idle picker and the already serialized resolver,
	// SubConn state updates.
	mu sync.Mutex
	// State reported to the channel based on SubConn states and resolver
	// updates.
	state connectivity.State
	// subConns: scData for active SubConns, mapped by address.
	// addressList: the de-duplicated, interleaved addresses from the last
	// resolver update, with a cursor at the address currently being tried.
	// firstPass: true while the first happy eyeballs pass is in progress
	// (set by startFirstPassLocked, cleared by endFirstPassIfPossibleLocked).
	// numTF: after the first pass, the count of TRANSIENT_FAILURE reports
	// modulo the number of SubConns; TRANSIENT_FAILURE is reported to the
	// channel each time it wraps to zero.
	// cancelConnectionTimer: stops the pending happy eyeballs timer; never
	// nil (Build initializes it to a no-op).
	// healthCheckingEnabled: whether the resolver state carried the attribute
	// set by EnableHealthListener.
	subConns              *resolver.AddressMap
	addressList           addressList
	firstPass             bool
	numTF                 int
	cancelConnectionTimer func()
	healthCheckingEnabled bool
}
230
231// ResolverError is called by the ClientConn when the name resolver produces
232// an error or when pickfirst determined the resolver update to be invalid.
233func (b *pickfirstBalancer) ResolverError(err error) {
234	b.mu.Lock()
235	defer b.mu.Unlock()
236	b.resolverErrorLocked(err)
237}
238
239func (b *pickfirstBalancer) resolverErrorLocked(err error) {
240	if b.logger.V(2) {
241		b.logger.Infof("Received error from the name resolver: %v", err)
242	}
243
244	// The picker will not change since the balancer does not currently
245	// report an error. If the balancer hasn't received a single good resolver
246	// update yet, transition to TRANSIENT_FAILURE.
247	if b.state != connectivity.TransientFailure && b.addressList.size() > 0 {
248		if b.logger.V(2) {
249			b.logger.Infof("Ignoring resolver error because balancer is using a previous good update.")
250		}
251		return
252	}
253
254	b.updateBalancerState(balancer.State{
255		ConnectivityState: connectivity.TransientFailure,
256		Picker:            &picker{err: fmt.Errorf("name resolver error: %v", err)},
257	})
258}
259
260func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
261	b.mu.Lock()
262	defer b.mu.Unlock()
263	b.cancelConnectionTimer()
264	if len(state.ResolverState.Addresses) == 0 && len(state.ResolverState.Endpoints) == 0 {
265		// Cleanup state pertaining to the previous resolver state.
266		// Treat an empty address list like an error by calling b.ResolverError.
267		b.closeSubConnsLocked()
268		b.addressList.updateAddrs(nil)
269		b.resolverErrorLocked(errors.New("produced zero addresses"))
270		return balancer.ErrBadResolverState
271	}
272	b.healthCheckingEnabled = state.ResolverState.Attributes.Value(enableHealthListenerKeyType{}) != nil
273	cfg, ok := state.BalancerConfig.(pfConfig)
274	if state.BalancerConfig != nil && !ok {
275		return fmt.Errorf("pickfirst: received illegal BalancerConfig (type %T): %v: %w", state.BalancerConfig, state.BalancerConfig, balancer.ErrBadResolverState)
276	}
277
278	if b.logger.V(2) {
279		b.logger.Infof("Received new config %s, resolver state %s", pretty.ToJSON(cfg), pretty.ToJSON(state.ResolverState))
280	}
281
282	var newAddrs []resolver.Address
283	if endpoints := state.ResolverState.Endpoints; len(endpoints) != 0 {
284		// Perform the optional shuffling described in gRFC A62. The shuffling
285		// will change the order of endpoints but not touch the order of the
286		// addresses within each endpoint. - A61
287		if cfg.ShuffleAddressList {
288			endpoints = append([]resolver.Endpoint{}, endpoints...)
289			internal.RandShuffle(len(endpoints), func(i, j int) { endpoints[i], endpoints[j] = endpoints[j], endpoints[i] })
290		}
291
292		// "Flatten the list by concatenating the ordered list of addresses for
293		// each of the endpoints, in order." - A61
294		for _, endpoint := range endpoints {
295			newAddrs = append(newAddrs, endpoint.Addresses...)
296		}
297	} else {
298		// Endpoints not set, process addresses until we migrate resolver
299		// emissions fully to Endpoints. The top channel does wrap emitted
300		// addresses with endpoints, however some balancers such as weighted
301		// target do not forward the corresponding correct endpoints down/split
302		// endpoints properly. Once all balancers correctly forward endpoints
303		// down, can delete this else conditional.
304		newAddrs = state.ResolverState.Addresses
305		if cfg.ShuffleAddressList {
306			newAddrs = append([]resolver.Address{}, newAddrs...)
307			internal.RandShuffle(len(endpoints), func(i, j int) { endpoints[i], endpoints[j] = endpoints[j], endpoints[i] })
308		}
309	}
310
311	// If an address appears in multiple endpoints or in the same endpoint
312	// multiple times, we keep it only once. We will create only one SubConn
313	// for the address because an AddressMap is used to store SubConns.
314	// Not de-duplicating would result in attempting to connect to the same
315	// SubConn multiple times in the same pass. We don't want this.
316	newAddrs = deDupAddresses(newAddrs)
317	newAddrs = interleaveAddresses(newAddrs)
318
319	prevAddr := b.addressList.currentAddress()
320	prevSCData, found := b.subConns.Get(prevAddr)
321	prevAddrsCount := b.addressList.size()
322	isPrevRawConnectivityStateReady := found && prevSCData.(*scData).rawConnectivityState == connectivity.Ready
323	b.addressList.updateAddrs(newAddrs)
324
325	// If the previous ready SubConn exists in new address list,
326	// keep this connection and don't create new SubConns.
327	if isPrevRawConnectivityStateReady && b.addressList.seekTo(prevAddr) {
328		return nil
329	}
330
331	b.reconcileSubConnsLocked(newAddrs)
332	// If it's the first resolver update or the balancer was already READY
333	// (but the new address list does not contain the ready SubConn) or
334	// CONNECTING, enter CONNECTING.
335	// We may be in TRANSIENT_FAILURE due to a previous empty address list,
336	// we should still enter CONNECTING because the sticky TF behaviour
337	//  mentioned in A62 applies only when the TRANSIENT_FAILURE is reported
338	// due to connectivity failures.
339	if isPrevRawConnectivityStateReady || b.state == connectivity.Connecting || prevAddrsCount == 0 {
340		// Start connection attempt at first address.
341		b.forceUpdateConcludedStateLocked(balancer.State{
342			ConnectivityState: connectivity.Connecting,
343			Picker:            &picker{err: balancer.ErrNoSubConnAvailable},
344		})
345		b.startFirstPassLocked()
346	} else if b.state == connectivity.TransientFailure {
347		// If we're in TRANSIENT_FAILURE, we stay in TRANSIENT_FAILURE until
348		// we're READY. See A62.
349		b.startFirstPassLocked()
350	}
351	return nil
352}
353
354// UpdateSubConnState is unused as a StateListener is always registered when
355// creating SubConns.
356func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
357	b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", subConn, state)
358}
359
360func (b *pickfirstBalancer) Close() {
361	b.mu.Lock()
362	defer b.mu.Unlock()
363	b.closeSubConnsLocked()
364	b.cancelConnectionTimer()
365	b.state = connectivity.Shutdown
366}
367
368// ExitIdle moves the balancer out of idle state. It can be called concurrently
369// by the idlePicker and clientConn so access to variables should be
370// synchronized.
371func (b *pickfirstBalancer) ExitIdle() {
372	b.mu.Lock()
373	defer b.mu.Unlock()
374	if b.state == connectivity.Idle {
375		b.startFirstPassLocked()
376	}
377}
378
379func (b *pickfirstBalancer) startFirstPassLocked() {
380	b.firstPass = true
381	b.numTF = 0
382	// Reset the connection attempt record for existing SubConns.
383	for _, sd := range b.subConns.Values() {
384		sd.(*scData).connectionFailedInFirstPass = false
385	}
386	b.requestConnectionLocked()
387}
388
389func (b *pickfirstBalancer) closeSubConnsLocked() {
390	for _, sd := range b.subConns.Values() {
391		sd.(*scData).subConn.Shutdown()
392	}
393	b.subConns = resolver.NewAddressMap()
394}
395
396// deDupAddresses ensures that each address appears only once in the slice.
397func deDupAddresses(addrs []resolver.Address) []resolver.Address {
398	seenAddrs := resolver.NewAddressMap()
399	retAddrs := []resolver.Address{}
400
401	for _, addr := range addrs {
402		if _, ok := seenAddrs.Get(addr); ok {
403			continue
404		}
405		retAddrs = append(retAddrs, addr)
406	}
407	return retAddrs
408}
409
410// interleaveAddresses interleaves addresses of both families (IPv4 and IPv6)
411// as per RFC-8305 section 4.
412// Whichever address family is first in the list is followed by an address of
413// the other address family; that is, if the first address in the list is IPv6,
414// then the first IPv4 address should be moved up in the list to be second in
415// the list. It doesn't support configuring "First Address Family Count", i.e.
416// there will always be a single member of the first address family at the
417// beginning of the interleaved list.
418// Addresses that are neither IPv4 nor IPv6 are treated as part of a third
419// "unknown" family for interleaving.
420// See: https://datatracker.ietf.org/doc/html/rfc8305#autoid-6
421func interleaveAddresses(addrs []resolver.Address) []resolver.Address {
422	familyAddrsMap := map[ipAddrFamily][]resolver.Address{}
423	interleavingOrder := []ipAddrFamily{}
424	for _, addr := range addrs {
425		family := addressFamily(addr.Addr)
426		if _, found := familyAddrsMap[family]; !found {
427			interleavingOrder = append(interleavingOrder, family)
428		}
429		familyAddrsMap[family] = append(familyAddrsMap[family], addr)
430	}
431
432	interleavedAddrs := make([]resolver.Address, 0, len(addrs))
433
434	for curFamilyIdx := 0; len(interleavedAddrs) < len(addrs); curFamilyIdx = (curFamilyIdx + 1) % len(interleavingOrder) {
435		// Some IP types may have fewer addresses than others, so we look for
436		// the next type that has a remaining member to add to the interleaved
437		// list.
438		family := interleavingOrder[curFamilyIdx]
439		remainingMembers := familyAddrsMap[family]
440		if len(remainingMembers) > 0 {
441			interleavedAddrs = append(interleavedAddrs, remainingMembers[0])
442			familyAddrsMap[family] = remainingMembers[1:]
443		}
444	}
445
446	return interleavedAddrs
447}
448
449// addressFamily returns the ipAddrFamily after parsing the address string.
450// If the address isn't of the format "ip-address:port", it returns
451// ipAddrFamilyUnknown. The address may be valid even if it's not an IP when
452// using a resolver like passthrough where the address may be a hostname in
453// some format that the dialer can resolve.
454func addressFamily(address string) ipAddrFamily {
455	// Parse the IP after removing the port.
456	host, _, err := net.SplitHostPort(address)
457	if err != nil {
458		return ipAddrFamilyUnknown
459	}
460	ip, err := netip.ParseAddr(host)
461	if err != nil {
462		return ipAddrFamilyUnknown
463	}
464	switch {
465	case ip.Is4() || ip.Is4In6():
466		return ipAddrFamilyV4
467	case ip.Is6():
468		return ipAddrFamilyV6
469	default:
470		return ipAddrFamilyUnknown
471	}
472}
473
474// reconcileSubConnsLocked updates the active subchannels based on a new address
475// list from the resolver. It does this by:
476//   - closing subchannels: any existing subchannels associated with addresses
477//     that are no longer in the updated list are shut down.
478//   - removing subchannels: entries for these closed subchannels are removed
479//     from the subchannel map.
480//
481// This ensures that the subchannel map accurately reflects the current set of
482// addresses received from the name resolver.
483func (b *pickfirstBalancer) reconcileSubConnsLocked(newAddrs []resolver.Address) {
484	newAddrsMap := resolver.NewAddressMap()
485	for _, addr := range newAddrs {
486		newAddrsMap.Set(addr, true)
487	}
488
489	for _, oldAddr := range b.subConns.Keys() {
490		if _, ok := newAddrsMap.Get(oldAddr); ok {
491			continue
492		}
493		val, _ := b.subConns.Get(oldAddr)
494		val.(*scData).subConn.Shutdown()
495		b.subConns.Delete(oldAddr)
496	}
497}
498
499// shutdownRemainingLocked shuts down remaining subConns. Called when a subConn
500// becomes ready, which means that all other subConn must be shutdown.
501func (b *pickfirstBalancer) shutdownRemainingLocked(selected *scData) {
502	b.cancelConnectionTimer()
503	for _, v := range b.subConns.Values() {
504		sd := v.(*scData)
505		if sd.subConn != selected.subConn {
506			sd.subConn.Shutdown()
507		}
508	}
509	b.subConns = resolver.NewAddressMap()
510	b.subConns.Set(selected.addr, selected)
511}
512
513// requestConnectionLocked starts connecting on the subchannel corresponding to
514// the current address. If no subchannel exists, one is created. If the current
515// subchannel is in TransientFailure, a connection to the next address is
516// attempted until a subchannel is found.
517func (b *pickfirstBalancer) requestConnectionLocked() {
518	if !b.addressList.isValid() {
519		return
520	}
521	var lastErr error
522	for valid := true; valid; valid = b.addressList.increment() {
523		curAddr := b.addressList.currentAddress()
524		sd, ok := b.subConns.Get(curAddr)
525		if !ok {
526			var err error
527			// We want to assign the new scData to sd from the outer scope,
528			// hence we can't use := below.
529			sd, err = b.newSCData(curAddr)
530			if err != nil {
531				// This should never happen, unless the clientConn is being shut
532				// down.
533				if b.logger.V(2) {
534					b.logger.Infof("Failed to create a subConn for address %v: %v", curAddr.String(), err)
535				}
536				// Do nothing, the LB policy will be closed soon.
537				return
538			}
539			b.subConns.Set(curAddr, sd)
540		}
541
542		scd := sd.(*scData)
543		switch scd.rawConnectivityState {
544		case connectivity.Idle:
545			scd.subConn.Connect()
546			b.scheduleNextConnectionLocked()
547			return
548		case connectivity.TransientFailure:
549			// The SubConn is being re-used and failed during a previous pass
550			// over the addressList. It has not completed backoff yet.
551			// Mark it as having failed and try the next address.
552			scd.connectionFailedInFirstPass = true
553			lastErr = scd.lastErr
554			continue
555		case connectivity.Connecting:
556			// Wait for the connection attempt to complete or the timer to fire
557			// before attempting the next address.
558			b.scheduleNextConnectionLocked()
559			return
560		default:
561			b.logger.Errorf("SubConn with unexpected state %v present in SubConns map.", scd.rawConnectivityState)
562			return
563
564		}
565	}
566
567	// All the remaining addresses in the list are in TRANSIENT_FAILURE, end the
568	// first pass if possible.
569	b.endFirstPassIfPossibleLocked(lastErr)
570}
571
572func (b *pickfirstBalancer) scheduleNextConnectionLocked() {
573	b.cancelConnectionTimer()
574	if !b.addressList.hasNext() {
575		return
576	}
577	curAddr := b.addressList.currentAddress()
578	cancelled := false // Access to this is protected by the balancer's mutex.
579	closeFn := internal.TimeAfterFunc(connectionDelayInterval, func() {
580		b.mu.Lock()
581		defer b.mu.Unlock()
582		// If the scheduled task is cancelled while acquiring the mutex, return.
583		if cancelled {
584			return
585		}
586		if b.logger.V(2) {
587			b.logger.Infof("Happy Eyeballs timer expired while waiting for connection to %q.", curAddr.Addr)
588		}
589		if b.addressList.increment() {
590			b.requestConnectionLocked()
591		}
592	})
593	// Access to the cancellation callback held by the balancer is guarded by
594	// the balancer's mutex, so it's safe to set the boolean from the callback.
595	b.cancelConnectionTimer = sync.OnceFunc(func() {
596		cancelled = true
597		closeFn()
598	})
599}
600
// updateSubConnState is the StateListener registered for every SubConn created
// by newSCData. It always records the new raw state. If sd is still the
// tracked entry for its address, it then drives the balancer's state machine:
//   - SHUTDOWN: only the effective state is updated.
//   - READY: all other SubConns are shut down and the balancer reports READY.
//     With the health listener enabled, it reports CONNECTING instead and
//     registers a health listener.
//   - Leaving READY (or CONNECTING -> IDLE, see below): the balancer enters IDLE
//     with an idlePicker and restarts from the first address on the next pick.
//   - During the first pass: CONNECTING and TRANSIENT_FAILURE updates advance
//     the happy eyeballs pass.
//   - After the first pass: IDLE SubConns are reconnected, and
//     TRANSIENT_FAILURE is reported to the channel each time the count of
//     failures (modulo the number of SubConns) wraps to zero.
func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.SubConnState) {
	b.mu.Lock()
	defer b.mu.Unlock()
	oldState := sd.rawConnectivityState
	sd.rawConnectivityState = newState.ConnectivityState
	// Previously relevant SubConns can still callback with state updates.
	// To prevent pickers from returning these obsolete SubConns, this logic
	// is included to check if the current list of active SubConns includes this
	// SubConn.
	if !b.isActiveSCData(sd) {
		return
	}
	if newState.ConnectivityState == connectivity.Shutdown {
		sd.effectiveState = connectivity.Shutdown
		return
	}

	// A TRANSIENT_FAILURE ends an unsuccessful connection attempt: mark the
	// SubConn as failed for the current pass and record the failed attempt.
	if newState.ConnectivityState == connectivity.TransientFailure {
		sd.connectionFailedInFirstPass = true
		connectionAttemptsFailedMetric.Record(b.metricsRecorder, 1, b.target)
	}

	if newState.ConnectivityState == connectivity.Ready {
		connectionAttemptsSucceededMetric.Record(b.metricsRecorder, 1, b.target)
		b.shutdownRemainingLocked(sd)
		// Point the address list cursor at the connected address.
		if !b.addressList.seekTo(sd.addr) {
			// This should not fail as we should have only one SubConn after
			// entering READY. The SubConn should be present in the addressList.
			b.logger.Errorf("Address %q not found address list in  %v", sd.addr, b.addressList.addresses)
			return
		}
		if !b.healthCheckingEnabled {
			if b.logger.V(2) {
				b.logger.Infof("SubConn %p reported connectivity state READY and the health listener is disabled. Transitioning SubConn to READY.", sd.subConn)
			}

			sd.effectiveState = connectivity.Ready
			b.updateBalancerState(balancer.State{
				ConnectivityState: connectivity.Ready,
				Picker:            &picker{result: balancer.PickResult{SubConn: sd.subConn}},
			})
			return
		}
		if b.logger.V(2) {
			b.logger.Infof("SubConn %p reported connectivity state READY. Registering health listener.", sd.subConn)
		}
		// Send a CONNECTING update to take the SubConn out of sticky-TF if
		// required. The balancer's state then follows the health updates
		// delivered to updateSubConnHealthState.
		sd.effectiveState = connectivity.Connecting
		b.updateBalancerState(balancer.State{
			ConnectivityState: connectivity.Connecting,
			Picker:            &picker{err: balancer.ErrNoSubConnAvailable},
		})
		sd.subConn.RegisterHealthListener(func(scs balancer.SubConnState) {
			b.updateSubConnHealthState(sd, scs)
		})
		return
	}

	// If the LB policy is READY, and it receives a subchannel state change,
	// it means that the READY subchannel has failed.
	// A SubConn can also transition from CONNECTING directly to IDLE when
	// a transport is successfully created, but the connection fails
	// before the SubConn can send the notification for READY. We treat
	// this as a successful connection and transition to IDLE.
	// TODO: https://github.com/grpc/grpc-go/issues/7862 - Remove the second
	// part of the if condition below once the issue is fixed.
	if oldState == connectivity.Ready || (oldState == connectivity.Connecting && newState.ConnectivityState == connectivity.Idle) {
		// Once a transport fails, the balancer enters IDLE and starts from
		// the first address when the picker is used.
		b.shutdownRemainingLocked(sd)
		sd.effectiveState = newState.ConnectivityState
		// A READY state may have been skipped between CONNECTING and IDLE;
		// account for that.
		if oldState == connectivity.Connecting {
			// A known issue (https://github.com/grpc/grpc-go/issues/7862)
			// causes a race that prevents the READY state change notification.
			// This works around it.
			connectionAttemptsSucceededMetric.Record(b.metricsRecorder, 1, b.target)
		}
		disconnectionsMetric.Record(b.metricsRecorder, 1, b.target)
		b.addressList.reset()
		// The idle picker calls ExitIdle at most once (sync.OnceFunc).
		b.updateBalancerState(balancer.State{
			ConnectivityState: connectivity.Idle,
			Picker:            &idlePicker{exitIdle: sync.OnceFunc(b.ExitIdle)},
		})
		return
	}

	if b.firstPass {
		switch newState.ConnectivityState {
		case connectivity.Connecting:
			// The effective state can be in either IDLE, CONNECTING or
			// TRANSIENT_FAILURE. If it's TRANSIENT_FAILURE, stay in
			// TRANSIENT_FAILURE until it's READY. See A62.
			if sd.effectiveState != connectivity.TransientFailure {
				sd.effectiveState = connectivity.Connecting
				b.updateBalancerState(balancer.State{
					ConnectivityState: connectivity.Connecting,
					Picker:            &picker{err: balancer.ErrNoSubConnAvailable},
				})
			}
		case connectivity.TransientFailure:
			sd.lastErr = newState.ConnectionError
			sd.effectiveState = connectivity.TransientFailure
			// Since we're re-using common SubConns while handling resolver
			// updates, we could receive an out of turn TRANSIENT_FAILURE from
			// a pass over the previous address list. Happy Eyeballs will also
			// cause out of order updates to arrive.

			// Only a failure of the address currently being tried moves the
			// pass forward (without waiting for the timer).
			if curAddr := b.addressList.currentAddress(); equalAddressIgnoringBalAttributes(&curAddr, &sd.addr) {
				b.cancelConnectionTimer()
				if b.addressList.increment() {
					b.requestConnectionLocked()
					return
				}
			}

			// End the first pass if we've seen a TRANSIENT_FAILURE from all
			// SubConns once.
			b.endFirstPassIfPossibleLocked(newState.ConnectionError)
		}
		return
	}

	// We have finished the first pass, keep re-connecting failing SubConns.
	switch newState.ConnectivityState {
	case connectivity.TransientFailure:
		// numTF wraps to zero every b.subConns.Len() failures; report
		// TRANSIENT_FAILURE (with the latest error) each time it does.
		b.numTF = (b.numTF + 1) % b.subConns.Len()
		sd.lastErr = newState.ConnectionError
		if b.numTF%b.subConns.Len() == 0 {
			b.updateBalancerState(balancer.State{
				ConnectivityState: connectivity.TransientFailure,
				Picker:            &picker{err: newState.ConnectionError},
			})
		}
		// We don't need to request re-resolution since the SubConn already
		// does that before reporting TRANSIENT_FAILURE.
		// TODO: #7534 - Move re-resolution requests from SubConn into
		// pick_first.
	case connectivity.Idle:
		sd.subConn.Connect()
	}
}
746
747// endFirstPassIfPossibleLocked ends the first happy-eyeballs pass if all the
748// addresses are tried and their SubConns have reported a failure.
749func (b *pickfirstBalancer) endFirstPassIfPossibleLocked(lastErr error) {
750	// An optimization to avoid iterating over the entire SubConn map.
751	if b.addressList.isValid() {
752		return
753	}
754	// Connect() has been called on all the SubConns. The first pass can be
755	// ended if all the SubConns have reported a failure.
756	for _, v := range b.subConns.Values() {
757		sd := v.(*scData)
758		if !sd.connectionFailedInFirstPass {
759			return
760		}
761	}
762	b.firstPass = false
763	b.updateBalancerState(balancer.State{
764		ConnectivityState: connectivity.TransientFailure,
765		Picker:            &picker{err: lastErr},
766	})
767	// Start re-connecting all the SubConns that are already in IDLE.
768	for _, v := range b.subConns.Values() {
769		sd := v.(*scData)
770		if sd.rawConnectivityState == connectivity.Idle {
771			sd.subConn.Connect()
772		}
773	}
774}
775
776func (b *pickfirstBalancer) isActiveSCData(sd *scData) bool {
777	activeSD, found := b.subConns.Get(sd.addr)
778	return found && activeSD == sd
779}
780
781func (b *pickfirstBalancer) updateSubConnHealthState(sd *scData, state balancer.SubConnState) {
782	b.mu.Lock()
783	defer b.mu.Unlock()
784	// Previously relevant SubConns can still callback with state updates.
785	// To prevent pickers from returning these obsolete SubConns, this logic
786	// is included to check if the current list of active SubConns includes
787	// this SubConn.
788	if !b.isActiveSCData(sd) {
789		return
790	}
791	sd.effectiveState = state.ConnectivityState
792	switch state.ConnectivityState {
793	case connectivity.Ready:
794		b.updateBalancerState(balancer.State{
795			ConnectivityState: connectivity.Ready,
796			Picker:            &picker{result: balancer.PickResult{SubConn: sd.subConn}},
797		})
798	case connectivity.TransientFailure:
799		b.updateBalancerState(balancer.State{
800			ConnectivityState: connectivity.TransientFailure,
801			Picker:            &picker{err: fmt.Errorf("pickfirst: health check failure: %v", state.ConnectionError)},
802		})
803	case connectivity.Connecting:
804		b.updateBalancerState(balancer.State{
805			ConnectivityState: connectivity.Connecting,
806			Picker:            &picker{err: balancer.ErrNoSubConnAvailable},
807		})
808	default:
809		b.logger.Errorf("Got unexpected health update for SubConn %p: %v", state)
810	}
811}
812
813// updateBalancerState stores the state reported to the channel and calls
814// ClientConn.UpdateState(). As an optimization, it avoids sending duplicate
815// updates to the channel.
816func (b *pickfirstBalancer) updateBalancerState(newState balancer.State) {
817	// In case of TransientFailures allow the picker to be updated to update
818	// the connectivity error, in all other cases don't send duplicate state
819	// updates.
820	if newState.ConnectivityState == b.state && b.state != connectivity.TransientFailure {
821		return
822	}
823	b.forceUpdateConcludedStateLocked(newState)
824}
825
826// forceUpdateConcludedStateLocked stores the state reported to the channel and
827// calls ClientConn.UpdateState().
828// A separate function is defined to force update the ClientConn state since the
829// channel doesn't correctly assume that LB policies start in CONNECTING and
830// relies on LB policy to send an initial CONNECTING update.
831func (b *pickfirstBalancer) forceUpdateConcludedStateLocked(newState balancer.State) {
832	b.state = newState.ConnectivityState
833	b.cc.UpdateState(newState)
834}
835
836type picker struct {
837	result balancer.PickResult
838	err    error
839}
840
841func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
842	return p.result, p.err
843}
844
845// idlePicker is used when the SubConn is IDLE and kicks the SubConn into
846// CONNECTING when Pick is called.
847type idlePicker struct {
848	exitIdle func()
849}
850
851func (i *idlePicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
852	i.exitIdle()
853	return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
854}
855
// addressList manages sequentially iterating over addresses present in a list
// of endpoints. It provides a 1 dimensional view of the addresses present in
// the endpoints.
// This type is not safe for concurrent access.
type addressList struct {
	// addresses is the flattened list being iterated over.
	addresses []resolver.Address
	// idx is the current position in addresses. It ranges from 0 up to and
	// including len(addresses). The value len(addresses) means iteration
	// has moved past the end and the list is no longer "valid".
	idx       int
}
864
865func (al *addressList) isValid() bool {
866	return al.idx < len(al.addresses)
867}
868
869func (al *addressList) size() int {
870	return len(al.addresses)
871}
872
873// increment moves to the next index in the address list.
874// This method returns false if it went off the list, true otherwise.
875func (al *addressList) increment() bool {
876	if !al.isValid() {
877		return false
878	}
879	al.idx++
880	return al.idx < len(al.addresses)
881}
882
883// currentAddress returns the current address pointed to in the addressList.
884// If the list is in an invalid state, it returns an empty address instead.
885func (al *addressList) currentAddress() resolver.Address {
886	if !al.isValid() {
887		return resolver.Address{}
888	}
889	return al.addresses[al.idx]
890}
891
892func (al *addressList) reset() {
893	al.idx = 0
894}
895
896func (al *addressList) updateAddrs(addrs []resolver.Address) {
897	al.addresses = addrs
898	al.reset()
899}
900
901// seekTo returns false if the needle was not found and the current index was
902// left unchanged.
903func (al *addressList) seekTo(needle resolver.Address) bool {
904	for ai, addr := range al.addresses {
905		if !equalAddressIgnoringBalAttributes(&addr, &needle) {
906			continue
907		}
908		al.idx = ai
909		return true
910	}
911	return false
912}
913
914// hasNext returns whether incrementing the addressList will result in moving
915// past the end of the list. If the list has already moved past the end, it
916// returns false.
917func (al *addressList) hasNext() bool {
918	if !al.isValid() {
919		return false
920	}
921	return al.idx+1 < len(al.addresses)
922}
923
924// equalAddressIgnoringBalAttributes returns true is a and b are considered
925// equal. This is different from the Equal method on the resolver.Address type
926// which considers all fields to determine equality. Here, we only consider
927// fields that are meaningful to the SubConn.
928func equalAddressIgnoringBalAttributes(a, b *resolver.Address) bool {
929	return a.Addr == b.Addr && a.ServerName == b.ServerName &&
930		a.Attributes.Equal(b.Attributes) &&
931		a.Metadata == b.Metadata
932}