1use crate::{BackgroundExecutor, Task};
2use std::{
3 future::Future,
4 pin::Pin,
5 sync::atomic::{AtomicUsize, Ordering::SeqCst},
6 task,
7 time::Duration,
8};
9
10pub use util::*;
11
12/// A helper trait for building complex objects with imperative conditionals in a fluent style.
13pub trait FluentBuilder {
14 /// Imperatively modify self with the given closure.
15 fn map<U>(self, f: impl FnOnce(Self) -> U) -> U
16 where
17 Self: Sized,
18 {
19 f(self)
20 }
21
22 /// Conditionally modify self with the given closure.
23 fn when(self, condition: bool, then: impl FnOnce(Self) -> Self) -> Self
24 where
25 Self: Sized,
26 {
27 self.map(|this| if condition { then(this) } else { this })
28 }
29
30 /// Conditionally modify self with the given closure.
31 fn when_else(
32 self,
33 condition: bool,
34 then: impl FnOnce(Self) -> Self,
35 else_fn: impl FnOnce(Self) -> Self,
36 ) -> Self
37 where
38 Self: Sized,
39 {
40 self.map(|this| if condition { then(this) } else { else_fn(this) })
41 }
42
43 /// Conditionally unwrap and modify self with the given closure, if the given option is Some.
44 fn when_some<T>(self, option: Option<T>, then: impl FnOnce(Self, T) -> Self) -> Self
45 where
46 Self: Sized,
47 {
48 self.map(|this| {
49 if let Some(value) = option {
50 then(this, value)
51 } else {
52 this
53 }
54 })
55 }
56 /// Conditionally unwrap and modify self with the given closure, if the given option is None.
57 fn when_none<T>(self, option: &Option<T>, then: impl FnOnce(Self) -> Self) -> Self
58 where
59 Self: Sized,
60 {
61 self.map(|this| if option.is_some() { this } else { then(this) })
62 }
63}
64
65/// Extensions for Future types that provide additional combinators and utilities.
66pub trait FutureExt {
67 /// Requires a Future to complete before the specified duration has elapsed.
68 /// Similar to tokio::timeout.
69 fn with_timeout(self, timeout: Duration, executor: &BackgroundExecutor) -> WithTimeout<Self>
70 where
71 Self: Sized;
72}
73
74impl<T: Future> FutureExt for T {
75 fn with_timeout(self, timeout: Duration, executor: &BackgroundExecutor) -> WithTimeout<Self>
76 where
77 Self: Sized,
78 {
79 WithTimeout {
80 future: self,
81 timer: executor.timer(timeout),
82 }
83 }
84}
85
86#[pin_project::pin_project]
87pub struct WithTimeout<T> {
88 #[pin]
89 future: T,
90 #[pin]
91 timer: Task<()>,
92}
93
94#[derive(Debug, thiserror::Error)]
95#[error("Timed out before future resolved")]
96/// Error returned by with_timeout when the timeout duration elapsed before the future resolved
97pub struct Timeout;
98
99impl<T: Future> Future for WithTimeout<T> {
100 type Output = Result<T::Output, Timeout>;
101
102 fn poll(self: Pin<&mut Self>, cx: &mut task::Context) -> task::Poll<Self::Output> {
103 let this = self.project();
104
105 if let task::Poll::Ready(output) = this.future.poll(cx) {
106 task::Poll::Ready(Ok(output))
107 } else if this.timer.poll(cx).is_ready() {
108 task::Poll::Ready(Err(Timeout))
109 } else {
110 task::Poll::Pending
111 }
112 }
113}
114
115/// Increment the given atomic counter if it is not zero.
116/// Return the new value of the counter.
117pub(crate) fn atomic_incr_if_not_zero(counter: &AtomicUsize) -> usize {
118 let mut loaded = counter.load(SeqCst);
119 loop {
120 if loaded == 0 {
121 return 0;
122 }
123 match counter.compare_exchange_weak(loaded, loaded + 1, SeqCst, SeqCst) {
124 Ok(x) => return x + 1,
125 Err(actual) => loaded = actual,
126 }
127 }
128}
129
130#[cfg(test)]
131mod tests {
132 use crate::TestAppContext;
133
134 use super::*;
135
136 #[gpui::test]
137 async fn test_with_timeout(cx: &mut TestAppContext) {
138 Task::ready(())
139 .with_timeout(Duration::from_secs(1), &cx.executor())
140 .await
141 .expect("Timeout should be noop");
142
143 let long_duration = Duration::from_secs(6000);
144 let short_duration = Duration::from_secs(1);
145 cx.executor()
146 .timer(long_duration)
147 .with_timeout(short_duration, &cx.executor())
148 .await
149 .expect_err("timeout should have triggered");
150
151 let fut = cx
152 .executor()
153 .timer(long_duration)
154 .with_timeout(short_duration, &cx.executor());
155 cx.executor().advance_clock(short_duration * 2);
156 futures::FutureExt::now_or_never(fut)
157 .unwrap_or_else(|| panic!("timeout should have triggered"))
158 .expect_err("timeout");
159 }
160}