1use crate::{BackgroundExecutor, Task};
2use std::{
3 future::Future,
4 pin::Pin,
5 sync::atomic::{AtomicUsize, Ordering::SeqCst},
6 task,
7 time::Duration,
8};
9
10pub use util::*;
11
/// A helper trait for building complex objects with imperative conditionals in a fluent style.
///
/// Every method has a default body, so implementors only need an empty `impl` block.
/// All conditional combinators are expressed in terms of [`FluentBuilder::map`].
pub trait FluentBuilder {
    /// Hand `self` to `f` by value and return whatever `f` produces.
    fn map<U>(self, f: impl FnOnce(Self) -> U) -> U
    where
        Self: Sized,
    {
        f(self)
    }

    /// Apply `then` to `self` only when `condition` is true; otherwise return `self` unchanged.
    fn when(self, condition: bool, then: impl FnOnce(Self) -> Self) -> Self
    where
        Self: Sized,
    {
        self.map(|builder| {
            if !condition {
                return builder;
            }
            then(builder)
        })
    }

    /// Apply `then` to `self` when `condition` is true, and `else_fn` when it is false.
    fn when_else(
        self,
        condition: bool,
        then: impl FnOnce(Self) -> Self,
        else_fn: impl FnOnce(Self) -> Self,
    ) -> Self
    where
        Self: Sized,
    {
        self.map(|builder| {
            if !condition {
                return else_fn(builder);
            }
            then(builder)
        })
    }

    /// Apply `then` to `self` and the unwrapped value when `option` is `Some`;
    /// otherwise return `self` unchanged.
    fn when_some<T>(self, option: Option<T>, then: impl FnOnce(Self, T) -> Self) -> Self
    where
        Self: Sized,
    {
        self.map(|builder| match option {
            Some(value) => then(builder, value),
            None => builder,
        })
    }

    /// Apply `then` to `self` when `option` is `None`; otherwise return `self` unchanged.
    fn when_none<T>(self, option: &Option<T>, then: impl FnOnce(Self) -> Self) -> Self
    where
        Self: Sized,
    {
        self.map(|builder| {
            if option.is_none() {
                then(builder)
            } else {
                builder
            }
        })
    }
}
64
/// Extensions for Future types that provide additional combinators and utilities.
pub trait FutureExt {
    /// Requires a Future to complete before the specified duration has elapsed.
    /// Similar to tokio::timeout.
    ///
    /// `timeout` is the duration handed to `executor`'s timer. The returned
    /// [`WithTimeout`] resolves to `Ok` with this future's output, or to `Err(Timeout)`
    /// if the timer completes while this future is still pending.
    fn with_timeout(self, timeout: Duration, executor: &BackgroundExecutor) -> WithTimeout<Self>
    where
        Self: Sized;
}
73
74impl<T: Future> FutureExt for T {
75 fn with_timeout(self, timeout: Duration, executor: &BackgroundExecutor) -> WithTimeout<Self>
76 where
77 Self: Sized,
78 {
79 WithTimeout {
80 future: self,
81 timer: executor.timer(timeout),
82 }
83 }
84}
85
/// Future returned by [`FutureExt::with_timeout`].
///
/// Resolves to `Ok(output)` when the wrapped future completes, or to `Err(Timeout)` when
/// the timer completes while the wrapped future is still pending.
pub struct WithTimeout<T> {
    /// The wrapped future whose output is being awaited.
    future: T,
    /// Timer task created from the executor for the requested timeout duration.
    timer: Task<()>,
}
90
/// Error returned by `with_timeout` when the timeout duration elapsed before the future
/// resolved.
#[derive(Debug, thiserror::Error)]
#[error("Timed out before future resolved")]
pub struct Timeout;
95
96impl<T: Future> Future for WithTimeout<T> {
97 type Output = Result<T::Output, Timeout>;
98
99 fn poll(self: Pin<&mut Self>, cx: &mut task::Context) -> task::Poll<Self::Output> {
100 // SAFETY: the fields of Timeout are private and we never move the future ourselves
101 // And its already pinned since we are being polled (all futures need to be pinned to be polled)
102 let this = unsafe { &raw mut *self.get_unchecked_mut() };
103 let future = unsafe { Pin::new_unchecked(&mut (*this).future) };
104 let timer = unsafe { Pin::new_unchecked(&mut (*this).timer) };
105
106 if let task::Poll::Ready(output) = future.poll(cx) {
107 task::Poll::Ready(Ok(output))
108 } else if timer.poll(cx).is_ready() {
109 task::Poll::Ready(Err(Timeout))
110 } else {
111 task::Poll::Pending
112 }
113 }
114}
115
/// Awaits `f`, racing it against a `smol::Timer` set to fire after `timeout`.
///
/// Returns `Ok` with the output of `f`, or `Err(())` when the timer branch wins the race.
/// The timer branch is passed to `race` as the first argument.
#[cfg(any(test, feature = "test-support"))]
pub async fn smol_timeout<F, T>(timeout: Duration, f: F) -> Result<T, ()>
where
    F: Future<Output = T>,
{
    let expired = async {
        smol::Timer::after(timeout).await;
        Err(())
    };
    let completed = async move {
        let output = f.await;
        Ok(output)
    };
    smol::future::FutureExt::race(expired, completed).await
}
128
/// Increment the given atomic counter if it is not zero.
/// Return the new value of the counter.
///
/// A counter that currently holds zero is left untouched and `0` is returned.
pub(crate) fn atomic_incr_if_not_zero(counter: &AtomicUsize) -> usize {
    // `fetch_update` retries the compare-exchange until it succeeds or the closure
    // declines (returns `None`) because the observed value is zero.
    counter
        .fetch_update(SeqCst, SeqCst, |current| {
            if current == 0 {
                None
            } else {
                Some(current + 1)
            }
        })
        // `Ok` carries the value seen just before the successful increment.
        .map_or(0, |previous| previous + 1)
}
143
#[cfg(test)]
mod tests {
    use super::*;
    use crate::TestAppContext;

    #[gpui::test]
    async fn test_with_timeout(cx: &mut TestAppContext) {
        // A task that is already complete must resolve with `Ok`.
        let already_ready =
            Task::ready(()).with_timeout(Duration::from_secs(1), &cx.executor());
        already_ready.await.expect("Timeout should be noop");

        let short_duration = Duration::from_secs(1);
        let long_duration = Duration::from_secs(6000);

        // Awaiting a long timer under a short timeout must yield the timeout error.
        let awaited = cx
            .executor()
            .timer(long_duration)
            .with_timeout(short_duration, &cx.executor());
        awaited.await.expect_err("timeout should have triggered");

        // After advancing the clock past the timeout, a single poll must already
        // observe the timeout error.
        let polled_once = cx
            .executor()
            .timer(long_duration)
            .with_timeout(short_duration, &cx.executor());
        cx.executor().advance_clock(short_duration * 2);
        let Some(outcome) = futures::FutureExt::now_or_never(polled_once) else {
            panic!("timeout should have triggered")
        };
        outcome.expect_err("timeout");
    }
}