1/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
23 "fmt"
24 "io"
25 "sync/atomic"
26
27 "google.golang.org/grpc/balancer"
28 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/internal/channelz"
30 istatus "google.golang.org/grpc/internal/status"
31 "google.golang.org/grpc/internal/transport"
32 "google.golang.org/grpc/stats"
33 "google.golang.org/grpc/status"
34)
35
// pickerGeneration stores a picker and a channel used to signal that a picker
// newer than this one is available. The pickerWrapper holds one generation at
// a time and swaps in a fresh one on every update.
type pickerGeneration struct {
	// picker is the picker produced by the LB policy. May be nil if a picker
	// has never been produced.
	picker balancer.Picker
	// blockingCh is closed when the picker has been invalidated because there
	// is a new one available. It is closed by whichever of updatePicker, reset
	// or close swaps this generation out of the pickerWrapper.
	blockingCh chan struct{}
}
46
// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
// actions and unblocks them when there's a picker update.
type pickerWrapper struct {
	// pickerGen is the current picker generation; it is read with Load and
	// replaced with Swap so that picks never need a lock.
	// If pickerGen holds a nil pointer, the pickerWrapper is closed.
	pickerGen     atomic.Pointer[pickerGeneration]
	statsHandlers []stats.Handler // to record blocking picker calls
}
54
55func newPickerWrapper(statsHandlers []stats.Handler) *pickerWrapper {
56 pw := &pickerWrapper{
57 statsHandlers: statsHandlers,
58 }
59 pw.pickerGen.Store(&pickerGeneration{
60 blockingCh: make(chan struct{}),
61 })
62 return pw
63}
64
65// updatePicker is called by UpdateState calls from the LB policy. It
66// unblocks all blocked pick.
67func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
68 old := pw.pickerGen.Swap(&pickerGeneration{
69 picker: p,
70 blockingCh: make(chan struct{}),
71 })
72 close(old.blockingCh)
73}
74
75// doneChannelzWrapper performs the following:
76// - increments the calls started channelz counter
77// - wraps the done function in the passed in result to increment the calls
78// failed or calls succeeded channelz counter before invoking the actual
79// done function.
80func doneChannelzWrapper(acbw *acBalancerWrapper, result *balancer.PickResult) {
81 ac := acbw.ac
82 ac.incrCallsStarted()
83 done := result.Done
84 result.Done = func(b balancer.DoneInfo) {
85 if b.Err != nil && b.Err != io.EOF {
86 ac.incrCallsFailed()
87 } else {
88 ac.incrCallsSucceeded()
89 }
90 if done != nil {
91 done(b)
92 }
93 }
94}
95
96// pick returns the transport that will be used for the RPC.
97// It may block in the following cases:
98// - there's no picker
99// - the current picker returns ErrNoSubConnAvailable
100// - the current picker returns other errors and failfast is false.
101// - the subConn returned by the current picker is not READY
102// When one of these situations happens, pick blocks until the picker gets updated.
103func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, balancer.PickResult, error) {
104 var ch chan struct{}
105
106 var lastPickErr error
107
108 for {
109 pg := pw.pickerGen.Load()
110 if pg == nil {
111 return nil, balancer.PickResult{}, ErrClientConnClosing
112 }
113 if pg.picker == nil {
114 ch = pg.blockingCh
115 }
116 if ch == pg.blockingCh {
117 // This could happen when either:
118 // - pw.picker is nil (the previous if condition), or
119 // - we have already called pick on the current picker.
120 select {
121 case <-ctx.Done():
122 var errStr string
123 if lastPickErr != nil {
124 errStr = "latest balancer error: " + lastPickErr.Error()
125 } else {
126 errStr = fmt.Sprintf("%v while waiting for connections to become ready", ctx.Err())
127 }
128 switch ctx.Err() {
129 case context.DeadlineExceeded:
130 return nil, balancer.PickResult{}, status.Error(codes.DeadlineExceeded, errStr)
131 case context.Canceled:
132 return nil, balancer.PickResult{}, status.Error(codes.Canceled, errStr)
133 }
134 case <-ch:
135 }
136 continue
137 }
138
139 // If the channel is set, it means that the pick call had to wait for a
140 // new picker at some point. Either it's the first iteration and this
141 // function received the first picker, or a picker errored with
142 // ErrNoSubConnAvailable or errored with failfast set to false, which
143 // will trigger a continue to the next iteration. In the first case this
144 // conditional will hit if this call had to block (the channel is set).
145 // In the second case, the only way it will get to this conditional is
146 // if there is a new picker.
147 if ch != nil {
148 for _, sh := range pw.statsHandlers {
149 sh.HandleRPC(ctx, &stats.PickerUpdated{})
150 }
151 }
152
153 ch = pg.blockingCh
154 p := pg.picker
155
156 pickResult, err := p.Pick(info)
157 if err != nil {
158 if err == balancer.ErrNoSubConnAvailable {
159 continue
160 }
161 if st, ok := status.FromError(err); ok {
162 // Status error: end the RPC unconditionally with this status.
163 // First restrict the code to the list allowed by gRFC A54.
164 if istatus.IsRestrictedControlPlaneCode(st) {
165 err = status.Errorf(codes.Internal, "received picker error with illegal status: %v", err)
166 }
167 return nil, balancer.PickResult{}, dropError{error: err}
168 }
169 // For all other errors, wait for ready RPCs should block and other
170 // RPCs should fail with unavailable.
171 if !failfast {
172 lastPickErr = err
173 continue
174 }
175 return nil, balancer.PickResult{}, status.Error(codes.Unavailable, err.Error())
176 }
177
178 acbw, ok := pickResult.SubConn.(*acBalancerWrapper)
179 if !ok {
180 logger.Errorf("subconn returned from pick is type %T, not *acBalancerWrapper", pickResult.SubConn)
181 continue
182 }
183 if t := acbw.ac.getReadyTransport(); t != nil {
184 if channelz.IsOn() {
185 doneChannelzWrapper(acbw, &pickResult)
186 return t, pickResult, nil
187 }
188 return t, pickResult, nil
189 }
190 if pickResult.Done != nil {
191 // Calling done with nil error, no bytes sent and no bytes received.
192 // DoneInfo with default value works.
193 pickResult.Done(balancer.DoneInfo{})
194 }
195 logger.Infof("blockingPicker: the picked transport is not ready, loop back to repick")
196 // If ok == false, ac.state is not READY.
197 // A valid picker always returns READY subConn. This means the state of ac
198 // just changed, and picker will be updated shortly.
199 // continue back to the beginning of the for loop to repick.
200 }
201}
202
203func (pw *pickerWrapper) close() {
204 old := pw.pickerGen.Swap(nil)
205 close(old.blockingCh)
206}
207
208// reset clears the pickerWrapper and prepares it for being used again when idle
209// mode is exited.
210func (pw *pickerWrapper) reset() {
211 old := pw.pickerGen.Swap(&pickerGeneration{blockingCh: make(chan struct{})})
212 close(old.blockingCh)
213}
214
// dropError is a wrapper error that indicates the LB policy wishes to drop the
// RPC and not retry it. pick returns it when the picker fails with a status
// error.
type dropError struct {
	error
}