1/*
2 *
3 * Copyright 2022 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19// Package gracefulswitch implements a graceful switch load balancer.
20package gracefulswitch
21
22import (
23 "errors"
24 "fmt"
25 "sync"
26
27 "google.golang.org/grpc/balancer"
28 "google.golang.org/grpc/balancer/base"
29 "google.golang.org/grpc/connectivity"
30 "google.golang.org/grpc/resolver"
31)
32
33var errBalancerClosed = errors.New("gracefulSwitchBalancer is closed")
34var _ balancer.Balancer = (*Balancer)(nil)
35
36// NewBalancer returns a graceful switch Balancer.
37func NewBalancer(cc balancer.ClientConn, opts balancer.BuildOptions) *Balancer {
38 return &Balancer{
39 cc: cc,
40 bOpts: opts,
41 }
42}
43
// Balancer is a utility to gracefully switch from one balancer to
// a new balancer. It implements the balancer.Balancer interface.
//
// It tracks at most two children: balancerCurrent, whose state is forwarded
// to the parent ClientConn, and balancerPending, the most recently requested
// child that has not yet been swapped into place.
type Balancer struct {
	bOpts balancer.BuildOptions // passed to builder.Build for every child
	cc    balancer.ClientConn   // parent ClientConn that receives forwarded calls

	// mu protects the following fields and all fields within balancerCurrent
	// and balancerPending. mu does not need to be held when calling into the
	// child balancers, as all calls into these children happen only as a direct
	// result of a call into the gracefulSwitchBalancer, which are also
	// guaranteed to be synchronous. There is one exception: an UpdateState call
	// from a child balancer when current and pending are populated can lead to
	// calling Close() on the current. To prevent that racing with an
	// UpdateSubConnState from the channel, we hold currentMu during Close and
	// UpdateSubConnState calls.
	mu              sync.Mutex
	balancerCurrent *balancerWrapper
	balancerPending *balancerWrapper
	closed          bool // set to true when this balancer is closed

	// currentMu must be locked before mu. This mutex guards against this
	// sequence of events: UpdateSubConnState() called, finds the
	// balancerCurrent, gives up lock, updateState comes in, causes Close() on
	// balancerCurrent before the UpdateSubConnState is called on the
	// balancerCurrent.
	currentMu sync.Mutex
}
71
// swap swaps out the current lb with the pending lb and updates the ClientConn.
// The caller must hold gsb.mu.
//
// The pending balancer's cached lastState is pushed to the parent ClientConn
// first, then pending is promoted to current, and finally the old current is
// closed asynchronously.
func (gsb *Balancer) swap() {
	gsb.cc.UpdateState(gsb.balancerPending.lastState)
	cur := gsb.balancerCurrent
	gsb.balancerCurrent = gsb.balancerPending
	gsb.balancerPending = nil
	// The old balancer is closed in a separate goroutine: the caller holds
	// gsb.mu, while balancerWrapper.Close acquires gsb.mu itself and currentMu
	// must always be locked before mu.
	go func() {
		gsb.currentMu.Lock()
		defer gsb.currentMu.Unlock()
		cur.Close()
	}()
}
85
86// Helper function that checks if the balancer passed in is current or pending.
87// The caller must hold gsb.mu.
88func (gsb *Balancer) balancerCurrentOrPending(bw *balancerWrapper) bool {
89 return bw == gsb.balancerCurrent || bw == gsb.balancerPending
90}
91
92// SwitchTo initializes the graceful switch process, which completes based on
93// connectivity state changes on the current/pending balancer. Thus, the switch
94// process is not complete when this method returns. This method must be called
95// synchronously alongside the rest of the balancer.Balancer methods this
96// Graceful Switch Balancer implements.
97//
98// Deprecated: use ParseConfig and pass a parsed config to UpdateClientConnState
99// to cause the Balancer to automatically change to the new child when necessary.
100func (gsb *Balancer) SwitchTo(builder balancer.Builder) error {
101 _, err := gsb.switchTo(builder)
102 return err
103}
104
// switchTo creates a balancerWrapper for builder, installs it as the current
// balancer if there is none or as the pending balancer otherwise, closes any
// previously pending balancer, and then builds the child.
//
// It returns the new wrapper on success. It returns errBalancerClosed if gsb
// has been closed, and balancer.ErrBadResolverState if builder.Build returns
// nil.
func (gsb *Balancer) switchTo(builder balancer.Builder) (*balancerWrapper, error) {
	gsb.mu.Lock()
	if gsb.closed {
		gsb.mu.Unlock()
		return nil, errBalancerClosed
	}
	// Until the child reports a state of its own, the wrapper is treated as
	// CONNECTING with a picker that fails with ErrNoSubConnAvailable.
	bw := &balancerWrapper{
		ClientConn: gsb.cc,
		builder:    builder,
		gsb:        gsb,
		lastState: balancer.State{
			ConnectivityState: connectivity.Connecting,
			Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
		},
		subconns: make(map[balancer.SubConn]bool),
	}
	balToClose := gsb.balancerPending // nil if there is no pending balancer
	if gsb.balancerCurrent == nil {
		gsb.balancerCurrent = bw
	} else {
		gsb.balancerPending = bw
	}
	gsb.mu.Unlock()
	// Close must run without gsb.mu held because it acquires gsb.mu itself; it
	// is a no-op when balToClose is nil.
	balToClose.Close()
	// This function takes a builder instead of a balancer because builder.Build
	// can call back inline, and this utility needs to handle the callbacks.
	newBalancer := builder.Build(bw, gsb.bOpts)
	if newBalancer == nil {
		// This is illegal and should never happen; we clear the balancerWrapper
		// we were constructing if it happens to avoid a potential panic.
		gsb.mu.Lock()
		if gsb.balancerPending != nil {
			gsb.balancerPending = nil
		} else {
			gsb.balancerCurrent = nil
		}
		gsb.mu.Unlock()
		return nil, balancer.ErrBadResolverState
	}

	// This write doesn't need to take gsb.mu because this field never gets read
	// or written to on any calls from the current or pending. Calls from grpc
	// to this balancer are guaranteed to be called synchronously, so this
	// bw.Balancer field will never be forwarded to until this SwitchTo()
	// function returns.
	bw.Balancer = newBalancer
	return bw, nil
}
153
154// Returns nil if the graceful switch balancer is closed.
155func (gsb *Balancer) latestBalancer() *balancerWrapper {
156 gsb.mu.Lock()
157 defer gsb.mu.Unlock()
158 if gsb.balancerPending != nil {
159 return gsb.balancerPending
160 }
161 return gsb.balancerCurrent
162}
163
164// UpdateClientConnState forwards the update to the latest balancer created.
165//
166// If the state's BalancerConfig is the config returned by a call to
167// gracefulswitch.ParseConfig, then this function will automatically SwitchTo
168// the balancer indicated by the config before forwarding its config to it, if
169// necessary.
170func (gsb *Balancer) UpdateClientConnState(state balancer.ClientConnState) error {
171 // The resolver data is only relevant to the most recent LB Policy.
172 balToUpdate := gsb.latestBalancer()
173 gsbCfg, ok := state.BalancerConfig.(*lbConfig)
174 if ok {
175 // Switch to the child in the config unless it is already active.
176 if balToUpdate == nil || gsbCfg.childBuilder.Name() != balToUpdate.builder.Name() {
177 var err error
178 balToUpdate, err = gsb.switchTo(gsbCfg.childBuilder)
179 if err != nil {
180 return fmt.Errorf("could not switch to new child balancer: %w", err)
181 }
182 }
183 // Unwrap the child balancer's config.
184 state.BalancerConfig = gsbCfg.childConfig
185 }
186
187 if balToUpdate == nil {
188 return errBalancerClosed
189 }
190
191 // Perform this call without gsb.mu to prevent deadlocks if the child calls
192 // back into the channel. The latest balancer can never be closed during a
193 // call from the channel, even without gsb.mu held.
194 return balToUpdate.UpdateClientConnState(state)
195}
196
197// ResolverError forwards the error to the latest balancer created.
198func (gsb *Balancer) ResolverError(err error) {
199 // The resolver data is only relevant to the most recent LB Policy.
200 balToUpdate := gsb.latestBalancer()
201 if balToUpdate == nil {
202 gsb.cc.UpdateState(balancer.State{
203 ConnectivityState: connectivity.TransientFailure,
204 Picker: base.NewErrPicker(err),
205 })
206 return
207 }
208 // Perform this call without gsb.mu to prevent deadlocks if the child calls
209 // back into the channel. The latest balancer can never be closed during a
210 // call from the channel, even without gsb.mu held.
211 balToUpdate.ResolverError(err)
212}
213
214// ExitIdle forwards the call to the latest balancer created.
215//
216// If the latest balancer does not support ExitIdle, the subConns are
217// re-connected to manually.
218func (gsb *Balancer) ExitIdle() {
219 balToUpdate := gsb.latestBalancer()
220 if balToUpdate == nil {
221 return
222 }
223 // There is no need to protect this read with a mutex, as the write to the
224 // Balancer field happens in SwitchTo, which completes before this can be
225 // called.
226 if ei, ok := balToUpdate.Balancer.(balancer.ExitIdler); ok {
227 ei.ExitIdle()
228 return
229 }
230 gsb.mu.Lock()
231 defer gsb.mu.Unlock()
232 for sc := range balToUpdate.subconns {
233 sc.Connect()
234 }
235}
236
237// updateSubConnState forwards the update to the appropriate child.
238func (gsb *Balancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState, cb func(balancer.SubConnState)) {
239 gsb.currentMu.Lock()
240 defer gsb.currentMu.Unlock()
241 gsb.mu.Lock()
242 // Forward update to the appropriate child. Even if there is a pending
243 // balancer, the current balancer should continue to get SubConn updates to
244 // maintain the proper state while the pending is still connecting.
245 var balToUpdate *balancerWrapper
246 if gsb.balancerCurrent != nil && gsb.balancerCurrent.subconns[sc] {
247 balToUpdate = gsb.balancerCurrent
248 } else if gsb.balancerPending != nil && gsb.balancerPending.subconns[sc] {
249 balToUpdate = gsb.balancerPending
250 }
251 if balToUpdate == nil {
252 // SubConn belonged to a stale lb policy that has not yet fully closed,
253 // or the balancer was already closed.
254 gsb.mu.Unlock()
255 return
256 }
257 if state.ConnectivityState == connectivity.Shutdown {
258 delete(balToUpdate.subconns, sc)
259 }
260 gsb.mu.Unlock()
261 if cb != nil {
262 cb(state)
263 } else {
264 balToUpdate.UpdateSubConnState(sc, state)
265 }
266}
267
268// UpdateSubConnState forwards the update to the appropriate child.
269func (gsb *Balancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
270 gsb.updateSubConnState(sc, state, nil)
271}
272
273// Close closes any active child balancers.
274func (gsb *Balancer) Close() {
275 gsb.mu.Lock()
276 gsb.closed = true
277 currentBalancerToClose := gsb.balancerCurrent
278 gsb.balancerCurrent = nil
279 pendingBalancerToClose := gsb.balancerPending
280 gsb.balancerPending = nil
281 gsb.mu.Unlock()
282
283 currentBalancerToClose.Close()
284 pendingBalancerToClose.Close()
285}
286
// balancerWrapper wraps a balancer.Balancer, and overrides some Balancer
// methods to help cleanup SubConns created by the wrapped balancer.
//
// It implements the balancer.ClientConn interface and is passed down in that
// capacity to the wrapped balancer. It maintains a set of subConns created by
// the wrapped balancer and calls from the latter to create/update/shutdown
// SubConns update this set before being forwarded to the parent ClientConn.
// State updates from the wrapped balancer can result in invocation of the
// graceful switch logic.
type balancerWrapper struct {
	balancer.ClientConn // parent ClientConn; serves methods not overridden here
	balancer.Balancer   // wrapped child; assigned once builder.Build returns
	gsb                 *Balancer        // owning graceful switch balancer
	builder             balancer.Builder // builder that produced the wrapped child

	// lastState is the most recent state reported by the wrapped balancer via
	// UpdateState. It is protected by gsb.mu.
	lastState balancer.State
	subconns  map[balancer.SubConn]bool // subconns created by this balancer
}
305
306// Close closes the underlying LB policy and shuts down the subconns it
307// created. bw must not be referenced via balancerCurrent or balancerPending in
308// gsb when called. gsb.mu must not be held. Does not panic with a nil
309// receiver.
310func (bw *balancerWrapper) Close() {
311 // before Close is called.
312 if bw == nil {
313 return
314 }
315 // There is no need to protect this read with a mutex, as Close() is
316 // impossible to be called concurrently with the write in SwitchTo(). The
317 // callsites of Close() for this balancer in Graceful Switch Balancer will
318 // never be called until SwitchTo() returns.
319 bw.Balancer.Close()
320 bw.gsb.mu.Lock()
321 for sc := range bw.subconns {
322 sc.Shutdown()
323 }
324 bw.gsb.mu.Unlock()
325}
326
// UpdateState records the state reported by the wrapped balancer and decides
// whether to forward it to the parent ClientConn and/or complete a graceful
// switch. Updates from a balancer that is neither current nor pending are
// cached in lastState but otherwise ignored.
func (bw *balancerWrapper) UpdateState(state balancer.State) {
	// Hold the mutex for this entire call to ensure it cannot occur
	// concurrently with other UpdateState() calls. This causes updates to
	// lastState and calls to cc.UpdateState to happen atomically.
	bw.gsb.mu.Lock()
	defer bw.gsb.mu.Unlock()
	bw.lastState = state

	if !bw.gsb.balancerCurrentOrPending(bw) {
		return
	}

	if bw == bw.gsb.balancerCurrent {
		// In the case that the current balancer exits READY, and there is a pending
		// balancer, you can forward the pending balancer's cached State up to
		// ClientConn and swap the pending into the current. This is because there
		// is no reason to gracefully switch from and keep using the old policy as
		// the ClientConn is not connected to any backends.
		if state.ConnectivityState != connectivity.Ready && bw.gsb.balancerPending != nil {
			bw.gsb.swap()
			return
		}
		// Even if there is a pending balancer waiting to be gracefully switched to,
		// continue to forward current balancer updates to the Client Conn. Ignoring
		// state + picker from the current would cause undefined behavior/cause the
		// system to behave incorrectly from the current LB policies perspective.
		// Also, the current LB is still being used by grpc to choose SubConns per
		// RPC, and thus should use the most updated form of the current balancer.
		bw.gsb.cc.UpdateState(state)
		return
	}
	// This method is now dealing with a state update from the pending balancer.
	// The pending balancer is swapped into place as soon as it reports any
	// state other than CONNECTING. It is also swapped in immediately if the
	// current balancer is in a state other than READY, because there is no
	// reason to gracefully switch from and keep using the old policy as the
	// ClientConn is not connected to any backends.
	if state.ConnectivityState != connectivity.Connecting || bw.gsb.balancerCurrent.lastState.ConnectivityState != connectivity.Ready {
		bw.gsb.swap()
	}
}
367
// NewSubConn creates a SubConn on the parent ClientConn on behalf of the
// wrapped balancer and records it in bw.subconns. The caller's StateListener
// is wrapped so that state changes are routed through
// gsb.updateSubConnState.
//
// It returns an error if bw is no longer the current or pending balancer,
// either before the call or by the time the parent's NewSubConn has returned;
// in the latter case the freshly created SubConn is shut down.
func (bw *balancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
	bw.gsb.mu.Lock()
	if !bw.gsb.balancerCurrentOrPending(bw) {
		bw.gsb.mu.Unlock()
		return nil, fmt.Errorf("%T at address %p that called NewSubConn is deleted", bw, bw)
	}
	bw.gsb.mu.Unlock()

	// sc is declared before the listener so the closure captures the variable
	// itself; it is assigned by the NewSubConn call below (the := there only
	// introduces err and reuses this sc).
	var sc balancer.SubConn
	oldListener := opts.StateListener
	opts.StateListener = func(state balancer.SubConnState) { bw.gsb.updateSubConnState(sc, state, oldListener) }
	sc, err := bw.gsb.cc.NewSubConn(addrs, opts)
	if err != nil {
		return nil, err
	}
	bw.gsb.mu.Lock()
	if !bw.gsb.balancerCurrentOrPending(bw) { // balancer was closed during this call
		sc.Shutdown()
		bw.gsb.mu.Unlock()
		return nil, fmt.Errorf("%T at address %p that called NewSubConn is deleted", bw, bw)
	}
	bw.subconns[sc] = true
	bw.gsb.mu.Unlock()
	return sc, nil
}
393
394func (bw *balancerWrapper) ResolveNow(opts resolver.ResolveNowOptions) {
395 // Ignore ResolveNow requests from anything other than the most recent
396 // balancer, because older balancers were already removed from the config.
397 if bw != bw.gsb.latestBalancer() {
398 return
399 }
400 bw.gsb.cc.ResolveNow(opts)
401}
402
// RemoveSubConn shuts down sc by calling sc.Shutdown.
func (bw *balancerWrapper) RemoveSubConn(sc balancer.SubConn) {
	// Note: existing third party balancers may call this, so it must remain
	// until RemoveSubConn is fully removed.
	sc.Shutdown()
}
408
409func (bw *balancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
410 bw.gsb.mu.Lock()
411 if !bw.gsb.balancerCurrentOrPending(bw) {
412 bw.gsb.mu.Unlock()
413 return
414 }
415 bw.gsb.mu.Unlock()
416 bw.gsb.cc.UpdateAddresses(sc, addrs)
417}