1//! A source of tasks, based on a static configuration, deserialized from the tasks config file, and related infrastructure for tracking changes to the file.
2
3use std::sync::Arc;
4
5use futures::{StreamExt, channel::mpsc::UnboundedSender};
6use gpui::{App, AppContext};
7use parking_lot::RwLock;
8use serde::Deserialize;
9use util::ResultExt;
10
11use crate::TaskTemplates;
12use futures::channel::mpsc::UnboundedReceiver;
13
14/// The source of tasks defined in a tasks config file.
15pub struct StaticSource {
16 tasks: TrackedFile<TaskTemplates>,
17}
18
19/// A Wrapper around deserializable T that keeps track of its contents
20/// via a provided channel.
21pub struct TrackedFile<T> {
22 parsed_contents: Arc<RwLock<T>>,
23}
24
25impl<T: PartialEq + 'static + Sync> TrackedFile<T> {
26 /// Initializes new [`TrackedFile`] with a type that's deserializable.
27 pub fn new(
28 mut tracker: UnboundedReceiver<String>,
29 notification_outlet: UnboundedSender<()>,
30 cx: &App,
31 ) -> Self
32 where
33 T: for<'a> Deserialize<'a> + Default + Send,
34 {
35 let parsed_contents: Arc<RwLock<T>> = Arc::default();
36 cx.background_spawn({
37 let parsed_contents = parsed_contents.clone();
38 async move {
39 while let Some(new_contents) = tracker.next().await {
40 if Arc::strong_count(&parsed_contents) == 1 {
41 // We're no longer being observed. Stop polling.
42 break;
43 }
44 if !new_contents.trim().is_empty() {
45 let Some(new_contents) =
46 serde_json_lenient::from_str::<T>(&new_contents).log_err()
47 else {
48 continue;
49 };
50 let mut contents = parsed_contents.write();
51 if *contents != new_contents {
52 *contents = new_contents;
53 if notification_outlet.unbounded_send(()).is_err() {
54 // Whoever cared about contents is not around anymore.
55 break;
56 }
57 }
58 }
59 }
60 anyhow::Ok(())
61 }
62 })
63 .detach_and_log_err(cx);
64 Self { parsed_contents }
65 }
66
67 /// Initializes new [`TrackedFile`] with a type that's convertible from another deserializable type.
68 pub fn new_convertible<U: for<'a> Deserialize<'a> + TryInto<T, Error = anyhow::Error>>(
69 mut tracker: UnboundedReceiver<String>,
70 notification_outlet: UnboundedSender<()>,
71 cx: &App,
72 ) -> Self
73 where
74 T: Default + Send,
75 {
76 let parsed_contents: Arc<RwLock<T>> = Arc::default();
77 cx.background_spawn({
78 let parsed_contents = parsed_contents.clone();
79 async move {
80 while let Some(new_contents) = tracker.next().await {
81 if Arc::strong_count(&parsed_contents) == 1 {
82 // We're no longer being observed. Stop polling.
83 break;
84 }
85
86 if !new_contents.trim().is_empty() {
87 let Some(new_contents) =
88 serde_json_lenient::from_str::<U>(&new_contents).log_err()
89 else {
90 continue;
91 };
92 let Some(new_contents) = new_contents.try_into().log_err() else {
93 continue;
94 };
95 let mut contents = parsed_contents.write();
96 if *contents != new_contents {
97 *contents = new_contents;
98 if notification_outlet.unbounded_send(()).is_err() {
99 // Whoever cared about contents is not around anymore.
100 break;
101 }
102 }
103 }
104 }
105 anyhow::Ok(())
106 }
107 })
108 .detach_and_log_err(cx);
109 Self {
110 parsed_contents: Default::default(),
111 }
112 }
113}
114
115impl StaticSource {
116 /// Initializes the static source, reacting on tasks config changes.
117 pub fn new(tasks: TrackedFile<TaskTemplates>) -> Self {
118 Self { tasks }
119 }
120 /// Returns current list of tasks
121 pub fn tasks_to_schedule(&self) -> TaskTemplates {
122 self.tasks.parsed_contents.read().clone()
123 }
124}