gracefulswitch.go

  1/*
  2 *
  3 * Copyright 2022 gRPC authors.
  4 *
  5 * Licensed under the Apache License, Version 2.0 (the "License");
  6 * you may not use this file except in compliance with the License.
  7 * You may obtain a copy of the License at
  8 *
  9 *     http://www.apache.org/licenses/LICENSE-2.0
 10 *
 11 * Unless required by applicable law or agreed to in writing, software
 12 * distributed under the License is distributed on an "AS IS" BASIS,
 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14 * See the License for the specific language governing permissions and
 15 * limitations under the License.
 16 *
 17 */
 18
 19// Package gracefulswitch implements a graceful switch load balancer.
 20package gracefulswitch
 21
 22import (
 23	"errors"
 24	"fmt"
 25	"sync"
 26
 27	"google.golang.org/grpc/balancer"
 28	"google.golang.org/grpc/balancer/base"
 29	"google.golang.org/grpc/connectivity"
 30	"google.golang.org/grpc/resolver"
 31)
 32
 33var errBalancerClosed = errors.New("gracefulSwitchBalancer is closed")
 34var _ balancer.Balancer = (*Balancer)(nil)
 35
 36// NewBalancer returns a graceful switch Balancer.
 37func NewBalancer(cc balancer.ClientConn, opts balancer.BuildOptions) *Balancer {
 38	return &Balancer{
 39		cc:    cc,
 40		bOpts: opts,
 41	}
 42}
 43
 44// Balancer is a utility to gracefully switch from one balancer to
 45// a new balancer. It implements the balancer.Balancer interface.
 46type Balancer struct {
 47	bOpts balancer.BuildOptions
 48	cc    balancer.ClientConn
 49
 50	// mu protects the following fields and all fields within balancerCurrent
 51	// and balancerPending. mu does not need to be held when calling into the
 52	// child balancers, as all calls into these children happen only as a direct
 53	// result of a call into the gracefulSwitchBalancer, which are also
 54	// guaranteed to be synchronous. There is one exception: an UpdateState call
 55	// from a child balancer when current and pending are populated can lead to
 56	// calling Close() on the current. To prevent that racing with an
 57	// UpdateSubConnState from the channel, we hold currentMu during Close and
 58	// UpdateSubConnState calls.
 59	mu              sync.Mutex
 60	balancerCurrent *balancerWrapper
 61	balancerPending *balancerWrapper
 62	closed          bool // set to true when this balancer is closed
 63
 64	// currentMu must be locked before mu. This mutex guards against this
 65	// sequence of events: UpdateSubConnState() called, finds the
 66	// balancerCurrent, gives up lock, updateState comes in, causes Close() on
 67	// balancerCurrent before the UpdateSubConnState is called on the
 68	// balancerCurrent.
 69	currentMu sync.Mutex
 70}
 71
 72// swap swaps out the current lb with the pending lb and updates the ClientConn.
 73// The caller must hold gsb.mu.
 74func (gsb *Balancer) swap() {
 75	gsb.cc.UpdateState(gsb.balancerPending.lastState)
 76	cur := gsb.balancerCurrent
 77	gsb.balancerCurrent = gsb.balancerPending
 78	gsb.balancerPending = nil
 79	go func() {
 80		gsb.currentMu.Lock()
 81		defer gsb.currentMu.Unlock()
 82		cur.Close()
 83	}()
 84}
 85
 86// Helper function that checks if the balancer passed in is current or pending.
 87// The caller must hold gsb.mu.
 88func (gsb *Balancer) balancerCurrentOrPending(bw *balancerWrapper) bool {
 89	return bw == gsb.balancerCurrent || bw == gsb.balancerPending
 90}
 91
 92// SwitchTo initializes the graceful switch process, which completes based on
 93// connectivity state changes on the current/pending balancer. Thus, the switch
 94// process is not complete when this method returns. This method must be called
 95// synchronously alongside the rest of the balancer.Balancer methods this
 96// Graceful Switch Balancer implements.
 97//
 98// Deprecated: use ParseConfig and pass a parsed config to UpdateClientConnState
 99// to cause the Balancer to automatically change to the new child when necessary.
100func (gsb *Balancer) SwitchTo(builder balancer.Builder) error {
101	_, err := gsb.switchTo(builder)
102	return err
103}
104
105func (gsb *Balancer) switchTo(builder balancer.Builder) (*balancerWrapper, error) {
106	gsb.mu.Lock()
107	if gsb.closed {
108		gsb.mu.Unlock()
109		return nil, errBalancerClosed
110	}
111	bw := &balancerWrapper{
112		ClientConn: gsb.cc,
113		builder:    builder,
114		gsb:        gsb,
115		lastState: balancer.State{
116			ConnectivityState: connectivity.Connecting,
117			Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
118		},
119		subconns: make(map[balancer.SubConn]bool),
120	}
121	balToClose := gsb.balancerPending // nil if there is no pending balancer
122	if gsb.balancerCurrent == nil {
123		gsb.balancerCurrent = bw
124	} else {
125		gsb.balancerPending = bw
126	}
127	gsb.mu.Unlock()
128	balToClose.Close()
129	// This function takes a builder instead of a balancer because builder.Build
130	// can call back inline, and this utility needs to handle the callbacks.
131	newBalancer := builder.Build(bw, gsb.bOpts)
132	if newBalancer == nil {
133		// This is illegal and should never happen; we clear the balancerWrapper
134		// we were constructing if it happens to avoid a potential panic.
135		gsb.mu.Lock()
136		if gsb.balancerPending != nil {
137			gsb.balancerPending = nil
138		} else {
139			gsb.balancerCurrent = nil
140		}
141		gsb.mu.Unlock()
142		return nil, balancer.ErrBadResolverState
143	}
144
145	// This write doesn't need to take gsb.mu because this field never gets read
146	// or written to on any calls from the current or pending. Calls from grpc
147	// to this balancer are guaranteed to be called synchronously, so this
148	// bw.Balancer field will never be forwarded to until this SwitchTo()
149	// function returns.
150	bw.Balancer = newBalancer
151	return bw, nil
152}
153
154// Returns nil if the graceful switch balancer is closed.
155func (gsb *Balancer) latestBalancer() *balancerWrapper {
156	gsb.mu.Lock()
157	defer gsb.mu.Unlock()
158	if gsb.balancerPending != nil {
159		return gsb.balancerPending
160	}
161	return gsb.balancerCurrent
162}
163
164// UpdateClientConnState forwards the update to the latest balancer created.
165//
166// If the state's BalancerConfig is the config returned by a call to
167// gracefulswitch.ParseConfig, then this function will automatically SwitchTo
168// the balancer indicated by the config before forwarding its config to it, if
169// necessary.
170func (gsb *Balancer) UpdateClientConnState(state balancer.ClientConnState) error {
171	// The resolver data is only relevant to the most recent LB Policy.
172	balToUpdate := gsb.latestBalancer()
173	gsbCfg, ok := state.BalancerConfig.(*lbConfig)
174	if ok {
175		// Switch to the child in the config unless it is already active.
176		if balToUpdate == nil || gsbCfg.childBuilder.Name() != balToUpdate.builder.Name() {
177			var err error
178			balToUpdate, err = gsb.switchTo(gsbCfg.childBuilder)
179			if err != nil {
180				return fmt.Errorf("could not switch to new child balancer: %w", err)
181			}
182		}
183		// Unwrap the child balancer's config.
184		state.BalancerConfig = gsbCfg.childConfig
185	}
186
187	if balToUpdate == nil {
188		return errBalancerClosed
189	}
190
191	// Perform this call without gsb.mu to prevent deadlocks if the child calls
192	// back into the channel. The latest balancer can never be closed during a
193	// call from the channel, even without gsb.mu held.
194	return balToUpdate.UpdateClientConnState(state)
195}
196
197// ResolverError forwards the error to the latest balancer created.
198func (gsb *Balancer) ResolverError(err error) {
199	// The resolver data is only relevant to the most recent LB Policy.
200	balToUpdate := gsb.latestBalancer()
201	if balToUpdate == nil {
202		gsb.cc.UpdateState(balancer.State{
203			ConnectivityState: connectivity.TransientFailure,
204			Picker:            base.NewErrPicker(err),
205		})
206		return
207	}
208	// Perform this call without gsb.mu to prevent deadlocks if the child calls
209	// back into the channel. The latest balancer can never be closed during a
210	// call from the channel, even without gsb.mu held.
211	balToUpdate.ResolverError(err)
212}
213
214// ExitIdle forwards the call to the latest balancer created.
215//
216// If the latest balancer does not support ExitIdle, the subConns are
217// re-connected to manually.
218func (gsb *Balancer) ExitIdle() {
219	balToUpdate := gsb.latestBalancer()
220	if balToUpdate == nil {
221		return
222	}
223	// There is no need to protect this read with a mutex, as the write to the
224	// Balancer field happens in SwitchTo, which completes before this can be
225	// called.
226	if ei, ok := balToUpdate.Balancer.(balancer.ExitIdler); ok {
227		ei.ExitIdle()
228		return
229	}
230	gsb.mu.Lock()
231	defer gsb.mu.Unlock()
232	for sc := range balToUpdate.subconns {
233		sc.Connect()
234	}
235}
236
237// updateSubConnState forwards the update to the appropriate child.
238func (gsb *Balancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState, cb func(balancer.SubConnState)) {
239	gsb.currentMu.Lock()
240	defer gsb.currentMu.Unlock()
241	gsb.mu.Lock()
242	// Forward update to the appropriate child.  Even if there is a pending
243	// balancer, the current balancer should continue to get SubConn updates to
244	// maintain the proper state while the pending is still connecting.
245	var balToUpdate *balancerWrapper
246	if gsb.balancerCurrent != nil && gsb.balancerCurrent.subconns[sc] {
247		balToUpdate = gsb.balancerCurrent
248	} else if gsb.balancerPending != nil && gsb.balancerPending.subconns[sc] {
249		balToUpdate = gsb.balancerPending
250	}
251	if balToUpdate == nil {
252		// SubConn belonged to a stale lb policy that has not yet fully closed,
253		// or the balancer was already closed.
254		gsb.mu.Unlock()
255		return
256	}
257	if state.ConnectivityState == connectivity.Shutdown {
258		delete(balToUpdate.subconns, sc)
259	}
260	gsb.mu.Unlock()
261	if cb != nil {
262		cb(state)
263	} else {
264		balToUpdate.UpdateSubConnState(sc, state)
265	}
266}
267
268// UpdateSubConnState forwards the update to the appropriate child.
269func (gsb *Balancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
270	gsb.updateSubConnState(sc, state, nil)
271}
272
273// Close closes any active child balancers.
274func (gsb *Balancer) Close() {
275	gsb.mu.Lock()
276	gsb.closed = true
277	currentBalancerToClose := gsb.balancerCurrent
278	gsb.balancerCurrent = nil
279	pendingBalancerToClose := gsb.balancerPending
280	gsb.balancerPending = nil
281	gsb.mu.Unlock()
282
283	currentBalancerToClose.Close()
284	pendingBalancerToClose.Close()
285}
286
287// balancerWrapper wraps a balancer.Balancer, and overrides some Balancer
288// methods to help cleanup SubConns created by the wrapped balancer.
289//
290// It implements the balancer.ClientConn interface and is passed down in that
291// capacity to the wrapped balancer. It maintains a set of subConns created by
292// the wrapped balancer and calls from the latter to create/update/shutdown
293// SubConns update this set before being forwarded to the parent ClientConn.
294// State updates from the wrapped balancer can result in invocation of the
295// graceful switch logic.
296type balancerWrapper struct {
297	balancer.ClientConn
298	balancer.Balancer
299	gsb     *Balancer
300	builder balancer.Builder
301
302	lastState balancer.State
303	subconns  map[balancer.SubConn]bool // subconns created by this balancer
304}
305
306// Close closes the underlying LB policy and shuts down the subconns it
307// created. bw must not be referenced via balancerCurrent or balancerPending in
308// gsb when called. gsb.mu must not be held.  Does not panic with a nil
309// receiver.
310func (bw *balancerWrapper) Close() {
311	// before Close is called.
312	if bw == nil {
313		return
314	}
315	// There is no need to protect this read with a mutex, as Close() is
316	// impossible to be called concurrently with the write in SwitchTo(). The
317	// callsites of Close() for this balancer in Graceful Switch Balancer will
318	// never be called until SwitchTo() returns.
319	bw.Balancer.Close()
320	bw.gsb.mu.Lock()
321	for sc := range bw.subconns {
322		sc.Shutdown()
323	}
324	bw.gsb.mu.Unlock()
325}
326
327func (bw *balancerWrapper) UpdateState(state balancer.State) {
328	// Hold the mutex for this entire call to ensure it cannot occur
329	// concurrently with other updateState() calls. This causes updates to
330	// lastState and calls to cc.UpdateState to happen atomically.
331	bw.gsb.mu.Lock()
332	defer bw.gsb.mu.Unlock()
333	bw.lastState = state
334
335	if !bw.gsb.balancerCurrentOrPending(bw) {
336		return
337	}
338
339	if bw == bw.gsb.balancerCurrent {
340		// In the case that the current balancer exits READY, and there is a pending
341		// balancer, you can forward the pending balancer's cached State up to
342		// ClientConn and swap the pending into the current. This is because there
343		// is no reason to gracefully switch from and keep using the old policy as
344		// the ClientConn is not connected to any backends.
345		if state.ConnectivityState != connectivity.Ready && bw.gsb.balancerPending != nil {
346			bw.gsb.swap()
347			return
348		}
349		// Even if there is a pending balancer waiting to be gracefully switched to,
350		// continue to forward current balancer updates to the Client Conn. Ignoring
351		// state + picker from the current would cause undefined behavior/cause the
352		// system to behave incorrectly from the current LB policies perspective.
353		// Also, the current LB is still being used by grpc to choose SubConns per
354		// RPC, and thus should use the most updated form of the current balancer.
355		bw.gsb.cc.UpdateState(state)
356		return
357	}
358	// This method is now dealing with a state update from the pending balancer.
359	// If the current balancer is currently in a state other than READY, the new
360	// policy can be swapped into place immediately. This is because there is no
361	// reason to gracefully switch from and keep using the old policy as the
362	// ClientConn is not connected to any backends.
363	if state.ConnectivityState != connectivity.Connecting || bw.gsb.balancerCurrent.lastState.ConnectivityState != connectivity.Ready {
364		bw.gsb.swap()
365	}
366}
367
// NewSubConn creates a SubConn on the parent ClientConn on behalf of the
// wrapped child. It records the SubConn in bw.subconns so it can be shut down
// when the child is closed.
//
// It returns an error without creating anything if bw is no longer the
// current or pending balancer. If bw stops being current or pending while the
// SubConn is being created, the new SubConn is shut down and an error is
// returned.
//
// The child's StateListener is replaced by one that routes updates through
// gsb.updateSubConnState. That function calls the child's original listener,
// or the child's UpdateSubConnState if the child set none, only while sc still
// belongs to the current or pending balancer.
func (bw *balancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
	bw.gsb.mu.Lock()
	if !bw.gsb.balancerCurrentOrPending(bw) {
		bw.gsb.mu.Unlock()
		return nil, fmt.Errorf("%T at address %p that called NewSubConn is deleted", bw, bw)
	}
	bw.gsb.mu.Unlock()

	// sc is declared before the listener closure so the closure captures this
	// variable. sc is declared in the same block, so the `sc, err :=` below
	// assigns the created SubConn to that captured variable; only err is new.
	// NOTE(review): a listener call racing with that assignment would see a
	// nil sc, which updateSubConnState would most likely drop. Presumably the
	// channel does not deliver updates before NewSubConn returns; confirm.
	var sc balancer.SubConn
	oldListener := opts.StateListener
	opts.StateListener = func(state balancer.SubConnState) { bw.gsb.updateSubConnState(sc, state, oldListener) }
	sc, err := bw.gsb.cc.NewSubConn(addrs, opts)
	if err != nil {
		return nil, err
	}
	bw.gsb.mu.Lock()
	if !bw.gsb.balancerCurrentOrPending(bw) { // balancer was closed or replaced during this call
		sc.Shutdown()
		bw.gsb.mu.Unlock()
		return nil, fmt.Errorf("%T at address %p that called NewSubConn is deleted", bw, bw)
	}
	// Record ownership so updateSubConnState can route this SubConn's updates
	// to bw, and bw.Close can shut it down.
	bw.subconns[sc] = true
	bw.gsb.mu.Unlock()
	return sc, nil
}
393
394func (bw *balancerWrapper) ResolveNow(opts resolver.ResolveNowOptions) {
395	// Ignore ResolveNow requests from anything other than the most recent
396	// balancer, because older balancers were already removed from the config.
397	if bw != bw.gsb.latestBalancer() {
398		return
399	}
400	bw.gsb.cc.ResolveNow(opts)
401}
402
// RemoveSubConn shuts down sc by calling sc.Shutdown(). It does not check
// whether bw is still the current or pending balancer. Removal of sc from
// bw.subconns is left to updateSubConnState, which deletes it when a Shutdown
// state update for sc arrives (presumably triggered by this call; confirm).
func (bw *balancerWrapper) RemoveSubConn(sc balancer.SubConn) {
	// Note: existing third party balancers may call this, so it must remain
	// until RemoveSubConn is fully removed.
	sc.Shutdown()
}
408
409func (bw *balancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
410	bw.gsb.mu.Lock()
411	if !bw.gsb.balancerCurrentOrPending(bw) {
412		bw.gsb.mu.Unlock()
413		return
414	}
415	bw.gsb.mu.Unlock()
416	bw.gsb.cc.UpdateAddresses(sc, addrs)
417}