1//! A source of tasks, based on a static configuration, deserialized from the tasks config file, and related infrastructure for tracking changes to the file.
2
3use std::sync::Arc;
4
5use futures::StreamExt;
6use gpui::AppContext;
7use parking_lot::RwLock;
8use serde::Deserialize;
9use util::ResultExt;
10
11use crate::TaskTemplates;
12use futures::channel::mpsc::UnboundedReceiver;
13
14/// The source of tasks defined in a tasks config file.
15pub struct StaticSource {
16 tasks: TrackedFile<TaskTemplates>,
17}
18
19/// A Wrapper around deserializable T that keeps track of its contents
20/// via a provided channel. Once T value changes, the observers of [`TrackedFile`] are
21/// notified.
22pub struct TrackedFile<T> {
23 parsed_contents: Arc<RwLock<T>>,
24}
25
26impl<T: PartialEq + 'static + Sync> TrackedFile<T> {
27 /// Initializes new [`TrackedFile`] with a type that's deserializable.
28 pub fn new(mut tracker: UnboundedReceiver<String>, cx: &mut AppContext) -> Self
29 where
30 T: for<'a> Deserialize<'a> + Default + Send,
31 {
32 let parsed_contents: Arc<RwLock<T>> = Arc::default();
33 cx.background_executor()
34 .spawn({
35 let parsed_contents = parsed_contents.clone();
36 async move {
37 while let Some(new_contents) = tracker.next().await {
38 if Arc::strong_count(&parsed_contents) == 1 {
39 // We're no longer being observed. Stop polling.
40 break;
41 }
42 if !new_contents.trim().is_empty() {
43 let Some(new_contents) =
44 serde_json_lenient::from_str::<T>(&new_contents).log_err()
45 else {
46 continue;
47 };
48 let mut contents = parsed_contents.write();
49 *contents = new_contents;
50 }
51 }
52 anyhow::Ok(())
53 }
54 })
55 .detach_and_log_err(cx);
56 Self { parsed_contents }
57 }
58
59 /// Initializes new [`TrackedFile`] with a type that's convertible from another deserializable type.
60 pub fn new_convertible<U: for<'a> Deserialize<'a> + TryInto<T, Error = anyhow::Error>>(
61 mut tracker: UnboundedReceiver<String>,
62 cx: &mut AppContext,
63 ) -> Self
64 where
65 T: Default + Send,
66 {
67 let parsed_contents: Arc<RwLock<T>> = Arc::default();
68 cx.background_executor()
69 .spawn({
70 let parsed_contents = parsed_contents.clone();
71 async move {
72 while let Some(new_contents) = tracker.next().await {
73 if Arc::strong_count(&parsed_contents) == 1 {
74 // We're no longer being observed. Stop polling.
75 break;
76 }
77
78 if !new_contents.trim().is_empty() {
79 let Some(new_contents) =
80 serde_json_lenient::from_str::<U>(&new_contents).log_err()
81 else {
82 continue;
83 };
84 let Some(new_contents) = new_contents.try_into().log_err() else {
85 continue;
86 };
87 let mut contents = parsed_contents.write();
88 *contents = new_contents;
89 }
90 }
91 anyhow::Ok(())
92 }
93 })
94 .detach_and_log_err(cx);
95 Self {
96 parsed_contents: Default::default(),
97 }
98 }
99}
100
101impl StaticSource {
102 /// Initializes the static source, reacting on tasks config changes.
103 pub fn new(tasks: TrackedFile<TaskTemplates>) -> Self {
104 Self { tasks }
105 }
106 /// Returns current list of tasks
107 pub fn tasks_to_schedule(&self) -> TaskTemplates {
108 self.tasks.parsed_contents.read().clone()
109 }
110}