static_source.rs

  1//! A source of tasks, based on a static configuration, deserialized from the tasks config file, and related infrastructure for tracking changes to the file.
  2
  3use std::sync::Arc;
  4
  5use futures::{channel::mpsc::UnboundedSender, StreamExt};
  6use gpui::App;
  7use parking_lot::RwLock;
  8use serde::Deserialize;
  9use util::ResultExt;
 10
 11use crate::TaskTemplates;
 12use futures::channel::mpsc::UnboundedReceiver;
 13
 14/// The source of tasks defined in a tasks config file.
 15pub struct StaticSource {
 16    tasks: TrackedFile<TaskTemplates>,
 17}
 18
 19/// A Wrapper around deserializable T that keeps track of its contents
 20/// via a provided channel.
 21pub struct TrackedFile<T> {
 22    parsed_contents: Arc<RwLock<T>>,
 23}
 24
 25impl<T: PartialEq + 'static + Sync> TrackedFile<T> {
 26    /// Initializes new [`TrackedFile`] with a type that's deserializable.
 27    pub fn new(
 28        mut tracker: UnboundedReceiver<String>,
 29        notification_outlet: UnboundedSender<()>,
 30        cx: &App,
 31    ) -> Self
 32    where
 33        T: for<'a> Deserialize<'a> + Default + Send,
 34    {
 35        let parsed_contents: Arc<RwLock<T>> = Arc::default();
 36        cx.background_executor()
 37            .spawn({
 38                let parsed_contents = parsed_contents.clone();
 39                async move {
 40                    while let Some(new_contents) = tracker.next().await {
 41                        if Arc::strong_count(&parsed_contents) == 1 {
 42                            // We're no longer being observed. Stop polling.
 43                            break;
 44                        }
 45                        if !new_contents.trim().is_empty() {
 46                            let Some(new_contents) =
 47                                serde_json_lenient::from_str::<T>(&new_contents).log_err()
 48                            else {
 49                                continue;
 50                            };
 51                            let mut contents = parsed_contents.write();
 52                            if *contents != new_contents {
 53                                *contents = new_contents;
 54                                if notification_outlet.unbounded_send(()).is_err() {
 55                                    // Whoever cared about contents is not around anymore.
 56                                    break;
 57                                }
 58                            }
 59                        }
 60                    }
 61                    anyhow::Ok(())
 62                }
 63            })
 64            .detach_and_log_err(cx);
 65        Self { parsed_contents }
 66    }
 67
 68    /// Initializes new [`TrackedFile`] with a type that's convertible from another deserializable type.
 69    pub fn new_convertible<U: for<'a> Deserialize<'a> + TryInto<T, Error = anyhow::Error>>(
 70        mut tracker: UnboundedReceiver<String>,
 71        notification_outlet: UnboundedSender<()>,
 72        cx: &App,
 73    ) -> Self
 74    where
 75        T: Default + Send,
 76    {
 77        let parsed_contents: Arc<RwLock<T>> = Arc::default();
 78        cx.background_executor()
 79            .spawn({
 80                let parsed_contents = parsed_contents.clone();
 81                async move {
 82                    while let Some(new_contents) = tracker.next().await {
 83                        if Arc::strong_count(&parsed_contents) == 1 {
 84                            // We're no longer being observed. Stop polling.
 85                            break;
 86                        }
 87
 88                        if !new_contents.trim().is_empty() {
 89                            let Some(new_contents) =
 90                                serde_json_lenient::from_str::<U>(&new_contents).log_err()
 91                            else {
 92                                continue;
 93                            };
 94                            let Some(new_contents) = new_contents.try_into().log_err() else {
 95                                continue;
 96                            };
 97                            let mut contents = parsed_contents.write();
 98                            if *contents != new_contents {
 99                                *contents = new_contents;
100                                if notification_outlet.unbounded_send(()).is_err() {
101                                    // Whoever cared about contents is not around anymore.
102                                    break;
103                                }
104                            }
105                        }
106                    }
107                    anyhow::Ok(())
108                }
109            })
110            .detach_and_log_err(cx);
111        Self {
112            parsed_contents: Default::default(),
113        }
114    }
115}
116
117impl StaticSource {
118    /// Initializes the static source, reacting on tasks config changes.
119    pub fn new(tasks: TrackedFile<TaskTemplates>) -> Self {
120        Self { tasks }
121    }
122    /// Returns current list of tasks
123    pub fn tasks_to_schedule(&self) -> TaskTemplates {
124        self.tasks.parsed_contents.read().clone()
125    }
126}