1//! A source of tasks, based on a static configuration, deserialized from the tasks config file, and related infrastructure for tracking changes to the file.
2
3use std::sync::Arc;
4
5use futures::{channel::mpsc::UnboundedSender, StreamExt};
6use gpui::App;
7use parking_lot::RwLock;
8use serde::Deserialize;
9use util::ResultExt;
10
11use crate::TaskTemplates;
12use futures::channel::mpsc::UnboundedReceiver;
13
14/// The source of tasks defined in a tasks config file.
15pub struct StaticSource {
16 tasks: TrackedFile<TaskTemplates>,
17}
18
19/// A Wrapper around deserializable T that keeps track of its contents
20/// via a provided channel.
21pub struct TrackedFile<T> {
22 parsed_contents: Arc<RwLock<T>>,
23}
24
25impl<T: PartialEq + 'static + Sync> TrackedFile<T> {
26 /// Initializes new [`TrackedFile`] with a type that's deserializable.
27 pub fn new(
28 mut tracker: UnboundedReceiver<String>,
29 notification_outlet: UnboundedSender<()>,
30 cx: &App,
31 ) -> Self
32 where
33 T: for<'a> Deserialize<'a> + Default + Send,
34 {
35 let parsed_contents: Arc<RwLock<T>> = Arc::default();
36 cx.background_executor()
37 .spawn({
38 let parsed_contents = parsed_contents.clone();
39 async move {
40 while let Some(new_contents) = tracker.next().await {
41 if Arc::strong_count(&parsed_contents) == 1 {
42 // We're no longer being observed. Stop polling.
43 break;
44 }
45 if !new_contents.trim().is_empty() {
46 let Some(new_contents) =
47 serde_json_lenient::from_str::<T>(&new_contents).log_err()
48 else {
49 continue;
50 };
51 let mut contents = parsed_contents.write();
52 if *contents != new_contents {
53 *contents = new_contents;
54 if notification_outlet.unbounded_send(()).is_err() {
55 // Whoever cared about contents is not around anymore.
56 break;
57 }
58 }
59 }
60 }
61 anyhow::Ok(())
62 }
63 })
64 .detach_and_log_err(cx);
65 Self { parsed_contents }
66 }
67
68 /// Initializes new [`TrackedFile`] with a type that's convertible from another deserializable type.
69 pub fn new_convertible<U: for<'a> Deserialize<'a> + TryInto<T, Error = anyhow::Error>>(
70 mut tracker: UnboundedReceiver<String>,
71 notification_outlet: UnboundedSender<()>,
72 cx: &App,
73 ) -> Self
74 where
75 T: Default + Send,
76 {
77 let parsed_contents: Arc<RwLock<T>> = Arc::default();
78 cx.background_executor()
79 .spawn({
80 let parsed_contents = parsed_contents.clone();
81 async move {
82 while let Some(new_contents) = tracker.next().await {
83 if Arc::strong_count(&parsed_contents) == 1 {
84 // We're no longer being observed. Stop polling.
85 break;
86 }
87
88 if !new_contents.trim().is_empty() {
89 let Some(new_contents) =
90 serde_json_lenient::from_str::<U>(&new_contents).log_err()
91 else {
92 continue;
93 };
94 let Some(new_contents) = new_contents.try_into().log_err() else {
95 continue;
96 };
97 let mut contents = parsed_contents.write();
98 if *contents != new_contents {
99 *contents = new_contents;
100 if notification_outlet.unbounded_send(()).is_err() {
101 // Whoever cared about contents is not around anymore.
102 break;
103 }
104 }
105 }
106 }
107 anyhow::Ok(())
108 }
109 })
110 .detach_and_log_err(cx);
111 Self {
112 parsed_contents: Default::default(),
113 }
114 }
115}
116
117impl StaticSource {
118 /// Initializes the static source, reacting on tasks config changes.
119 pub fn new(tasks: TrackedFile<TaskTemplates>) -> Self {
120 Self { tasks }
121 }
122 /// Returns current list of tasks
123 pub fn tasks_to_schedule(&self) -> TaskTemplates {
124 self.tasks.parsed_contents.read().clone()
125 }
126}