static_source.rs

  1//! A source of tasks, based on a static configuration, deserialized from the tasks config file, and related infrastructure for tracking changes to the file.
  2
  3use std::sync::Arc;
  4
  5use futures::{StreamExt, channel::mpsc::UnboundedSender};
  6use gpui::{App, AppContext};
  7use parking_lot::RwLock;
  8use serde::Deserialize;
  9use util::ResultExt;
 10
 11use crate::TaskTemplates;
 12use futures::channel::mpsc::UnboundedReceiver;
 13
 14/// The source of tasks defined in a tasks config file.
 15pub struct StaticSource {
 16    tasks: TrackedFile<TaskTemplates>,
 17}
 18
 19/// A Wrapper around deserializable T that keeps track of its contents
 20/// via a provided channel.
 21pub struct TrackedFile<T> {
 22    parsed_contents: Arc<RwLock<T>>,
 23}
 24
 25impl<T: PartialEq + 'static + Sync> TrackedFile<T> {
 26    /// Initializes new [`TrackedFile`] with a type that's deserializable.
 27    pub fn new(
 28        mut tracker: UnboundedReceiver<String>,
 29        notification_outlet: UnboundedSender<()>,
 30        cx: &App,
 31    ) -> Self
 32    where
 33        T: for<'a> Deserialize<'a> + Default + Send,
 34    {
 35        let parsed_contents: Arc<RwLock<T>> = Arc::default();
 36        cx.background_spawn({
 37            let parsed_contents = parsed_contents.clone();
 38            async move {
 39                while let Some(new_contents) = tracker.next().await {
 40                    if Arc::strong_count(&parsed_contents) == 1 {
 41                        // We're no longer being observed. Stop polling.
 42                        break;
 43                    }
 44                    if !new_contents.trim().is_empty() {
 45                        let Some(new_contents) =
 46                            serde_json_lenient::from_str::<T>(&new_contents).log_err()
 47                        else {
 48                            continue;
 49                        };
 50                        let mut contents = parsed_contents.write();
 51                        if *contents != new_contents {
 52                            *contents = new_contents;
 53                            if notification_outlet.unbounded_send(()).is_err() {
 54                                // Whoever cared about contents is not around anymore.
 55                                break;
 56                            }
 57                        }
 58                    }
 59                }
 60                anyhow::Ok(())
 61            }
 62        })
 63        .detach_and_log_err(cx);
 64        Self { parsed_contents }
 65    }
 66
 67    /// Initializes new [`TrackedFile`] with a type that's convertible from another deserializable type.
 68    pub fn new_convertible<U: for<'a> Deserialize<'a> + TryInto<T, Error = anyhow::Error>>(
 69        mut tracker: UnboundedReceiver<String>,
 70        notification_outlet: UnboundedSender<()>,
 71        cx: &App,
 72    ) -> Self
 73    where
 74        T: Default + Send,
 75    {
 76        let parsed_contents: Arc<RwLock<T>> = Arc::default();
 77        cx.background_spawn({
 78            async move {
 79                while let Some(new_contents) = tracker.next().await {
 80                    if Arc::strong_count(&parsed_contents) == 1 {
 81                        // We're no longer being observed. Stop polling.
 82                        break;
 83                    }
 84
 85                    if !new_contents.trim().is_empty() {
 86                        let Some(new_contents) =
 87                            serde_json_lenient::from_str::<U>(&new_contents).log_err()
 88                        else {
 89                            continue;
 90                        };
 91                        let Some(new_contents) = new_contents.try_into().log_err() else {
 92                            continue;
 93                        };
 94                        let mut contents = parsed_contents.write();
 95                        if *contents != new_contents {
 96                            *contents = new_contents;
 97                            if notification_outlet.unbounded_send(()).is_err() {
 98                                // Whoever cared about contents is not around anymore.
 99                                break;
100                            }
101                        }
102                    }
103                }
104                anyhow::Ok(())
105            }
106        })
107        .detach_and_log_err(cx);
108        Self {
109            parsed_contents: Default::default(),
110        }
111    }
112}
113
114impl StaticSource {
115    /// Initializes the static source, reacting on tasks config changes.
116    pub const fn new(tasks: TrackedFile<TaskTemplates>) -> Self {
117        Self { tasks }
118    }
119    /// Returns current list of tasks
120    pub fn tasks_to_schedule(&self) -> TaskTemplates {
121        self.tasks.parsed_contents.read().clone()
122    }
123}