balancer.go

  1/*
  2 *
  3 * Copyright 2017 gRPC authors.
  4 *
  5 * Licensed under the Apache License, Version 2.0 (the "License");
  6 * you may not use this file except in compliance with the License.
  7 * You may obtain a copy of the License at
  8 *
  9 *     http://www.apache.org/licenses/LICENSE-2.0
 10 *
 11 * Unless required by applicable law or agreed to in writing, software
 12 * distributed under the License is distributed on an "AS IS" BASIS,
 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14 * See the License for the specific language governing permissions and
 15 * limitations under the License.
 16 *
 17 */
 18
 19package base
 20
 21import (
 22	"errors"
 23	"fmt"
 24
 25	"google.golang.org/grpc/balancer"
 26	"google.golang.org/grpc/connectivity"
 27	"google.golang.org/grpc/grpclog"
 28	"google.golang.org/grpc/resolver"
 29)
 30
 31var logger = grpclog.Component("balancer")
 32
 33type baseBuilder struct {
 34	name          string
 35	pickerBuilder PickerBuilder
 36	config        Config
 37}
 38
 39func (bb *baseBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
 40	bal := &baseBalancer{
 41		cc:            cc,
 42		pickerBuilder: bb.pickerBuilder,
 43
 44		subConns: resolver.NewAddressMap(),
 45		scStates: make(map[balancer.SubConn]connectivity.State),
 46		csEvltr:  &balancer.ConnectivityStateEvaluator{},
 47		config:   bb.config,
 48		state:    connectivity.Connecting,
 49	}
 50	// Initialize picker to a picker that always returns
 51	// ErrNoSubConnAvailable, because when state of a SubConn changes, we
 52	// may call UpdateState with this picker.
 53	bal.picker = NewErrPicker(balancer.ErrNoSubConnAvailable)
 54	return bal
 55}
 56
 57func (bb *baseBuilder) Name() string {
 58	return bb.name
 59}
 60
 61type baseBalancer struct {
 62	cc            balancer.ClientConn
 63	pickerBuilder PickerBuilder
 64
 65	csEvltr *balancer.ConnectivityStateEvaluator
 66	state   connectivity.State
 67
 68	subConns *resolver.AddressMap
 69	scStates map[balancer.SubConn]connectivity.State
 70	picker   balancer.Picker
 71	config   Config
 72
 73	resolverErr error // the last error reported by the resolver; cleared on successful resolution
 74	connErr     error // the last connection error; cleared upon leaving TransientFailure
 75}
 76
 77func (b *baseBalancer) ResolverError(err error) {
 78	b.resolverErr = err
 79	if b.subConns.Len() == 0 {
 80		b.state = connectivity.TransientFailure
 81	}
 82
 83	if b.state != connectivity.TransientFailure {
 84		// The picker will not change since the balancer does not currently
 85		// report an error.
 86		return
 87	}
 88	b.regeneratePicker()
 89	b.cc.UpdateState(balancer.State{
 90		ConnectivityState: b.state,
 91		Picker:            b.picker,
 92	})
 93}
 94
 95func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
 96	// TODO: handle s.ResolverState.ServiceConfig?
 97	if logger.V(2) {
 98		logger.Info("base.baseBalancer: got new ClientConn state: ", s)
 99	}
100	// Successful resolution; clear resolver error and ensure we return nil.
101	b.resolverErr = nil
102	// addrsSet is the set converted from addrs, it's used for quick lookup of an address.
103	addrsSet := resolver.NewAddressMap()
104	for _, a := range s.ResolverState.Addresses {
105		addrsSet.Set(a, nil)
106		if _, ok := b.subConns.Get(a); !ok {
107			// a is a new address (not existing in b.subConns).
108			var sc balancer.SubConn
109			opts := balancer.NewSubConnOptions{
110				HealthCheckEnabled: b.config.HealthCheck,
111				StateListener:      func(scs balancer.SubConnState) { b.updateSubConnState(sc, scs) },
112			}
113			sc, err := b.cc.NewSubConn([]resolver.Address{a}, opts)
114			if err != nil {
115				logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
116				continue
117			}
118			b.subConns.Set(a, sc)
119			b.scStates[sc] = connectivity.Idle
120			b.csEvltr.RecordTransition(connectivity.Shutdown, connectivity.Idle)
121			sc.Connect()
122		}
123	}
124	for _, a := range b.subConns.Keys() {
125		sci, _ := b.subConns.Get(a)
126		sc := sci.(balancer.SubConn)
127		// a was removed by resolver.
128		if _, ok := addrsSet.Get(a); !ok {
129			sc.Shutdown()
130			b.subConns.Delete(a)
131			// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
132			// The entry will be deleted in updateSubConnState.
133		}
134	}
135	// If resolver state contains no addresses, return an error so ClientConn
136	// will trigger re-resolve. Also records this as a resolver error, so when
137	// the overall state turns transient failure, the error message will have
138	// the zero address information.
139	if len(s.ResolverState.Addresses) == 0 {
140		b.ResolverError(errors.New("produced zero addresses"))
141		return balancer.ErrBadResolverState
142	}
143
144	b.regeneratePicker()
145	b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
146	return nil
147}
148
149// mergeErrors builds an error from the last connection error and the last
150// resolver error.  Must only be called if b.state is TransientFailure.
151func (b *baseBalancer) mergeErrors() error {
152	// connErr must always be non-nil unless there are no SubConns, in which
153	// case resolverErr must be non-nil.
154	if b.connErr == nil {
155		return fmt.Errorf("last resolver error: %v", b.resolverErr)
156	}
157	if b.resolverErr == nil {
158		return fmt.Errorf("last connection error: %v", b.connErr)
159	}
160	return fmt.Errorf("last connection error: %v; last resolver error: %v", b.connErr, b.resolverErr)
161}
162
163// regeneratePicker takes a snapshot of the balancer, and generates a picker
164// from it. The picker is
165//   - errPicker if the balancer is in TransientFailure,
166//   - built by the pickerBuilder with all READY SubConns otherwise.
167func (b *baseBalancer) regeneratePicker() {
168	if b.state == connectivity.TransientFailure {
169		b.picker = NewErrPicker(b.mergeErrors())
170		return
171	}
172	readySCs := make(map[balancer.SubConn]SubConnInfo)
173
174	// Filter out all ready SCs from full subConn map.
175	for _, addr := range b.subConns.Keys() {
176		sci, _ := b.subConns.Get(addr)
177		sc := sci.(balancer.SubConn)
178		if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
179			readySCs[sc] = SubConnInfo{Address: addr}
180		}
181	}
182	b.picker = b.pickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs})
183}
184
185// UpdateSubConnState is a nop because a StateListener is always set in NewSubConn.
186func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
187	logger.Errorf("base.baseBalancer: UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
188}
189
190func (b *baseBalancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
191	s := state.ConnectivityState
192	if logger.V(2) {
193		logger.Infof("base.baseBalancer: handle SubConn state change: %p, %v", sc, s)
194	}
195	oldS, ok := b.scStates[sc]
196	if !ok {
197		if logger.V(2) {
198			logger.Infof("base.baseBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
199		}
200		return
201	}
202	if oldS == connectivity.TransientFailure &&
203		(s == connectivity.Connecting || s == connectivity.Idle) {
204		// Once a subconn enters TRANSIENT_FAILURE, ignore subsequent IDLE or
205		// CONNECTING transitions to prevent the aggregated state from being
206		// always CONNECTING when many backends exist but are all down.
207		if s == connectivity.Idle {
208			sc.Connect()
209		}
210		return
211	}
212	b.scStates[sc] = s
213	switch s {
214	case connectivity.Idle:
215		sc.Connect()
216	case connectivity.Shutdown:
217		// When an address was removed by resolver, b called Shutdown but kept
218		// the sc's state in scStates. Remove state for this sc here.
219		delete(b.scStates, sc)
220	case connectivity.TransientFailure:
221		// Save error to be reported via picker.
222		b.connErr = state.ConnectionError
223	}
224
225	b.state = b.csEvltr.RecordTransition(oldS, s)
226
227	// Regenerate picker when one of the following happens:
228	//  - this sc entered or left ready
229	//  - the aggregated state of balancer is TransientFailure
230	//    (may need to update error message)
231	if (s == connectivity.Ready) != (oldS == connectivity.Ready) ||
232		b.state == connectivity.TransientFailure {
233		b.regeneratePicker()
234	}
235	b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
236}
237
238// Close is a nop because base balancer doesn't have internal state to clean up,
239// and it doesn't need to call Shutdown for the SubConns.
240func (b *baseBalancer) Close() {
241}
242
243// ExitIdle is a nop because the base balancer attempts to stay connected to
244// all SubConns at all times.
245func (b *baseBalancer) ExitIdle() {
246}
247
248// NewErrPicker returns a Picker that always returns err on Pick().
249func NewErrPicker(err error) balancer.Picker {
250	return &errPicker{err: err}
251}
252
253// NewErrPickerV2 is temporarily defined for backward compatibility reasons.
254//
255// Deprecated: use NewErrPicker instead.
256var NewErrPickerV2 = NewErrPicker
257
// errPicker is a Picker that fails every pick with one fixed error.
type errPicker struct {
	// err is what every Pick call returns.
	err error
}
261
262func (p *errPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
263	return balancer.PickResult{}, p.err
264}