endpointsharding.go

  1/*
  2 *
  3 * Copyright 2024 gRPC authors.
  4 *
  5 * Licensed under the Apache License, Version 2.0 (the "License");
  6 * you may not use this file except in compliance with the License.
  7 * You may obtain a copy of the License at
  8 *
  9 *     http://www.apache.org/licenses/LICENSE-2.0
 10 *
 11 * Unless required by applicable law or agreed to in writing, software
 12 * distributed under the License is distributed on an "AS IS" BASIS,
 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14 * See the License for the specific language governing permissions and
 15 * limitations under the License.
 16 *
 17 */
 18
 19// Package endpointsharding implements a load balancing policy that manages
 20// homogeneous child policies each owning a single endpoint.
 21//
 22// # Experimental
 23//
 24// Notice: This package is EXPERIMENTAL and may be changed or removed in a
 25// later release.
 26package endpointsharding
 27
 28import (
 29	"errors"
 30	rand "math/rand/v2"
 31	"sync"
 32	"sync/atomic"
 33
 34	"google.golang.org/grpc/balancer"
 35	"google.golang.org/grpc/balancer/base"
 36	"google.golang.org/grpc/connectivity"
 37	"google.golang.org/grpc/resolver"
 38)
 39
 40// ChildState is the balancer state of a child along with the endpoint which
 41// identifies the child balancer.
 42type ChildState struct {
 43	Endpoint resolver.Endpoint
 44	State    balancer.State
 45
 46	// Balancer exposes only the ExitIdler interface of the child LB policy.
 47	// Other methods of the child policy are called only by endpointsharding.
 48	Balancer balancer.ExitIdler
 49}
 50
 51// Options are the options to configure the behaviour of the
 52// endpointsharding balancer.
 53type Options struct {
 54	// DisableAutoReconnect allows the balancer to keep child balancer in the
 55	// IDLE state until they are explicitly triggered to exit using the
 56	// ChildState obtained from the endpointsharding picker. When set to false,
 57	// the endpointsharding balancer will automatically call ExitIdle on child
 58	// connections that report IDLE.
 59	DisableAutoReconnect bool
 60}
 61
 62// ChildBuilderFunc creates a new balancer with the ClientConn. It has the same
 63// type as the balancer.Builder.Build method.
 64type ChildBuilderFunc func(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer
 65
 66// NewBalancer returns a load balancing policy that manages homogeneous child
 67// policies each owning a single endpoint. The endpointsharding balancer
 68// forwards the LoadBalancingConfig in ClientConn state updates to its children.
 69func NewBalancer(cc balancer.ClientConn, opts balancer.BuildOptions, childBuilder ChildBuilderFunc, esOpts Options) balancer.Balancer {
 70	es := &endpointSharding{
 71		cc:           cc,
 72		bOpts:        opts,
 73		esOpts:       esOpts,
 74		childBuilder: childBuilder,
 75	}
 76	es.children.Store(resolver.NewEndpointMap())
 77	return es
 78}
 79
 80// endpointSharding is a balancer that wraps child balancers. It creates a child
 81// balancer with child config for every unique Endpoint received. It updates the
 82// child states on any update from parent or child.
 83type endpointSharding struct {
 84	cc           balancer.ClientConn
 85	bOpts        balancer.BuildOptions
 86	esOpts       Options
 87	childBuilder ChildBuilderFunc
 88
 89	// childMu synchronizes calls to any single child. It must be held for all
 90	// calls into a child. To avoid deadlocks, do not acquire childMu while
 91	// holding mu.
 92	childMu  sync.Mutex
 93	children atomic.Pointer[resolver.EndpointMap] // endpoint -> *balancerWrapper
 94
 95	// inhibitChildUpdates is set during UpdateClientConnState/ResolverError
 96	// calls (calls to children will each produce an update, only want one
 97	// update).
 98	inhibitChildUpdates atomic.Bool
 99
100	// mu synchronizes access to the state stored in balancerWrappers in the
101	// children field. mu must not be held during calls into a child since
102	// synchronous calls back from the child may require taking mu, causing a
103	// deadlock. To avoid deadlocks, do not acquire childMu while holding mu.
104	mu sync.Mutex
105}
106
107// UpdateClientConnState creates a child for new endpoints and deletes children
108// for endpoints that are no longer present. It also updates all the children,
109// and sends a single synchronous update of the childrens' aggregated state at
110// the end of the UpdateClientConnState operation. If any endpoint has no
111// addresses it will ignore that endpoint. Otherwise, returns first error found
112// from a child, but fully processes the new update.
113func (es *endpointSharding) UpdateClientConnState(state balancer.ClientConnState) error {
114	es.childMu.Lock()
115	defer es.childMu.Unlock()
116
117	es.inhibitChildUpdates.Store(true)
118	defer func() {
119		es.inhibitChildUpdates.Store(false)
120		es.updateState()
121	}()
122	var ret error
123
124	children := es.children.Load()
125	newChildren := resolver.NewEndpointMap()
126
127	// Update/Create new children.
128	for _, endpoint := range state.ResolverState.Endpoints {
129		if _, ok := newChildren.Get(endpoint); ok {
130			// Endpoint child was already created, continue to avoid duplicate
131			// update.
132			continue
133		}
134		var childBalancer *balancerWrapper
135		if val, ok := children.Get(endpoint); ok {
136			childBalancer = val.(*balancerWrapper)
137			// Endpoint attributes may have changed, update the stored endpoint.
138			es.mu.Lock()
139			childBalancer.childState.Endpoint = endpoint
140			es.mu.Unlock()
141		} else {
142			childBalancer = &balancerWrapper{
143				childState: ChildState{Endpoint: endpoint},
144				ClientConn: es.cc,
145				es:         es,
146			}
147			childBalancer.childState.Balancer = childBalancer
148			childBalancer.child = es.childBuilder(childBalancer, es.bOpts)
149		}
150		newChildren.Set(endpoint, childBalancer)
151		if err := childBalancer.updateClientConnStateLocked(balancer.ClientConnState{
152			BalancerConfig: state.BalancerConfig,
153			ResolverState: resolver.State{
154				Endpoints:  []resolver.Endpoint{endpoint},
155				Attributes: state.ResolverState.Attributes,
156			},
157		}); err != nil && ret == nil {
158			// Return first error found, and always commit full processing of
159			// updating children. If desired to process more specific errors
160			// across all endpoints, caller should make these specific
161			// validations, this is a current limitation for simplicity sake.
162			ret = err
163		}
164	}
165	// Delete old children that are no longer present.
166	for _, e := range children.Keys() {
167		child, _ := children.Get(e)
168		if _, ok := newChildren.Get(e); !ok {
169			child.(*balancerWrapper).closeLocked()
170		}
171	}
172	es.children.Store(newChildren)
173	if newChildren.Len() == 0 {
174		return balancer.ErrBadResolverState
175	}
176	return ret
177}
178
179// ResolverError forwards the resolver error to all of the endpointSharding's
180// children and sends a single synchronous update of the childStates at the end
181// of the ResolverError operation.
182func (es *endpointSharding) ResolverError(err error) {
183	es.childMu.Lock()
184	defer es.childMu.Unlock()
185	es.inhibitChildUpdates.Store(true)
186	defer func() {
187		es.inhibitChildUpdates.Store(false)
188		es.updateState()
189	}()
190	children := es.children.Load()
191	for _, child := range children.Values() {
192		child.(*balancerWrapper).resolverErrorLocked(err)
193	}
194}
195
196func (es *endpointSharding) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) {
197	// UpdateSubConnState is deprecated.
198}
199
200func (es *endpointSharding) Close() {
201	es.childMu.Lock()
202	defer es.childMu.Unlock()
203	children := es.children.Load()
204	for _, child := range children.Values() {
205		child.(*balancerWrapper).closeLocked()
206	}
207}
208
209// updateState updates this component's state. It sends the aggregated state,
210// and a picker with round robin behavior with all the child states present if
211// needed.
212func (es *endpointSharding) updateState() {
213	if es.inhibitChildUpdates.Load() {
214		return
215	}
216	var readyPickers, connectingPickers, idlePickers, transientFailurePickers []balancer.Picker
217
218	es.mu.Lock()
219	defer es.mu.Unlock()
220
221	children := es.children.Load()
222	childStates := make([]ChildState, 0, children.Len())
223
224	for _, child := range children.Values() {
225		bw := child.(*balancerWrapper)
226		childState := bw.childState
227		childStates = append(childStates, childState)
228		childPicker := childState.State.Picker
229		switch childState.State.ConnectivityState {
230		case connectivity.Ready:
231			readyPickers = append(readyPickers, childPicker)
232		case connectivity.Connecting:
233			connectingPickers = append(connectingPickers, childPicker)
234		case connectivity.Idle:
235			idlePickers = append(idlePickers, childPicker)
236		case connectivity.TransientFailure:
237			transientFailurePickers = append(transientFailurePickers, childPicker)
238			// connectivity.Shutdown shouldn't appear.
239		}
240	}
241
242	// Construct the round robin picker based off the aggregated state. Whatever
243	// the aggregated state, use the pickers present that are currently in that
244	// state only.
245	var aggState connectivity.State
246	var pickers []balancer.Picker
247	if len(readyPickers) >= 1 {
248		aggState = connectivity.Ready
249		pickers = readyPickers
250	} else if len(connectingPickers) >= 1 {
251		aggState = connectivity.Connecting
252		pickers = connectingPickers
253	} else if len(idlePickers) >= 1 {
254		aggState = connectivity.Idle
255		pickers = idlePickers
256	} else if len(transientFailurePickers) >= 1 {
257		aggState = connectivity.TransientFailure
258		pickers = transientFailurePickers
259	} else {
260		aggState = connectivity.TransientFailure
261		pickers = []balancer.Picker{base.NewErrPicker(errors.New("no children to pick from"))}
262	} // No children (resolver error before valid update).
263	p := &pickerWithChildStates{
264		pickers:     pickers,
265		childStates: childStates,
266		next:        uint32(rand.IntN(len(pickers))),
267	}
268	es.cc.UpdateState(balancer.State{
269		ConnectivityState: aggState,
270		Picker:            p,
271	})
272}
273
274// pickerWithChildStates delegates to the pickers it holds in a round robin
275// fashion. It also contains the childStates of all the endpointSharding's
276// children.
277type pickerWithChildStates struct {
278	pickers     []balancer.Picker
279	childStates []ChildState
280	next        uint32
281}
282
283func (p *pickerWithChildStates) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
284	nextIndex := atomic.AddUint32(&p.next, 1)
285	picker := p.pickers[nextIndex%uint32(len(p.pickers))]
286	return picker.Pick(info)
287}
288
289// ChildStatesFromPicker returns the state of all the children managed by the
290// endpoint sharding balancer that created this picker.
291func ChildStatesFromPicker(picker balancer.Picker) []ChildState {
292	p, ok := picker.(*pickerWithChildStates)
293	if !ok {
294		return nil
295	}
296	return p.childStates
297}
298
299// balancerWrapper is a wrapper of a balancer. It ID's a child balancer by
300// endpoint, and persists recent child balancer state.
301type balancerWrapper struct {
302	// The following fields are initialized at build time and read-only after
303	// that and therefore do not need to be guarded by a mutex.
304
305	// child contains the wrapped balancer. Access its methods only through
306	// methods on balancerWrapper to ensure proper synchronization
307	child               balancer.Balancer
308	balancer.ClientConn // embed to intercept UpdateState, doesn't deal with SubConns
309
310	es *endpointSharding
311
312	// Access to the following fields is guarded by es.mu.
313
314	childState ChildState
315	isClosed   bool
316}
317
318func (bw *balancerWrapper) UpdateState(state balancer.State) {
319	bw.es.mu.Lock()
320	bw.childState.State = state
321	bw.es.mu.Unlock()
322	if state.ConnectivityState == connectivity.Idle && !bw.es.esOpts.DisableAutoReconnect {
323		bw.ExitIdle()
324	}
325	bw.es.updateState()
326}
327
328// ExitIdle pings an IDLE child balancer to exit idle in a new goroutine to
329// avoid deadlocks due to synchronous balancer state updates.
330func (bw *balancerWrapper) ExitIdle() {
331	if ei, ok := bw.child.(balancer.ExitIdler); ok {
332		go func() {
333			bw.es.childMu.Lock()
334			if !bw.isClosed {
335				ei.ExitIdle()
336			}
337			bw.es.childMu.Unlock()
338		}()
339	}
340}
341
342// updateClientConnStateLocked delivers the ClientConnState to the child
343// balancer. Callers must hold the child mutex of the parent endpointsharding
344// balancer.
345func (bw *balancerWrapper) updateClientConnStateLocked(ccs balancer.ClientConnState) error {
346	return bw.child.UpdateClientConnState(ccs)
347}
348
349// closeLocked closes the child balancer. Callers must hold the child mutext of
350// the parent endpointsharding balancer.
351func (bw *balancerWrapper) closeLocked() {
352	bw.child.Close()
353	bw.isClosed = true
354}
355
356func (bw *balancerWrapper) resolverErrorLocked(err error) {
357	bw.child.ResolverError(err)
358}