util.rs

  1use crate::{BackgroundExecutor, Task};
  2use std::{
  3    future::Future,
  4    pin::Pin,
  5    sync::atomic::{AtomicUsize, Ordering::SeqCst},
  6    task,
  7    time::Duration,
  8};
  9
 10/// A helper trait for building complex objects with imperative conditionals in a fluent style.
 11pub trait FluentBuilder {
 12    /// Imperatively modify self with the given closure.
 13    fn map<U>(self, f: impl FnOnce(Self) -> U) -> U
 14    where
 15        Self: Sized,
 16    {
 17        f(self)
 18    }
 19
 20    /// Conditionally modify self with the given closure.
 21    fn when(self, condition: bool, then: impl FnOnce(Self) -> Self) -> Self
 22    where
 23        Self: Sized,
 24    {
 25        self.map(|this| if condition { then(this) } else { this })
 26    }
 27
 28    /// Conditionally modify self with the given closure.
 29    fn when_else(
 30        self,
 31        condition: bool,
 32        then: impl FnOnce(Self) -> Self,
 33        else_fn: impl FnOnce(Self) -> Self,
 34    ) -> Self
 35    where
 36        Self: Sized,
 37    {
 38        self.map(|this| if condition { then(this) } else { else_fn(this) })
 39    }
 40
 41    /// Conditionally unwrap and modify self with the given closure, if the given option is Some.
 42    fn when_some<T>(self, option: Option<T>, then: impl FnOnce(Self, T) -> Self) -> Self
 43    where
 44        Self: Sized,
 45    {
 46        self.map(|this| {
 47            if let Some(value) = option {
 48                then(this, value)
 49            } else {
 50                this
 51            }
 52        })
 53    }
 54    /// Conditionally unwrap and modify self with the given closure, if the given option is None.
 55    fn when_none<T>(self, option: &Option<T>, then: impl FnOnce(Self) -> Self) -> Self
 56    where
 57        Self: Sized,
 58    {
 59        self.map(|this| if option.is_some() { this } else { then(this) })
 60    }
 61}
 62
/// Extensions for Future types that provide additional combinators and utilities.
pub trait FutureExt {
    /// Requires a Future to complete before the specified duration has elapsed.
    /// Similar to `tokio::time::timeout`.
    ///
    /// The returned [`WithTimeout`] resolves to `Ok` with this future's output, or to
    /// `Err(`[`Timeout`]`)` when the timer obtained from `executor` for `timeout`
    /// completes while this future is still pending.
    fn with_timeout(self, timeout: Duration, executor: &BackgroundExecutor) -> WithTimeout<Self>
    where
        Self: Sized;
}
 71
 72impl<T: Future> FutureExt for T {
 73    fn with_timeout(self, timeout: Duration, executor: &BackgroundExecutor) -> WithTimeout<Self>
 74    where
 75        Self: Sized,
 76    {
 77        WithTimeout {
 78            future: self,
 79            timer: executor.timer(timeout),
 80        }
 81    }
 82}
 83
/// Future returned by [`FutureExt::with_timeout`], pairing an inner future with a timer task.
///
/// Resolves to `Ok` with the inner future's output, or to `Err(`[`Timeout`]`)` when the
/// timer completes while the inner future is still pending.
#[pin_project::pin_project]
pub struct WithTimeout<T> {
    /// The wrapped future; it is polled before the timer on every poll.
    #[pin]
    future: T,
    /// Timer task obtained from the executor for the requested timeout duration.
    #[pin]
    timer: Task<()>,
}
 91
/// Error returned by [`FutureExt::with_timeout`] when the timeout duration elapsed before
/// the future resolved.
///
/// Its `Display` text is "Timed out before future resolved".
#[derive(Debug, thiserror::Error)]
#[error("Timed out before future resolved")]
pub struct Timeout;
 96
 97impl<T: Future> Future for WithTimeout<T> {
 98    type Output = Result<T::Output, Timeout>;
 99
100    fn poll(self: Pin<&mut Self>, cx: &mut task::Context) -> task::Poll<Self::Output> {
101        let this = self.project();
102
103        if let task::Poll::Ready(output) = this.future.poll(cx) {
104            task::Poll::Ready(Ok(output))
105        } else if this.timer.poll(cx).is_ready() {
106            task::Poll::Ready(Err(Timeout))
107        } else {
108            task::Poll::Pending
109        }
110    }
111}
112
/// Atomically increments `counter` unless its current value is zero.
///
/// Returns the value after the increment, or `0` when the counter was zero and was
/// therefore left untouched. All atomic operations use `SeqCst` ordering.
pub(crate) fn atomic_incr_if_not_zero(counter: &AtomicUsize) -> usize {
    // `None` tells `fetch_update` to stop without storing anything.
    let bump = |current: usize| (current != 0).then(|| current + 1);

    match counter.fetch_update(SeqCst, SeqCst, bump) {
        // `fetch_update` reports the value it replaced; the caller wants the new one.
        Ok(previous) => previous + 1,
        Err(_) => 0,
    }
}
127
#[cfg(test)]
mod tests {
    use super::*;
    use crate::TestAppContext;

    /// Exercises `with_timeout` in three situations: a future expected to finish in time,
    /// an awaited future expected to time out, and a future expected to have timed out
    /// after the clock is advanced manually.
    #[gpui::test]
    async fn test_with_timeout(cx: &mut TestAppContext) {
        // A task built with `Task::ready` is expected to beat the timeout.
        let ready = Task::ready(()).with_timeout(Duration::from_secs(1), &cx.executor());
        ready.await.expect("Timeout should be noop");

        let short_duration = Duration::from_secs(1);
        let long_duration = Duration::from_secs(6000);

        // A long timer raced against a short timeout is expected to lose.
        let slow = cx
            .executor()
            .timer(long_duration)
            .with_timeout(short_duration, &cx.executor());
        slow.await.expect_err("timeout should have triggered");

        // Same race, but driven by advancing the clock past the short timeout and then
        // polling exactly once.
        let pending = cx
            .executor()
            .timer(long_duration)
            .with_timeout(short_duration, &cx.executor());
        cx.executor().advance_clock(short_duration * 2);
        match futures::FutureExt::now_or_never(pending) {
            Some(outcome) => {
                outcome.expect_err("timeout");
            }
            None => panic!("timeout should have triggered"),
        }
    }
}