subscription.rs

 1use crate::{Edit, Patch};
 2use parking_lot::Mutex;
 3use std::{
 4    mem,
 5    sync::{Arc, Weak},
 6};
 7
 8#[derive(Default)]
 9pub struct Topic<T>(Mutex<Vec<Weak<Mutex<Patch<T>>>>>);
10
11pub struct Subscription<T>(Arc<Mutex<Patch<T>>>);
12
13impl<T: Default, TDelta> Topic<T>
14where
15    T: 'static
16        + Copy
17        + Ord
18        + std::ops::Sub<T, Output = TDelta>
19        + std::ops::Add<TDelta, Output = T>
20        + std::ops::AddAssign<TDelta>
21        + Default,
22    TDelta: Ord + Copy,
23{
24    pub fn subscribe(&mut self) -> Subscription<T> {
25        let subscription = Subscription(Default::default());
26        self.0.get_mut().push(Arc::downgrade(&subscription.0));
27        subscription
28    }
29
30    pub fn publish(&self, edits: impl Clone + IntoIterator<Item = Edit<T>>) {
31        publish(&mut self.0.lock(), edits);
32    }
33
34    pub fn publish_mut(&mut self, edits: impl Clone + IntoIterator<Item = Edit<T>>) {
35        publish(self.0.get_mut(), edits);
36    }
37}
38
39impl<T: Default> Subscription<T> {
40    pub fn consume(&self) -> Patch<T> {
41        mem::take(&mut *self.0.lock())
42    }
43}
44
45fn publish<T, TDelta>(
46    subscriptions: &mut Vec<Weak<Mutex<Patch<T>>>>,
47    edits: impl Clone + IntoIterator<Item = Edit<T>>,
48) where
49    T: 'static
50        + Copy
51        + Ord
52        + std::ops::Sub<T, Output = TDelta>
53        + std::ops::Add<TDelta, Output = T>
54        + std::ops::AddAssign<TDelta>
55        + Default,
56    TDelta: Ord + Copy,
57{
58    subscriptions.retain(|subscription| {
59        if let Some(subscription) = subscription.upgrade() {
60            let mut patch = subscription.lock();
61            *patch = patch.compose(edits.clone());
62            true
63        } else {
64            false
65        }
66    });
67}