static_source.rs

  1//! A source of tasks, based on a static configuration, deserialized from the tasks config file, and related infrastructure for tracking changes to the file.
  2
  3use std::sync::Arc;
  4
  5use futures::StreamExt;
  6use gpui::AppContext;
  7use parking_lot::RwLock;
  8use serde::Deserialize;
  9use util::ResultExt;
 10
 11use crate::TaskTemplates;
 12use futures::channel::mpsc::UnboundedReceiver;
 13
/// The source of tasks defined in a tasks config file.
///
/// Wraps a [`TrackedFile`] of [`TaskTemplates`]; the templates returned by
/// this source are whatever that tracked file currently holds.
pub struct StaticSource {
    /// Tracked, parsed contents of the tasks config file.
    tasks: TrackedFile<TaskTemplates>,
}
 18
/// A wrapper around a deserializable `T` that keeps its value up to date with
/// file contents received via a provided channel.
///
/// There is no push notification: holders of a [`TrackedFile`] see the most
/// recent value the next time they read `parsed_contents`.
pub struct TrackedFile<T> {
    /// Current parsed value, behind a shared read-write lock so that a
    /// background updater can replace it while readers keep their handle.
    parsed_contents: Arc<RwLock<T>>,
}
 25
 26impl<T: PartialEq + 'static + Sync> TrackedFile<T> {
 27    /// Initializes new [`TrackedFile`] with a type that's deserializable.
 28    pub fn new(mut tracker: UnboundedReceiver<String>, cx: &mut AppContext) -> Self
 29    where
 30        T: for<'a> Deserialize<'a> + Default + Send,
 31    {
 32        let parsed_contents: Arc<RwLock<T>> = Arc::default();
 33        cx.background_executor()
 34            .spawn({
 35                let parsed_contents = parsed_contents.clone();
 36                async move {
 37                    while let Some(new_contents) = tracker.next().await {
 38                        if Arc::strong_count(&parsed_contents) == 1 {
 39                            // We're no longer being observed. Stop polling.
 40                            break;
 41                        }
 42                        if !new_contents.trim().is_empty() {
 43                            let Some(new_contents) =
 44                                serde_json_lenient::from_str::<T>(&new_contents).log_err()
 45                            else {
 46                                continue;
 47                            };
 48                            let mut contents = parsed_contents.write();
 49                            *contents = new_contents;
 50                        }
 51                    }
 52                    anyhow::Ok(())
 53                }
 54            })
 55            .detach_and_log_err(cx);
 56        Self { parsed_contents }
 57    }
 58
 59    /// Initializes new [`TrackedFile`] with a type that's convertible from another deserializable type.
 60    pub fn new_convertible<U: for<'a> Deserialize<'a> + TryInto<T, Error = anyhow::Error>>(
 61        mut tracker: UnboundedReceiver<String>,
 62        cx: &mut AppContext,
 63    ) -> Self
 64    where
 65        T: Default + Send,
 66    {
 67        let parsed_contents: Arc<RwLock<T>> = Arc::default();
 68        cx.background_executor()
 69            .spawn({
 70                let parsed_contents = parsed_contents.clone();
 71                async move {
 72                    while let Some(new_contents) = tracker.next().await {
 73                        if Arc::strong_count(&parsed_contents) == 1 {
 74                            // We're no longer being observed. Stop polling.
 75                            break;
 76                        }
 77
 78                        if !new_contents.trim().is_empty() {
 79                            let Some(new_contents) =
 80                                serde_json_lenient::from_str::<U>(&new_contents).log_err()
 81                            else {
 82                                continue;
 83                            };
 84                            let Some(new_contents) = new_contents.try_into().log_err() else {
 85                                continue;
 86                            };
 87                            let mut contents = parsed_contents.write();
 88                            *contents = new_contents;
 89                        }
 90                    }
 91                    anyhow::Ok(())
 92                }
 93            })
 94            .detach_and_log_err(cx);
 95        Self {
 96            parsed_contents: Default::default(),
 97        }
 98    }
 99}
100
101impl StaticSource {
102    /// Initializes the static source, reacting on tasks config changes.
103    pub fn new(tasks: TrackedFile<TaskTemplates>) -> Self {
104        Self { tasks }
105    }
106    /// Returns current list of tasks
107    pub fn tasks_to_schedule(&self) -> TaskTemplates {
108        self.tasks.parsed_contents.read().clone()
109    }
110}