1/*
2 *
3 * Copyright 2024 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19// Package endpointsharding implements a load balancing policy that manages
20// homogeneous child policies each owning a single endpoint.
21//
22// # Experimental
23//
24// Notice: This package is EXPERIMENTAL and may be changed or removed in a
25// later release.
26package endpointsharding
27
28import (
29 "errors"
30 rand "math/rand/v2"
31 "sync"
32 "sync/atomic"
33
34 "google.golang.org/grpc/balancer"
35 "google.golang.org/grpc/balancer/base"
36 "google.golang.org/grpc/connectivity"
37 "google.golang.org/grpc/resolver"
38)
39
40// ChildState is the balancer state of a child along with the endpoint which
41// identifies the child balancer.
42type ChildState struct {
43 Endpoint resolver.Endpoint
44 State balancer.State
45
46 // Balancer exposes only the ExitIdler interface of the child LB policy.
47 // Other methods of the child policy are called only by endpointsharding.
48 Balancer balancer.ExitIdler
49}
50
// Options holds the knobs that tune the behaviour of the endpointsharding
// balancer.
type Options struct {
	// DisableAutoReconnect, when true, leaves child balancers in the IDLE
	// state until they are explicitly told to exit it through the ChildState
	// obtained from the endpointsharding picker. When false (the zero value),
	// the endpointsharding balancer automatically calls ExitIdle on every
	// child balancer that reports IDLE.
	DisableAutoReconnect bool
}
61
62// ChildBuilderFunc creates a new balancer with the ClientConn. It has the same
63// type as the balancer.Builder.Build method.
64type ChildBuilderFunc func(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer
65
66// NewBalancer returns a load balancing policy that manages homogeneous child
67// policies each owning a single endpoint. The endpointsharding balancer
68// forwards the LoadBalancingConfig in ClientConn state updates to its children.
69func NewBalancer(cc balancer.ClientConn, opts balancer.BuildOptions, childBuilder ChildBuilderFunc, esOpts Options) balancer.Balancer {
70 es := &endpointSharding{
71 cc: cc,
72 bOpts: opts,
73 esOpts: esOpts,
74 childBuilder: childBuilder,
75 }
76 es.children.Store(resolver.NewEndpointMap())
77 return es
78}
79
80// endpointSharding is a balancer that wraps child balancers. It creates a child
81// balancer with child config for every unique Endpoint received. It updates the
82// child states on any update from parent or child.
83type endpointSharding struct {
84 cc balancer.ClientConn
85 bOpts balancer.BuildOptions
86 esOpts Options
87 childBuilder ChildBuilderFunc
88
89 // childMu synchronizes calls to any single child. It must be held for all
90 // calls into a child. To avoid deadlocks, do not acquire childMu while
91 // holding mu.
92 childMu sync.Mutex
93 children atomic.Pointer[resolver.EndpointMap] // endpoint -> *balancerWrapper
94
95 // inhibitChildUpdates is set during UpdateClientConnState/ResolverError
96 // calls (calls to children will each produce an update, only want one
97 // update).
98 inhibitChildUpdates atomic.Bool
99
100 // mu synchronizes access to the state stored in balancerWrappers in the
101 // children field. mu must not be held during calls into a child since
102 // synchronous calls back from the child may require taking mu, causing a
103 // deadlock. To avoid deadlocks, do not acquire childMu while holding mu.
104 mu sync.Mutex
105}
106
107// UpdateClientConnState creates a child for new endpoints and deletes children
108// for endpoints that are no longer present. It also updates all the children,
109// and sends a single synchronous update of the childrens' aggregated state at
110// the end of the UpdateClientConnState operation. If any endpoint has no
111// addresses it will ignore that endpoint. Otherwise, returns first error found
112// from a child, but fully processes the new update.
113func (es *endpointSharding) UpdateClientConnState(state balancer.ClientConnState) error {
114 es.childMu.Lock()
115 defer es.childMu.Unlock()
116
117 es.inhibitChildUpdates.Store(true)
118 defer func() {
119 es.inhibitChildUpdates.Store(false)
120 es.updateState()
121 }()
122 var ret error
123
124 children := es.children.Load()
125 newChildren := resolver.NewEndpointMap()
126
127 // Update/Create new children.
128 for _, endpoint := range state.ResolverState.Endpoints {
129 if _, ok := newChildren.Get(endpoint); ok {
130 // Endpoint child was already created, continue to avoid duplicate
131 // update.
132 continue
133 }
134 var childBalancer *balancerWrapper
135 if val, ok := children.Get(endpoint); ok {
136 childBalancer = val.(*balancerWrapper)
137 // Endpoint attributes may have changed, update the stored endpoint.
138 es.mu.Lock()
139 childBalancer.childState.Endpoint = endpoint
140 es.mu.Unlock()
141 } else {
142 childBalancer = &balancerWrapper{
143 childState: ChildState{Endpoint: endpoint},
144 ClientConn: es.cc,
145 es: es,
146 }
147 childBalancer.childState.Balancer = childBalancer
148 childBalancer.child = es.childBuilder(childBalancer, es.bOpts)
149 }
150 newChildren.Set(endpoint, childBalancer)
151 if err := childBalancer.updateClientConnStateLocked(balancer.ClientConnState{
152 BalancerConfig: state.BalancerConfig,
153 ResolverState: resolver.State{
154 Endpoints: []resolver.Endpoint{endpoint},
155 Attributes: state.ResolverState.Attributes,
156 },
157 }); err != nil && ret == nil {
158 // Return first error found, and always commit full processing of
159 // updating children. If desired to process more specific errors
160 // across all endpoints, caller should make these specific
161 // validations, this is a current limitation for simplicity sake.
162 ret = err
163 }
164 }
165 // Delete old children that are no longer present.
166 for _, e := range children.Keys() {
167 child, _ := children.Get(e)
168 if _, ok := newChildren.Get(e); !ok {
169 child.(*balancerWrapper).closeLocked()
170 }
171 }
172 es.children.Store(newChildren)
173 if newChildren.Len() == 0 {
174 return balancer.ErrBadResolverState
175 }
176 return ret
177}
178
179// ResolverError forwards the resolver error to all of the endpointSharding's
180// children and sends a single synchronous update of the childStates at the end
181// of the ResolverError operation.
182func (es *endpointSharding) ResolverError(err error) {
183 es.childMu.Lock()
184 defer es.childMu.Unlock()
185 es.inhibitChildUpdates.Store(true)
186 defer func() {
187 es.inhibitChildUpdates.Store(false)
188 es.updateState()
189 }()
190 children := es.children.Load()
191 for _, child := range children.Values() {
192 child.(*balancerWrapper).resolverErrorLocked(err)
193 }
194}
195
196func (es *endpointSharding) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) {
197 // UpdateSubConnState is deprecated.
198}
199
200func (es *endpointSharding) Close() {
201 es.childMu.Lock()
202 defer es.childMu.Unlock()
203 children := es.children.Load()
204 for _, child := range children.Values() {
205 child.(*balancerWrapper).closeLocked()
206 }
207}
208
209// updateState updates this component's state. It sends the aggregated state,
210// and a picker with round robin behavior with all the child states present if
211// needed.
212func (es *endpointSharding) updateState() {
213 if es.inhibitChildUpdates.Load() {
214 return
215 }
216 var readyPickers, connectingPickers, idlePickers, transientFailurePickers []balancer.Picker
217
218 es.mu.Lock()
219 defer es.mu.Unlock()
220
221 children := es.children.Load()
222 childStates := make([]ChildState, 0, children.Len())
223
224 for _, child := range children.Values() {
225 bw := child.(*balancerWrapper)
226 childState := bw.childState
227 childStates = append(childStates, childState)
228 childPicker := childState.State.Picker
229 switch childState.State.ConnectivityState {
230 case connectivity.Ready:
231 readyPickers = append(readyPickers, childPicker)
232 case connectivity.Connecting:
233 connectingPickers = append(connectingPickers, childPicker)
234 case connectivity.Idle:
235 idlePickers = append(idlePickers, childPicker)
236 case connectivity.TransientFailure:
237 transientFailurePickers = append(transientFailurePickers, childPicker)
238 // connectivity.Shutdown shouldn't appear.
239 }
240 }
241
242 // Construct the round robin picker based off the aggregated state. Whatever
243 // the aggregated state, use the pickers present that are currently in that
244 // state only.
245 var aggState connectivity.State
246 var pickers []balancer.Picker
247 if len(readyPickers) >= 1 {
248 aggState = connectivity.Ready
249 pickers = readyPickers
250 } else if len(connectingPickers) >= 1 {
251 aggState = connectivity.Connecting
252 pickers = connectingPickers
253 } else if len(idlePickers) >= 1 {
254 aggState = connectivity.Idle
255 pickers = idlePickers
256 } else if len(transientFailurePickers) >= 1 {
257 aggState = connectivity.TransientFailure
258 pickers = transientFailurePickers
259 } else {
260 aggState = connectivity.TransientFailure
261 pickers = []balancer.Picker{base.NewErrPicker(errors.New("no children to pick from"))}
262 } // No children (resolver error before valid update).
263 p := &pickerWithChildStates{
264 pickers: pickers,
265 childStates: childStates,
266 next: uint32(rand.IntN(len(pickers))),
267 }
268 es.cc.UpdateState(balancer.State{
269 ConnectivityState: aggState,
270 Picker: p,
271 })
272}
273
274// pickerWithChildStates delegates to the pickers it holds in a round robin
275// fashion. It also contains the childStates of all the endpointSharding's
276// children.
277type pickerWithChildStates struct {
278 pickers []balancer.Picker
279 childStates []ChildState
280 next uint32
281}
282
283func (p *pickerWithChildStates) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
284 nextIndex := atomic.AddUint32(&p.next, 1)
285 picker := p.pickers[nextIndex%uint32(len(p.pickers))]
286 return picker.Pick(info)
287}
288
289// ChildStatesFromPicker returns the state of all the children managed by the
290// endpoint sharding balancer that created this picker.
291func ChildStatesFromPicker(picker balancer.Picker) []ChildState {
292 p, ok := picker.(*pickerWithChildStates)
293 if !ok {
294 return nil
295 }
296 return p.childStates
297}
298
299// balancerWrapper is a wrapper of a balancer. It ID's a child balancer by
300// endpoint, and persists recent child balancer state.
301type balancerWrapper struct {
302 // The following fields are initialized at build time and read-only after
303 // that and therefore do not need to be guarded by a mutex.
304
305 // child contains the wrapped balancer. Access its methods only through
306 // methods on balancerWrapper to ensure proper synchronization
307 child balancer.Balancer
308 balancer.ClientConn // embed to intercept UpdateState, doesn't deal with SubConns
309
310 es *endpointSharding
311
312 // Access to the following fields is guarded by es.mu.
313
314 childState ChildState
315 isClosed bool
316}
317
318func (bw *balancerWrapper) UpdateState(state balancer.State) {
319 bw.es.mu.Lock()
320 bw.childState.State = state
321 bw.es.mu.Unlock()
322 if state.ConnectivityState == connectivity.Idle && !bw.es.esOpts.DisableAutoReconnect {
323 bw.ExitIdle()
324 }
325 bw.es.updateState()
326}
327
328// ExitIdle pings an IDLE child balancer to exit idle in a new goroutine to
329// avoid deadlocks due to synchronous balancer state updates.
330func (bw *balancerWrapper) ExitIdle() {
331 if ei, ok := bw.child.(balancer.ExitIdler); ok {
332 go func() {
333 bw.es.childMu.Lock()
334 if !bw.isClosed {
335 ei.ExitIdle()
336 }
337 bw.es.childMu.Unlock()
338 }()
339 }
340}
341
342// updateClientConnStateLocked delivers the ClientConnState to the child
343// balancer. Callers must hold the child mutex of the parent endpointsharding
344// balancer.
345func (bw *balancerWrapper) updateClientConnStateLocked(ccs balancer.ClientConnState) error {
346 return bw.child.UpdateClientConnState(ccs)
347}
348
349// closeLocked closes the child balancer. Callers must hold the child mutext of
350// the parent endpointsharding balancer.
351func (bw *balancerWrapper) closeLocked() {
352 bw.child.Close()
353 bw.isClosed = true
354}
355
356func (bw *balancerWrapper) resolverErrorLocked(err error) {
357 bw.child.ResolverError(err)
358}