1use crate::{BackgroundExecutor, Task};
2use std::{
3 future::Future,
4 pin::Pin,
5 sync::atomic::{AtomicUsize, Ordering::SeqCst},
6 task,
7 time::Duration,
8};
9
10pub use util::*;
11
/// Fluent-style helpers for values that are assembled through chained method calls.
///
/// Each method consumes `self` and returns a value, which lets conditional construction
/// steps stay inside a builder chain. All methods have default bodies, so an implementor
/// only needs an empty `impl` block.
pub trait FluentBuilder {
    /// Hands `self` to `f` and returns whatever `f` produces.
    ///
    /// The remaining default methods of this trait are all expressed through `map`.
    fn map<U>(self, f: impl FnOnce(Self) -> U) -> U
    where
        Self: Sized,
    {
        f(self)
    }

    /// Applies `then` when `condition` is true; otherwise returns `self` unchanged.
    fn when(self, condition: bool, then: impl FnOnce(Self) -> Self) -> Self
    where
        Self: Sized,
    {
        self.map(|this| {
            if !condition {
                return this;
            }
            then(this)
        })
    }

    /// Applies `then` when `condition` is true and `else_fn` when it is false.
    fn when_else(
        self,
        condition: bool,
        then: impl FnOnce(Self) -> Self,
        else_fn: impl FnOnce(Self) -> Self,
    ) -> Self
    where
        Self: Sized,
    {
        self.map(|this| {
            if !condition {
                return else_fn(this);
            }
            then(this)
        })
    }

    /// Applies `then` to `self` and the unwrapped value when `option` is `Some`;
    /// otherwise returns `self` unchanged.
    fn when_some<T>(self, option: Option<T>, then: impl FnOnce(Self, T) -> Self) -> Self
    where
        Self: Sized,
    {
        self.map(|this| match option {
            Some(value) => then(this, value),
            None => this,
        })
    }

    /// Applies `then` when `option` is `None`; otherwise returns `self` unchanged.
    fn when_none<T>(self, option: &Option<T>, then: impl FnOnce(Self) -> Self) -> Self
    where
        Self: Sized,
    {
        self.map(|this| match option {
            None => then(this),
            Some(_) => this,
        })
    }
}
64
65/// Extensions for Future types that provide additional combinators and utilities.
66pub trait FutureExt {
67 /// Requires a Future to complete before the specified duration has elapsed.
68 /// Similar to tokio::timeout.
69 fn with_timeout(self, timeout: Duration, executor: &BackgroundExecutor) -> WithTimeout<Self>
70 where
71 Self: Sized;
72}
73
74impl<T: Future> FutureExt for T {
75 fn with_timeout(self, timeout: Duration, executor: &BackgroundExecutor) -> WithTimeout<Self>
76 where
77 Self: Sized,
78 {
79 WithTimeout {
80 future: self,
81 timer: executor.timer(timeout),
82 }
83 }
84}
85
86pub struct WithTimeout<T> {
87 future: T,
88 timer: Task<()>,
89}
90
91#[derive(Debug, thiserror::Error)]
92#[error("Timed out before future resolved")]
93/// Error returned by with_timeout when the timeout duration elapsed before the future resolved
94pub struct Timeout;
95
96impl<T: Future> Future for WithTimeout<T> {
97 type Output = Result<T::Output, Timeout>;
98
99 fn poll(self: Pin<&mut Self>, cx: &mut task::Context) -> task::Poll<Self::Output> {
100 // SAFETY: the fields of Timeout are private and we never move the future ourselves
101 // And its already pinned since we are being polled (all futures need to be pinned to be polled)
102 let this = unsafe { &raw mut *self.get_unchecked_mut() };
103 let future = unsafe { Pin::new_unchecked(&mut (*this).future) };
104 let timer = unsafe { Pin::new_unchecked(&mut (*this).timer) };
105
106 if let task::Poll::Ready(output) = future.poll(cx) {
107 task::Poll::Ready(Ok(output))
108 } else if timer.poll(cx).is_ready() {
109 task::Poll::Ready(Err(Timeout))
110 } else {
111 task::Poll::Pending
112 }
113 }
114}
115
/// Runs `f` under a timeout driven by a `smol` timer.
///
/// Resolves to `Ok` with the output of `f` when it finishes before the timer, and to
/// `Err(())` otherwise. Note that the timer does not "rewind" on a
/// `cx.executor().advance_clock` call: the timeout truly has to elapse.
#[cfg(any(test, feature = "test-support"))]
pub async fn smol_timeout<F, T>(timeout: Duration, f: F) -> Result<T, ()>
where
    F: Future<Output = T>,
{
    let completed = async move { Ok(f.await) };
    let expired = async {
        smol::Timer::after(timeout).await;
        Err(())
    };
    smol::future::FutureExt::race(expired, completed).await
}
130
/// Atomically adds one to `counter` unless it currently holds zero.
///
/// Returns the incremented value, or `0` (leaving the counter untouched) when the counter
/// was zero.
pub(crate) fn atomic_incr_if_not_zero(counter: &AtomicUsize) -> usize {
    counter
        .fetch_update(SeqCst, SeqCst, |current| {
            // Returning `None` aborts the update and leaves the stored value as it is.
            if current == 0 {
                None
            } else {
                Some(current + 1)
            }
        })
        // `Ok` carries the value seen before the increment; `Err` means it was zero.
        .map_or(0, |previous| previous + 1)
}
145
#[cfg(test)]
mod tests {
    use crate::TestAppContext;

    use super::*;

    // Covers three cases of `with_timeout`: an already-ready task, a timer that outlasts its
    // timeout when awaited, and a timeout observed after advancing the clock manually.
    #[gpui::test]
    async fn test_with_timeout(cx: &mut TestAppContext) {
        let timeout = Duration::from_secs(1);
        let slow = Duration::from_secs(6000);

        // A task that is already complete must win against the timer.
        Task::ready(())
            .with_timeout(timeout, &cx.executor())
            .await
            .expect("Timeout should be noop");

        // A timer far longer than the timeout must lose when awaited.
        let outcome = cx
            .executor()
            .timer(slow)
            .with_timeout(timeout, &cx.executor())
            .await;
        outcome.expect_err("timeout should have triggered");

        // After advancing the clock past the timeout, a single poll must already
        // report the timeout.
        let pending = cx
            .executor()
            .timer(slow)
            .with_timeout(timeout, &cx.executor());
        cx.executor().advance_clock(timeout * 2);
        futures::FutureExt::now_or_never(pending)
            .expect("timeout should have triggered")
            .expect_err("timeout");
    }
}