1//! A source of tasks, based on a static configuration, deserialized from the tasks config file, and related infrastructure for tracking changes to the file.
2
3use std::sync::Arc;
4
5use futures::{StreamExt, channel::mpsc::UnboundedSender};
6use gpui::{App, AppContext};
7use parking_lot::RwLock;
8use serde::Deserialize;
9use util::ResultExt;
10
11use crate::TaskTemplates;
12use futures::channel::mpsc::UnboundedReceiver;
13
14/// The source of tasks defined in a tasks config file.
15pub struct StaticSource {
16 tasks: TrackedFile<TaskTemplates>,
17}
18
19/// A Wrapper around deserializable T that keeps track of its contents
20/// via a provided channel.
21pub struct TrackedFile<T> {
22 parsed_contents: Arc<RwLock<T>>,
23}
24
25impl<T: PartialEq + 'static + Sync> TrackedFile<T> {
26 /// Initializes new [`TrackedFile`] with a type that's deserializable.
27 pub fn new(
28 mut tracker: UnboundedReceiver<String>,
29 notification_outlet: UnboundedSender<()>,
30 cx: &App,
31 ) -> Self
32 where
33 T: for<'a> Deserialize<'a> + Default + Send,
34 {
35 let parsed_contents: Arc<RwLock<T>> = Arc::default();
36 cx.background_spawn({
37 let parsed_contents = parsed_contents.clone();
38 async move {
39 while let Some(new_contents) = tracker.next().await {
40 if Arc::strong_count(&parsed_contents) == 1 {
41 // We're no longer being observed. Stop polling.
42 break;
43 }
44 if !new_contents.trim().is_empty() {
45 let Some(new_contents) =
46 serde_json_lenient::from_str::<T>(&new_contents).log_err()
47 else {
48 continue;
49 };
50 let mut contents = parsed_contents.write();
51 if *contents != new_contents {
52 *contents = new_contents;
53 if notification_outlet.unbounded_send(()).is_err() {
54 // Whoever cared about contents is not around anymore.
55 break;
56 }
57 }
58 }
59 }
60 anyhow::Ok(())
61 }
62 })
63 .detach_and_log_err(cx);
64 Self { parsed_contents }
65 }
66
67 /// Initializes new [`TrackedFile`] with a type that's convertible from another deserializable type.
68 pub fn new_convertible<U: for<'a> Deserialize<'a> + TryInto<T, Error = anyhow::Error>>(
69 mut tracker: UnboundedReceiver<String>,
70 notification_outlet: UnboundedSender<()>,
71 cx: &App,
72 ) -> Self
73 where
74 T: Default + Send,
75 {
76 let parsed_contents: Arc<RwLock<T>> = Arc::default();
77 cx.background_spawn({
78 async move {
79 while let Some(new_contents) = tracker.next().await {
80 if Arc::strong_count(&parsed_contents) == 1 {
81 // We're no longer being observed. Stop polling.
82 break;
83 }
84
85 if !new_contents.trim().is_empty() {
86 let Some(new_contents) =
87 serde_json_lenient::from_str::<U>(&new_contents).log_err()
88 else {
89 continue;
90 };
91 let Some(new_contents) = new_contents.try_into().log_err() else {
92 continue;
93 };
94 let mut contents = parsed_contents.write();
95 if *contents != new_contents {
96 *contents = new_contents;
97 if notification_outlet.unbounded_send(()).is_err() {
98 // Whoever cared about contents is not around anymore.
99 break;
100 }
101 }
102 }
103 }
104 anyhow::Ok(())
105 }
106 })
107 .detach_and_log_err(cx);
108 Self {
109 parsed_contents: Default::default(),
110 }
111 }
112}
113
114impl StaticSource {
115 /// Initializes the static source, reacting on tasks config changes.
116 pub fn new(tasks: TrackedFile<TaskTemplates>) -> Self {
117 Self { tasks }
118 }
119 /// Returns current list of tasks
120 pub fn tasks_to_schedule(&self) -> TaskTemplates {
121 self.tasks.parsed_contents.read().clone()
122 }
123}