1/*
2 *
3 * Copyright 2023 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19// Package idle contains a component for managing idleness (entering and exiting)
20// based on RPC activity.
21package idle
22
23import (
24 "fmt"
25 "math"
26 "sync"
27 "sync/atomic"
28 "time"
29)
30
// timeAfterFunc schedules f to run after d and returns the timer that controls
// it. It is a package-level variable, rather than a direct call to
// time.AfterFunc, so that unit tests can substitute their own implementation.
var timeAfterFunc = time.AfterFunc
35
// Enforcer is the functionality provided by grpc.ClientConn to enter
// and exit from idle mode.
//
// The Manager invokes both methods while holding its internal idle mutex, so
// calls into an Enforcer are serialized with respect to each other.
type Enforcer interface {
	// ExitIdleMode asks the channel to leave idle mode. A non-nil error is
	// wrapped and returned by Manager.ExitIdleMode, and the Manager continues
	// to treat the channel as idle.
	ExitIdleMode() error
	// EnterIdleMode asks the channel to enter idle mode. The Manager calls it
	// only after verifying that no RPCs are active and none were observed
	// since the last timer check.
	EnterIdleMode()
}
42
// Manager implements idleness detection and calls the configured Enforcer to
// enter/exit idle mode when appropriate. Must be created by NewManager.
type Manager struct {
	// State accessed atomically.
	//
	// NOTE(review): lastCallEndTime is presumably kept as the first field so
	// that the 64-bit value stays 8-byte aligned for atomic access on 32-bit
	// platforms — confirm before reordering fields.
	lastCallEndTime           int64 // Unix timestamp in nanos; time when the most recent RPC completed.
	activeCallsCount          int32 // Count of active RPCs; offset by -math.MaxInt32 (and therefore non-positive) while the channel is idle or is trying to get there.
	activeSinceLastTimerCheck int32 // Boolean; True if there was an RPC since the last timer callback.
	closed                    int32 // Boolean; True when the manager is closed.

	// Can be accessed without atomics or mutex since these are set at creation
	// time and read-only after that.
	enforcer Enforcer      // Functionality provided by grpc.ClientConn.
	timeout  time.Duration // Idle timeout; a zero value means no idle timer is ever armed.

	// idleMu is used to guarantee mutual exclusion in two scenarios:
	// - Opposing intentions:
	//   - a: Idle timeout has fired and handleIdleTimeout() is trying to put
	//     the channel in idle mode because the channel has been inactive.
	//   - b: At the same time an RPC is made on the channel, and OnCallBegin()
	//     is trying to prevent the channel from going idle.
	// - Competing intentions:
	//   - The channel is in idle mode and there are multiple RPCs starting at
	//     the same time, all trying to move the channel out of idle. Only one
	//     of them should succeed in doing so, while the other RPCs should
	//     piggyback on the first one and be successfully handled.
	//
	// It also guards the actuallyIdle and timer fields below.
	idleMu       sync.RWMutex
	actuallyIdle bool        // True once the enforcer has been told to enter idle mode (and initially); guarded by idleMu.
	timer        *time.Timer // Pending idle timer, or nil if none has been armed; guarded by idleMu.
}
72
73// NewManager creates a new idleness manager implementation for the
74// given idle timeout. It begins in idle mode.
75func NewManager(enforcer Enforcer, timeout time.Duration) *Manager {
76 return &Manager{
77 enforcer: enforcer,
78 timeout: timeout,
79 actuallyIdle: true,
80 activeCallsCount: -math.MaxInt32,
81 }
82}
83
84// resetIdleTimerLocked resets the idle timer to the given duration. Called
85// when exiting idle mode or when the timer fires and we need to reset it.
86func (m *Manager) resetIdleTimerLocked(d time.Duration) {
87 if m.isClosed() || m.timeout == 0 || m.actuallyIdle {
88 return
89 }
90
91 // It is safe to ignore the return value from Reset() because this method is
92 // only ever called from the timer callback or when exiting idle mode.
93 if m.timer != nil {
94 m.timer.Stop()
95 }
96 m.timer = timeAfterFunc(d, m.handleIdleTimeout)
97}
98
// resetIdleTimer acquires idleMu and then arms the idle timer to fire after d
// by delegating to resetIdleTimerLocked. It is used by the timer callback,
// which runs without the lock held.
func (m *Manager) resetIdleTimer(d time.Duration) {
	m.idleMu.Lock()
	defer m.idleMu.Unlock()
	m.resetIdleTimerLocked(d)
}
104
105// handleIdleTimeout is the timer callback that is invoked upon expiry of the
106// configured idle timeout. The channel is considered inactive if there are no
107// ongoing calls and no RPC activity since the last time the timer fired.
108func (m *Manager) handleIdleTimeout() {
109 if m.isClosed() {
110 return
111 }
112
113 if atomic.LoadInt32(&m.activeCallsCount) > 0 {
114 m.resetIdleTimer(m.timeout)
115 return
116 }
117
118 // There has been activity on the channel since we last got here. Reset the
119 // timer and return.
120 if atomic.LoadInt32(&m.activeSinceLastTimerCheck) == 1 {
121 // Set the timer to fire after a duration of idle timeout, calculated
122 // from the time the most recent RPC completed.
123 atomic.StoreInt32(&m.activeSinceLastTimerCheck, 0)
124 m.resetIdleTimer(time.Duration(atomic.LoadInt64(&m.lastCallEndTime)-time.Now().UnixNano()) + m.timeout)
125 return
126 }
127
128 // Now that we've checked that there has been no activity, attempt to enter
129 // idle mode, which is very likely to succeed.
130 if m.tryEnterIdleMode() {
131 // Successfully entered idle mode. No timer needed until we exit idle.
132 return
133 }
134
135 // Failed to enter idle mode due to a concurrent RPC that kept the channel
136 // active, or because of an error from the channel. Undo the attempt to
137 // enter idle, and reset the timer to try again later.
138 m.resetIdleTimer(m.timeout)
139}
140
141// tryEnterIdleMode instructs the channel to enter idle mode. But before
142// that, it performs a last minute check to ensure that no new RPC has come in,
143// making the channel active.
144//
145// Return value indicates whether or not the channel moved to idle mode.
146//
147// Holds idleMu which ensures mutual exclusion with exitIdleMode.
148func (m *Manager) tryEnterIdleMode() bool {
149 // Setting the activeCallsCount to -math.MaxInt32 indicates to OnCallBegin()
150 // that the channel is either in idle mode or is trying to get there.
151 if !atomic.CompareAndSwapInt32(&m.activeCallsCount, 0, -math.MaxInt32) {
152 // This CAS operation can fail if an RPC started after we checked for
153 // activity in the timer handler, or one was ongoing from before the
154 // last time the timer fired, or if a test is attempting to enter idle
155 // mode without checking. In all cases, abort going into idle mode.
156 return false
157 }
158 // N.B. if we fail to enter idle mode after this, we must re-add
159 // math.MaxInt32 to m.activeCallsCount.
160
161 m.idleMu.Lock()
162 defer m.idleMu.Unlock()
163
164 if atomic.LoadInt32(&m.activeCallsCount) != -math.MaxInt32 {
165 // We raced and lost to a new RPC. Very rare, but stop entering idle.
166 atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
167 return false
168 }
169 if atomic.LoadInt32(&m.activeSinceLastTimerCheck) == 1 {
170 // A very short RPC could have come in (and also finished) after we
171 // checked for calls count and activity in handleIdleTimeout(), but
172 // before the CAS operation. So, we need to check for activity again.
173 atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
174 return false
175 }
176
177 // No new RPCs have come in since we set the active calls count value to
178 // -math.MaxInt32. And since we have the lock, it is safe to enter idle mode
179 // unconditionally now.
180 m.enforcer.EnterIdleMode()
181 m.actuallyIdle = true
182 return true
183}
184
185// EnterIdleModeForTesting instructs the channel to enter idle mode.
186func (m *Manager) EnterIdleModeForTesting() {
187 m.tryEnterIdleMode()
188}
189
190// OnCallBegin is invoked at the start of every RPC.
191func (m *Manager) OnCallBegin() error {
192 if m.isClosed() {
193 return nil
194 }
195
196 if atomic.AddInt32(&m.activeCallsCount, 1) > 0 {
197 // Channel is not idle now. Set the activity bit and allow the call.
198 atomic.StoreInt32(&m.activeSinceLastTimerCheck, 1)
199 return nil
200 }
201
202 // Channel is either in idle mode or is in the process of moving to idle
203 // mode. Attempt to exit idle mode to allow this RPC.
204 if err := m.ExitIdleMode(); err != nil {
205 // Undo the increment to calls count, and return an error causing the
206 // RPC to fail.
207 atomic.AddInt32(&m.activeCallsCount, -1)
208 return err
209 }
210
211 atomic.StoreInt32(&m.activeSinceLastTimerCheck, 1)
212 return nil
213}
214
215// ExitIdleMode instructs m to call the enforcer's ExitIdleMode and update m's
216// internal state.
217func (m *Manager) ExitIdleMode() error {
218 // Holds idleMu which ensures mutual exclusion with tryEnterIdleMode.
219 m.idleMu.Lock()
220 defer m.idleMu.Unlock()
221
222 if m.isClosed() || !m.actuallyIdle {
223 // This can happen in three scenarios:
224 // - handleIdleTimeout() set the calls count to -math.MaxInt32 and called
225 // tryEnterIdleMode(). But before the latter could grab the lock, an RPC
226 // came in and OnCallBegin() noticed that the calls count is negative.
227 // - Channel is in idle mode, and multiple new RPCs come in at the same
228 // time, all of them notice a negative calls count in OnCallBegin and get
229 // here. The first one to get the lock would get the channel to exit idle.
230 // - Channel is not in idle mode, and the user calls Connect which calls
231 // m.ExitIdleMode.
232 //
233 // In any case, there is nothing to do here.
234 return nil
235 }
236
237 if err := m.enforcer.ExitIdleMode(); err != nil {
238 return fmt.Errorf("failed to exit idle mode: %w", err)
239 }
240
241 // Undo the idle entry process. This also respects any new RPC attempts.
242 atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
243 m.actuallyIdle = false
244
245 // Start a new timer to fire after the configured idle timeout.
246 m.resetIdleTimerLocked(m.timeout)
247 return nil
248}
249
250// OnCallEnd is invoked at the end of every RPC.
251func (m *Manager) OnCallEnd() {
252 if m.isClosed() {
253 return
254 }
255
256 // Record the time at which the most recent call finished.
257 atomic.StoreInt64(&m.lastCallEndTime, time.Now().UnixNano())
258
259 // Decrement the active calls count. This count can temporarily go negative
260 // when the timer callback is in the process of moving the channel to idle
261 // mode, but one or more RPCs come in and complete before the timer callback
262 // can get done with the process of moving to idle mode.
263 atomic.AddInt32(&m.activeCallsCount, -1)
264}
265
266func (m *Manager) isClosed() bool {
267 return atomic.LoadInt32(&m.closed) == 1
268}
269
270// Close stops the timer associated with the Manager, if it exists.
271func (m *Manager) Close() {
272 atomic.StoreInt32(&m.closed, 1)
273
274 m.idleMu.Lock()
275 if m.timer != nil {
276 m.timer.Stop()
277 m.timer = nil
278 }
279 m.idleMu.Unlock()
280}