1use crate::{BackgroundExecutor, Task};
2use std::{
3 future::Future,
4 pin::Pin,
5 sync::atomic::{AtomicUsize, Ordering::SeqCst},
6 task,
7 time::Duration,
8};
9
/// A helper trait for building complex objects with imperative conditionals in a fluent style.
///
/// Every method consumes `self` and hands it back (possibly transformed), so calls can be
/// chained without breaking out of the builder expression.
pub trait FluentBuilder {
    /// Passes `self` by value to `f` and returns whatever `f` produces.
    fn map<U>(self, f: impl FnOnce(Self) -> U) -> U
    where
        Self: Sized,
    {
        f(self)
    }

    /// Applies `then` to `self` when `condition` is true; otherwise returns `self` untouched.
    fn when(self, condition: bool, then: impl FnOnce(Self) -> Self) -> Self
    where
        Self: Sized,
    {
        self.map(|builder| {
            if !condition {
                return builder;
            }
            then(builder)
        })
    }

    /// Applies `then` to `self` when `condition` is true, and `else_fn` when it is false.
    /// Exactly one of the two closures runs.
    fn when_else(
        self,
        condition: bool,
        then: impl FnOnce(Self) -> Self,
        else_fn: impl FnOnce(Self) -> Self,
    ) -> Self
    where
        Self: Sized,
    {
        self.map(|builder| {
            if condition {
                return then(builder);
            }
            else_fn(builder)
        })
    }

    /// Applies `then` to `self` and the unwrapped value when `option` is `Some`;
    /// returns `self` untouched when it is `None`.
    fn when_some<T>(self, option: Option<T>, then: impl FnOnce(Self, T) -> Self) -> Self
    where
        Self: Sized,
    {
        self.map(|builder| match option {
            Some(value) => then(builder, value),
            None => builder,
        })
    }

    /// Applies `then` to `self` when `option` is `None`; returns `self` untouched when it
    /// is `Some`. The option is only inspected, never consumed.
    fn when_none<T>(self, option: &Option<T>, then: impl FnOnce(Self) -> Self) -> Self
    where
        Self: Sized,
    {
        self.map(|builder| match option {
            None => then(builder),
            Some(_) => builder,
        })
    }
}
62
63/// Extensions for Future types that provide additional combinators and utilities.
64pub trait FutureExt {
65 /// Requires a Future to complete before the specified duration has elapsed.
66 /// Similar to tokio::timeout.
67 fn with_timeout(self, timeout: Duration, executor: &BackgroundExecutor) -> WithTimeout<Self>
68 where
69 Self: Sized;
70}
71
72impl<T: Future> FutureExt for T {
73 fn with_timeout(self, timeout: Duration, executor: &BackgroundExecutor) -> WithTimeout<Self>
74 where
75 Self: Sized,
76 {
77 WithTimeout {
78 future: self,
79 timer: executor.timer(timeout),
80 }
81 }
82}
83
84#[pin_project::pin_project]
85pub struct WithTimeout<T> {
86 #[pin]
87 future: T,
88 #[pin]
89 timer: Task<()>,
90}
91
92#[derive(Debug, thiserror::Error)]
93#[error("Timed out before future resolved")]
94/// Error returned by with_timeout when the timeout duration elapsed before the future resolved
95pub struct Timeout;
96
97impl<T: Future> Future for WithTimeout<T> {
98 type Output = Result<T::Output, Timeout>;
99
100 fn poll(self: Pin<&mut Self>, cx: &mut task::Context) -> task::Poll<Self::Output> {
101 let this = self.project();
102
103 if let task::Poll::Ready(output) = this.future.poll(cx) {
104 task::Poll::Ready(Ok(output))
105 } else if this.timer.poll(cx).is_ready() {
106 task::Poll::Ready(Err(Timeout))
107 } else {
108 task::Poll::Pending
109 }
110 }
111}
112
/// Increment the given atomic counter if it is not zero.
///
/// Returns the new value of the counter after a successful increment, or `0` when the
/// counter was zero (in which case it is left untouched).
///
/// The check and the increment happen as a single atomic read-modify-write, so a
/// concurrent writer can never cause an increment from zero or a lost update.
pub(crate) fn atomic_incr_if_not_zero(counter: &AtomicUsize) -> usize {
    counter
        // `fetch_update` re-runs the closure on contention until the compare-exchange
        // succeeds, or until the closure rejects the current value by returning `None`.
        .fetch_update(SeqCst, SeqCst, |current| {
            if current == 0 {
                None
            } else {
                Some(current + 1)
            }
        })
        // `Ok` carries the value seen *before* the increment; `Err` means it was zero.
        .map_or(0, |previous| previous + 1)
}
127
#[cfg(test)]
mod tests {
    use super::*;
    use crate::TestAppContext;

    /// Exercises `with_timeout` in three situations: a future that is already ready, a
    /// timeout observed by awaiting, and a timeout observed after manually advancing the
    /// test clock.
    #[gpui::test]
    async fn test_with_timeout(cx: &mut TestAppContext) {
        let executor = cx.executor();
        let short_duration = Duration::from_secs(1);
        let long_duration = Duration::from_secs(6000);

        // An immediately-ready future must win against the timer.
        let ready = Task::ready(()).with_timeout(Duration::from_secs(1), &executor);
        ready.await.expect("Timeout should be noop");

        // A long timer wrapped in a short timeout must report the timeout when awaited.
        let slow = executor
            .timer(long_duration)
            .with_timeout(short_duration, &executor);
        slow.await.expect_err("timeout should have triggered");

        // After moving the clock past the deadline, a single poll must already
        // observe the timeout.
        let fut = executor
            .timer(long_duration)
            .with_timeout(short_duration, &executor);
        executor.advance_clock(short_duration * 2);
        // Fully qualified because the local `FutureExt` is the one in scope here.
        let polled_once = futures::FutureExt::now_or_never(fut);
        polled_once
            .expect("timeout should have triggered")
            .expect_err("timeout");
    }
}