// balancer_wrapper.go

  1/*
  2 *
  3 * Copyright 2017 gRPC authors.
  4 *
  5 * Licensed under the Apache License, Version 2.0 (the "License");
  6 * you may not use this file except in compliance with the License.
  7 * You may obtain a copy of the License at
  8 *
  9 *     http://www.apache.org/licenses/LICENSE-2.0
 10 *
 11 * Unless required by applicable law or agreed to in writing, software
 12 * distributed under the License is distributed on an "AS IS" BASIS,
 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14 * See the License for the specific language governing permissions and
 15 * limitations under the License.
 16 *
 17 */
 18
 19package grpc
 20
 21import (
 22	"context"
 23	"fmt"
 24	"sync"
 25
 26	"google.golang.org/grpc/balancer"
 27	"google.golang.org/grpc/codes"
 28	"google.golang.org/grpc/connectivity"
 29	"google.golang.org/grpc/experimental/stats"
 30	"google.golang.org/grpc/internal"
 31	"google.golang.org/grpc/internal/balancer/gracefulswitch"
 32	"google.golang.org/grpc/internal/channelz"
 33	"google.golang.org/grpc/internal/grpcsync"
 34	"google.golang.org/grpc/resolver"
 35	"google.golang.org/grpc/status"
 36)
 37
var (
	// setConnectedAddress records the connected address on a SubConnState. It
	// is resolved from the internal package at init time; the type assertion
	// panics at startup (not at call time) if the registered value has the
	// wrong signature.
	setConnectedAddress = internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address))
	// noOpRegisterHealthListenerFn is used when client side health checking is
	// disabled. It sends a single READY update on the registered listener.
	noOpRegisterHealthListenerFn = func(_ context.Context, listener func(balancer.SubConnState)) func() {
		listener(balancer.SubConnState{ConnectivityState: connectivity.Ready})
		return func() {}
	}
)
 47
// ccBalancerWrapper sits between the ClientConn and the Balancer.
//
// ccBalancerWrapper implements methods corresponding to the ones on the
// balancer.Balancer interface. The ClientConn is free to call these methods
// concurrently and the ccBalancerWrapper ensures that calls from the ClientConn
// to the Balancer happen in order by performing them in the serializer, without
// any mutexes held.
//
// ccBalancerWrapper also implements the balancer.ClientConn interface and is
// passed to the Balancer implementations. It invokes unexported methods on the
// ClientConn to handle these calls from the Balancer.
//
// It uses the gracefulswitch.Balancer internally to ensure that balancer
// switches happen in a graceful manner.
type ccBalancerWrapper struct {
	internal.EnforceClientConnEmbedding
	// The following fields are initialized when the wrapper is created and are
	// read-only afterwards, and therefore can be accessed without a mutex.
	cc               *ClientConn                  // the channel this wrapper belongs to
	opts             balancer.BuildOptions        // passed to the balancer at build time
	serializer       *grpcsync.CallbackSerializer // serializes all calls into the balancer
	serializerCancel context.CancelFunc           // cancels the serializer's context on close

	// The following fields are only accessed within the serializer or during
	// initialization.
	curBalancerName string                  // name of the current child LB policy
	balancer        *gracefulswitch.Balancer // nil after close has run in the serializer

	// The following field is protected by mu.  Caller must take cc.mu before
	// taking mu.
	mu     sync.Mutex
	closed bool // set by close(); blocks new SubConns, updates, and re-resolution
}
 81
 82// newCCBalancerWrapper creates a new balancer wrapper in idle state. The
 83// underlying balancer is not created until the updateClientConnState() method
 84// is invoked.
 85func newCCBalancerWrapper(cc *ClientConn) *ccBalancerWrapper {
 86	ctx, cancel := context.WithCancel(cc.ctx)
 87	ccb := &ccBalancerWrapper{
 88		cc: cc,
 89		opts: balancer.BuildOptions{
 90			DialCreds:       cc.dopts.copts.TransportCredentials,
 91			CredsBundle:     cc.dopts.copts.CredsBundle,
 92			Dialer:          cc.dopts.copts.Dialer,
 93			Authority:       cc.authority,
 94			CustomUserAgent: cc.dopts.copts.UserAgent,
 95			ChannelzParent:  cc.channelz,
 96			Target:          cc.parsedTarget,
 97		},
 98		serializer:       grpcsync.NewCallbackSerializer(ctx),
 99		serializerCancel: cancel,
100	}
101	ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts)
102	return ccb
103}
104
105func (ccb *ccBalancerWrapper) MetricsRecorder() stats.MetricsRecorder {
106	return ccb.cc.metricsRecorderList
107}
108
109// updateClientConnState is invoked by grpc to push a ClientConnState update to
110// the underlying balancer.  This is always executed from the serializer, so
111// it is safe to call into the balancer here.
112func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
113	errCh := make(chan error)
114	uccs := func(ctx context.Context) {
115		defer close(errCh)
116		if ctx.Err() != nil || ccb.balancer == nil {
117			return
118		}
119		name := gracefulswitch.ChildName(ccs.BalancerConfig)
120		if ccb.curBalancerName != name {
121			ccb.curBalancerName = name
122			channelz.Infof(logger, ccb.cc.channelz, "Channel switches to new LB policy %q", name)
123		}
124		err := ccb.balancer.UpdateClientConnState(*ccs)
125		if logger.V(2) && err != nil {
126			logger.Infof("error from balancer.UpdateClientConnState: %v", err)
127		}
128		errCh <- err
129	}
130	onFailure := func() { close(errCh) }
131
132	// UpdateClientConnState can race with Close, and when the latter wins, the
133	// serializer is closed, and the attempt to schedule the callback will fail.
134	// It is acceptable to ignore this failure. But since we want to handle the
135	// state update in a blocking fashion (when we successfully schedule the
136	// callback), we have to use the ScheduleOr method and not the MaybeSchedule
137	// method on the serializer.
138	ccb.serializer.ScheduleOr(uccs, onFailure)
139	return <-errCh
140}
141
142// resolverError is invoked by grpc to push a resolver error to the underlying
143// balancer.  The call to the balancer is executed from the serializer.
144func (ccb *ccBalancerWrapper) resolverError(err error) {
145	ccb.serializer.TrySchedule(func(ctx context.Context) {
146		if ctx.Err() != nil || ccb.balancer == nil {
147			return
148		}
149		ccb.balancer.ResolverError(err)
150	})
151}
152
153// close initiates async shutdown of the wrapper.  cc.mu must be held when
154// calling this function.  To determine the wrapper has finished shutting down,
155// the channel should block on ccb.serializer.Done() without cc.mu held.
156func (ccb *ccBalancerWrapper) close() {
157	ccb.mu.Lock()
158	ccb.closed = true
159	ccb.mu.Unlock()
160	channelz.Info(logger, ccb.cc.channelz, "ccBalancerWrapper: closing")
161	ccb.serializer.TrySchedule(func(context.Context) {
162		if ccb.balancer == nil {
163			return
164		}
165		ccb.balancer.Close()
166		ccb.balancer = nil
167	})
168	ccb.serializerCancel()
169}
170
171// exitIdle invokes the balancer's exitIdle method in the serializer.
172func (ccb *ccBalancerWrapper) exitIdle() {
173	ccb.serializer.TrySchedule(func(ctx context.Context) {
174		if ctx.Err() != nil || ccb.balancer == nil {
175			return
176		}
177		ccb.balancer.ExitIdle()
178	})
179}
180
181func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
182	ccb.cc.mu.Lock()
183	defer ccb.cc.mu.Unlock()
184
185	ccb.mu.Lock()
186	if ccb.closed {
187		ccb.mu.Unlock()
188		return nil, fmt.Errorf("balancer is being closed; no new SubConns allowed")
189	}
190	ccb.mu.Unlock()
191
192	if len(addrs) == 0 {
193		return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
194	}
195	ac, err := ccb.cc.newAddrConnLocked(addrs, opts)
196	if err != nil {
197		channelz.Warningf(logger, ccb.cc.channelz, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err)
198		return nil, err
199	}
200	acbw := &acBalancerWrapper{
201		ccb:           ccb,
202		ac:            ac,
203		producers:     make(map[balancer.ProducerBuilder]*refCountedProducer),
204		stateListener: opts.StateListener,
205		healthData:    newHealthData(connectivity.Idle),
206	}
207	ac.acbw = acbw
208	return acbw, nil
209}
210
211func (ccb *ccBalancerWrapper) RemoveSubConn(balancer.SubConn) {
212	// The graceful switch balancer will never call this.
213	logger.Errorf("ccb RemoveSubConn(%v) called unexpectedly, sc")
214}
215
216func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
217	acbw, ok := sc.(*acBalancerWrapper)
218	if !ok {
219		return
220	}
221	acbw.UpdateAddresses(addrs)
222}
223
224func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
225	ccb.cc.mu.Lock()
226	defer ccb.cc.mu.Unlock()
227	if ccb.cc.conns == nil {
228		// The CC has been closed; ignore this update.
229		return
230	}
231
232	ccb.mu.Lock()
233	if ccb.closed {
234		ccb.mu.Unlock()
235		return
236	}
237	ccb.mu.Unlock()
238	// Update picker before updating state.  Even though the ordering here does
239	// not matter, it can lead to multiple calls of Pick in the common start-up
240	// case where we wait for ready and then perform an RPC.  If the picker is
241	// updated later, we could call the "connecting" picker when the state is
242	// updated, and then call the "ready" picker after the picker gets updated.
243
244	// Note that there is no need to check if the balancer wrapper was closed,
245	// as we know the graceful switch LB policy will not call cc if it has been
246	// closed.
247	ccb.cc.pickerWrapper.updatePicker(s.Picker)
248	ccb.cc.csMgr.updateState(s.ConnectivityState)
249}
250
251func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) {
252	ccb.cc.mu.RLock()
253	defer ccb.cc.mu.RUnlock()
254
255	ccb.mu.Lock()
256	if ccb.closed {
257		ccb.mu.Unlock()
258		return
259	}
260	ccb.mu.Unlock()
261	ccb.cc.resolveNowLocked(o)
262}
263
// Target returns the channel's target string, implementing
// balancer.ClientConn.
func (ccb *ccBalancerWrapper) Target() string {
	return ccb.cc.target
}
267
// acBalancerWrapper is a wrapper on top of ac for balancers.
// It implements balancer.SubConn interface.
type acBalancerWrapper struct {
	internal.EnforceSubConnEmbedding
	ac            *addrConn          // read-only
	ccb           *ccBalancerWrapper // read-only
	stateListener func(balancer.SubConnState) // connectivity updates destined for the LB policy

	// producersMu guards producers; producers are invalidated (closed and
	// cleared) on every connectivity state change and on Shutdown.
	producersMu sync.Mutex
	producers   map[balancer.ProducerBuilder]*refCountedProducer

	// Access to healthData is protected by healthMu.
	healthMu sync.Mutex
	// healthData is stored as a pointer to detect when the health listener is
	// dropped or updated. This is required as closures can't be compared for
	// equality.
	healthData *healthData
}
286
// healthData holds data related to health state reporting.
type healthData struct {
	// connectivityState stores the most recent connectivity state delivered
	// to the LB policy. This is stored to avoid sending updates when the
	// SubConn has already exited connectivity state READY.
	connectivityState connectivity.State
	// closeHealthProducer stores function to close the ref counted health
	// producer. The health producer is automatically closed when the SubConn
	// state changes.  Initialized to a no-op by newHealthData so it is always
	// safe to call.
	closeHealthProducer func()
}
298
299func newHealthData(s connectivity.State) *healthData {
300	return &healthData{
301		connectivityState:   s,
302		closeHealthProducer: func() {},
303	}
304}
305
306// updateState is invoked by grpc to push a subConn state update to the
307// underlying balancer.
308func (acbw *acBalancerWrapper) updateState(s connectivity.State, curAddr resolver.Address, err error) {
309	acbw.ccb.serializer.TrySchedule(func(ctx context.Context) {
310		if ctx.Err() != nil || acbw.ccb.balancer == nil {
311			return
312		}
313		// Invalidate all producers on any state change.
314		acbw.closeProducers()
315
316		// Even though it is optional for balancers, gracefulswitch ensures
317		// opts.StateListener is set, so this cannot ever be nil.
318		// TODO: delete this comment when UpdateSubConnState is removed.
319		scs := balancer.SubConnState{ConnectivityState: s, ConnectionError: err}
320		if s == connectivity.Ready {
321			setConnectedAddress(&scs, curAddr)
322		}
323		// Invalidate the health listener by updating the healthData.
324		acbw.healthMu.Lock()
325		// A race may occur if a health listener is registered soon after the
326		// connectivity state is set but before the stateListener is called.
327		// Two cases may arise:
328		// 1. The new state is not READY: RegisterHealthListener has checks to
329		//    ensure no updates are sent when the connectivity state is not
330		//    READY.
331		// 2. The new state is READY: This means that the old state wasn't Ready.
332		//    The RegisterHealthListener API mentions that a health listener
333		//    must not be registered when a SubConn is not ready to avoid such
334		//    races. When this happens, the LB policy would get health updates
335		//    on the old listener. When the LB policy registers a new listener
336		//    on receiving the connectivity update, the health updates will be
337		//    sent to the new health listener.
338		acbw.healthData = newHealthData(scs.ConnectivityState)
339		acbw.healthMu.Unlock()
340
341		acbw.stateListener(scs)
342	})
343}
344
// String implements fmt.Stringer, identifying the SubConn by its channelz ID
// for logging.
func (acbw *acBalancerWrapper) String() string {
	return fmt.Sprintf("SubConn(id:%d)", acbw.ac.channelz.ID)
}
348
// UpdateAddresses forwards the new address list to the underlying addrConn,
// implementing balancer.SubConn.
func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
	acbw.ac.updateAddrs(addrs)
}
352
// Connect triggers connection establishment on the addrConn, implementing
// balancer.SubConn. It runs asynchronously in a new goroutine and returns
// immediately.
func (acbw *acBalancerWrapper) Connect() {
	go acbw.ac.connect()
}
356
// Shutdown closes all producers on this SubConn and tears down the addrConn,
// implementing balancer.SubConn.
func (acbw *acBalancerWrapper) Shutdown() {
	acbw.closeProducers()
	acbw.ccb.cc.removeAddrConn(acbw.ac, errConnDrain)
}
361
362// NewStream begins a streaming RPC on the addrConn.  If the addrConn is not
363// ready, blocks until it is or ctx expires.  Returns an error when the context
364// expires or the addrConn is shut down.
365func (acbw *acBalancerWrapper) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
366	transport := acbw.ac.getReadyTransport()
367	if transport == nil {
368		return nil, status.Errorf(codes.Unavailable, "SubConn state is not Ready")
369
370	}
371	return newNonRetryClientStream(ctx, desc, method, transport, acbw.ac, opts...)
372}
373
374// Invoke performs a unary RPC.  If the addrConn is not ready, returns
375// errSubConnNotReady.
376func (acbw *acBalancerWrapper) Invoke(ctx context.Context, method string, args any, reply any, opts ...CallOption) error {
377	cs, err := acbw.NewStream(ctx, unaryStreamDesc, method, opts...)
378	if err != nil {
379		return err
380	}
381	if err := cs.SendMsg(args); err != nil {
382		return err
383	}
384	return cs.RecvMsg(reply)
385}
386
// refCountedProducer pairs a balancer.Producer with a reference count so a
// single producer instance can be shared by multiple callers of
// GetOrBuildProducer and closed exactly once.
type refCountedProducer struct {
	producer balancer.Producer
	refs     int    // number of current refs to the producer
	close    func() // underlying producer's close function
}
392
393func (acbw *acBalancerWrapper) GetOrBuildProducer(pb balancer.ProducerBuilder) (balancer.Producer, func()) {
394	acbw.producersMu.Lock()
395	defer acbw.producersMu.Unlock()
396
397	// Look up existing producer from this builder.
398	pData := acbw.producers[pb]
399	if pData == nil {
400		// Not found; create a new one and add it to the producers map.
401		p, closeFn := pb.Build(acbw)
402		pData = &refCountedProducer{producer: p, close: closeFn}
403		acbw.producers[pb] = pData
404	}
405	// Account for this new reference.
406	pData.refs++
407
408	// Return a cleanup function wrapped in a OnceFunc to remove this reference
409	// and delete the refCountedProducer from the map if the total reference
410	// count goes to zero.
411	unref := func() {
412		acbw.producersMu.Lock()
413		// If closeProducers has already closed this producer instance, refs is
414		// set to 0, so the check after decrementing will never pass, and the
415		// producer will not be double-closed.
416		pData.refs--
417		if pData.refs == 0 {
418			defer pData.close() // Run outside the acbw mutex
419			delete(acbw.producers, pb)
420		}
421		acbw.producersMu.Unlock()
422	}
423	return pData.producer, sync.OnceFunc(unref)
424}
425
426func (acbw *acBalancerWrapper) closeProducers() {
427	acbw.producersMu.Lock()
428	defer acbw.producersMu.Unlock()
429	for pb, pData := range acbw.producers {
430		pData.refs = 0
431		pData.close()
432		delete(acbw.producers, pb)
433	}
434}
435
// healthProducerRegisterFn is a type alias for the health producer's function
// for registering listeners.  It matches the signature stored in
// internal.RegisterClientHealthCheckListener (ctx, SubConn, service name,
// listener) and returns a function that unregisters the listener.
type healthProducerRegisterFn = func(context.Context, balancer.SubConn, string, func(balancer.SubConnState)) func()
439
440// healthListenerRegFn returns a function to register a listener for health
441// updates. If client side health checks are disabled, the registered listener
442// will get a single READY (raw connectivity state) update.
443//
444// Client side health checking is enabled when all the following
445// conditions are satisfied:
446// 1. Health checking is not disabled using the dial option.
447// 2. The health package is imported.
448// 3. The health check config is present in the service config.
449func (acbw *acBalancerWrapper) healthListenerRegFn() func(context.Context, func(balancer.SubConnState)) func() {
450	if acbw.ccb.cc.dopts.disableHealthCheck {
451		return noOpRegisterHealthListenerFn
452	}
453	regHealthLisFn := internal.RegisterClientHealthCheckListener
454	if regHealthLisFn == nil {
455		// The health package is not imported.
456		return noOpRegisterHealthListenerFn
457	}
458	cfg := acbw.ac.cc.healthCheckConfig()
459	if cfg == nil {
460		return noOpRegisterHealthListenerFn
461	}
462	return func(ctx context.Context, listener func(balancer.SubConnState)) func() {
463		return regHealthLisFn.(healthProducerRegisterFn)(ctx, acbw, cfg.ServiceName, listener)
464	}
465}
466
// RegisterHealthListener accepts a health listener from the LB policy. It sends
// updates to the health listener as long as the SubConn's connectivity state
// doesn't change and a new health listener is not registered. To invalidate
// the currently registered health listener, acbw updates the healthData. If a
// nil listener is registered, the active health listener is dropped.
func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.SubConnState)) {
	acbw.healthMu.Lock()
	defer acbw.healthMu.Unlock()
	// Stop any previously registered health producer before (possibly)
	// creating a new one below.
	acbw.healthData.closeHealthProducer()
	// listeners should not be registered when the connectivity state
	// isn't Ready. This may happen when the balancer registers a listener
	// after the connectivityState is updated, but before it is notified
	// of the update.
	if acbw.healthData.connectivityState != connectivity.Ready {
		return
	}
	// Replace the health data to stop sending updates to any previously
	// registered health listeners.  The fresh pointer `hd` is captured by the
	// closures below and used as an identity token: if acbw.healthData no
	// longer equals hd, this registration has been superseded.
	hd := newHealthData(connectivity.Ready)
	acbw.healthData = hd
	if listener == nil {
		return
	}

	registerFn := acbw.healthListenerRegFn()
	acbw.ccb.serializer.TrySchedule(func(ctx context.Context) {
		if ctx.Err() != nil || acbw.ccb.balancer == nil {
			return
		}
		// Don't send updates if a new listener is registered.
		acbw.healthMu.Lock()
		defer acbw.healthMu.Unlock()
		if acbw.healthData != hd {
			// Superseded by a newer registration or a state change.
			return
		}
		// Serialize the health updates from the health producer with
		// other calls into the LB policy.
		listenerWrapper := func(scs balancer.SubConnState) {
			acbw.ccb.serializer.TrySchedule(func(ctx context.Context) {
				if ctx.Err() != nil || acbw.ccb.balancer == nil {
					return
				}
				acbw.healthMu.Lock()
				defer acbw.healthMu.Unlock()
				if acbw.healthData != hd {
					// Listener invalidated after this update was scheduled;
					// drop the stale update.
					return
				}
				listener(scs)
			})
		}

		hd.closeHealthProducer = registerFn(ctx, listenerWrapper)
	})
}