picker_wrapper.go

  1/*
  2 *
  3 * Copyright 2017 gRPC authors.
  4 *
  5 * Licensed under the Apache License, Version 2.0 (the "License");
  6 * you may not use this file except in compliance with the License.
  7 * You may obtain a copy of the License at
  8 *
  9 *     http://www.apache.org/licenses/LICENSE-2.0
 10 *
 11 * Unless required by applicable law or agreed to in writing, software
 12 * distributed under the License is distributed on an "AS IS" BASIS,
 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14 * See the License for the specific language governing permissions and
 15 * limitations under the License.
 16 *
 17 */
 18
 19package grpc
 20
 21import (
 22	"context"
 23	"fmt"
 24	"io"
 25	"sync/atomic"
 26
 27	"google.golang.org/grpc/balancer"
 28	"google.golang.org/grpc/codes"
 29	"google.golang.org/grpc/internal/channelz"
 30	istatus "google.golang.org/grpc/internal/status"
 31	"google.golang.org/grpc/internal/transport"
 32	"google.golang.org/grpc/stats"
 33	"google.golang.org/grpc/status"
 34)
 35
// pickerGeneration stores a picker and a channel used to signal that a picker
// newer than this one is available.
//
// The code in this file never modifies a generation after publishing it:
// updatePicker and reset swap in a freshly allocated generation and then close
// the blockingCh of the one they replaced.
type pickerGeneration struct {
	// picker is the picker produced by the LB policy.  May be nil if a picker
	// has never been produced (newPickerWrapper and reset both install a
	// generation without a picker).
	picker balancer.Picker
	// blockingCh is closed when the picker has been invalidated because there
	// is a new one available. pick waits on it to learn that it should reload
	// the current generation.
	blockingCh chan struct{}
}
 46
// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
// actions and unblocks when there's a picker update.
//
// The current picker lives in pickerGen and is replaced atomically by
// updatePicker, reset and close. statsHandlers are handed a
// stats.PickerUpdated event by pick when the pick had to wait for a newer
// picker before it could proceed.
type pickerWrapper struct {
	// If pickerGen holds a nil pointer, the pickerWrapper is closed.
	pickerGen     atomic.Pointer[pickerGeneration]
	statsHandlers []stats.Handler // to record blocking picker calls
}
 54
 55func newPickerWrapper(statsHandlers []stats.Handler) *pickerWrapper {
 56	pw := &pickerWrapper{
 57		statsHandlers: statsHandlers,
 58	}
 59	pw.pickerGen.Store(&pickerGeneration{
 60		blockingCh: make(chan struct{}),
 61	})
 62	return pw
 63}
 64
 65// updatePicker is called by UpdateState calls from the LB policy. It
 66// unblocks all blocked pick.
 67func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
 68	old := pw.pickerGen.Swap(&pickerGeneration{
 69		picker:     p,
 70		blockingCh: make(chan struct{}),
 71	})
 72	close(old.blockingCh)
 73}
 74
 75// doneChannelzWrapper performs the following:
 76//   - increments the calls started channelz counter
 77//   - wraps the done function in the passed in result to increment the calls
 78//     failed or calls succeeded channelz counter before invoking the actual
 79//     done function.
 80func doneChannelzWrapper(acbw *acBalancerWrapper, result *balancer.PickResult) {
 81	ac := acbw.ac
 82	ac.incrCallsStarted()
 83	done := result.Done
 84	result.Done = func(b balancer.DoneInfo) {
 85		if b.Err != nil && b.Err != io.EOF {
 86			ac.incrCallsFailed()
 87		} else {
 88			ac.incrCallsSucceeded()
 89		}
 90		if done != nil {
 91			done(b)
 92		}
 93	}
 94}
 95
 96// pick returns the transport that will be used for the RPC.
 97// It may block in the following cases:
 98// - there's no picker
 99// - the current picker returns ErrNoSubConnAvailable
100// - the current picker returns other errors and failfast is false.
101// - the subConn returned by the current picker is not READY
102// When one of these situations happens, pick blocks until the picker gets updated.
103func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, balancer.PickResult, error) {
104	var ch chan struct{}
105
106	var lastPickErr error
107
108	for {
109		pg := pw.pickerGen.Load()
110		if pg == nil {
111			return nil, balancer.PickResult{}, ErrClientConnClosing
112		}
113		if pg.picker == nil {
114			ch = pg.blockingCh
115		}
116		if ch == pg.blockingCh {
117			// This could happen when either:
118			// - pw.picker is nil (the previous if condition), or
119			// - we have already called pick on the current picker.
120			select {
121			case <-ctx.Done():
122				var errStr string
123				if lastPickErr != nil {
124					errStr = "latest balancer error: " + lastPickErr.Error()
125				} else {
126					errStr = fmt.Sprintf("%v while waiting for connections to become ready", ctx.Err())
127				}
128				switch ctx.Err() {
129				case context.DeadlineExceeded:
130					return nil, balancer.PickResult{}, status.Error(codes.DeadlineExceeded, errStr)
131				case context.Canceled:
132					return nil, balancer.PickResult{}, status.Error(codes.Canceled, errStr)
133				}
134			case <-ch:
135			}
136			continue
137		}
138
139		// If the channel is set, it means that the pick call had to wait for a
140		// new picker at some point. Either it's the first iteration and this
141		// function received the first picker, or a picker errored with
142		// ErrNoSubConnAvailable or errored with failfast set to false, which
143		// will trigger a continue to the next iteration. In the first case this
144		// conditional will hit if this call had to block (the channel is set).
145		// In the second case, the only way it will get to this conditional is
146		// if there is a new picker.
147		if ch != nil {
148			for _, sh := range pw.statsHandlers {
149				sh.HandleRPC(ctx, &stats.PickerUpdated{})
150			}
151		}
152
153		ch = pg.blockingCh
154		p := pg.picker
155
156		pickResult, err := p.Pick(info)
157		if err != nil {
158			if err == balancer.ErrNoSubConnAvailable {
159				continue
160			}
161			if st, ok := status.FromError(err); ok {
162				// Status error: end the RPC unconditionally with this status.
163				// First restrict the code to the list allowed by gRFC A54.
164				if istatus.IsRestrictedControlPlaneCode(st) {
165					err = status.Errorf(codes.Internal, "received picker error with illegal status: %v", err)
166				}
167				return nil, balancer.PickResult{}, dropError{error: err}
168			}
169			// For all other errors, wait for ready RPCs should block and other
170			// RPCs should fail with unavailable.
171			if !failfast {
172				lastPickErr = err
173				continue
174			}
175			return nil, balancer.PickResult{}, status.Error(codes.Unavailable, err.Error())
176		}
177
178		acbw, ok := pickResult.SubConn.(*acBalancerWrapper)
179		if !ok {
180			logger.Errorf("subconn returned from pick is type %T, not *acBalancerWrapper", pickResult.SubConn)
181			continue
182		}
183		if t := acbw.ac.getReadyTransport(); t != nil {
184			if channelz.IsOn() {
185				doneChannelzWrapper(acbw, &pickResult)
186				return t, pickResult, nil
187			}
188			return t, pickResult, nil
189		}
190		if pickResult.Done != nil {
191			// Calling done with nil error, no bytes sent and no bytes received.
192			// DoneInfo with default value works.
193			pickResult.Done(balancer.DoneInfo{})
194		}
195		logger.Infof("blockingPicker: the picked transport is not ready, loop back to repick")
196		// If ok == false, ac.state is not READY.
197		// A valid picker always returns READY subConn. This means the state of ac
198		// just changed, and picker will be updated shortly.
199		// continue back to the beginning of the for loop to repick.
200	}
201}
202
203func (pw *pickerWrapper) close() {
204	old := pw.pickerGen.Swap(nil)
205	close(old.blockingCh)
206}
207
208// reset clears the pickerWrapper and prepares it for being used again when idle
209// mode is exited.
210func (pw *pickerWrapper) reset() {
211	old := pw.pickerGen.Swap(&pickerGeneration{blockingCh: make(chan struct{})})
212	close(old.blockingCh)
213}
214
// dropError is a wrapper error that indicates the LB policy wishes to drop the
// RPC and not retry it. pick returns it when the picker fails with a status
// error; the embedded error is that status error (or an Internal status
// replacing it when the original code is not allowed for the control plane).
type dropError struct {
	error
}