Fix config file watch task leak (#47246)

Ben Kunkle created

Follow-Up-For:  #47243

Previously, we would detach tasks spawned to watch config files.
However, the task blocked on receiving a file event before checking if
the receiver for the updates channel was dropped, causing the task to
never exit. The fix here was to return the task explicitly, so that it
can be dropped instead of calling `.detach()` on it. There is definitely
a way to `select!` between the receiver being dropped and the next file
system event, but I couldn't figure it out in a reasonable amount of
time and decided it wasn't worth it.

Release Notes:

- Fixed an issue where a few file descriptors would be leaked each time
a project was closed

Change summary

crates/agent/src/tests/mod.rs                |  3 +
crates/project/src/project_settings.rs       |  6 ++-
crates/recent_projects/src/remote_servers.rs |  8 ++-
crates/remote_server/src/unix.rs             |  6 ++-
crates/settings/src/editorconfig_store.rs    |  4 +
crates/settings/src/settings_file.rs         | 40 ++++++++++-----------
crates/zed/src/main.rs                       | 16 ++++++--
crates/zed/src/zed.rs                        | 38 +++++++++++++++-----
8 files changed, 76 insertions(+), 45 deletions(-)

Detailed changes

crates/agent/src/tests/mod.rs 🔗

@@ -3352,11 +3352,12 @@ fn watch_settings(fs: Arc<dyn Fs>, cx: &mut App) {
     let fs = fs.clone();
     cx.spawn({
         async move |cx| {
-            let mut new_settings_content_rx = settings::watch_config_file(
+            let (mut new_settings_content_rx, watcher_task) = settings::watch_config_file(
                 cx.background_executor(),
                 fs,
                 paths::settings_file().clone(),
             );
+            let _watcher_task = watcher_task;
 
             while let Some(new_settings_content) = new_settings_content_rx.next().await {
                 cx.update(|cx| {

crates/project/src/project_settings.rs 🔗

@@ -1317,11 +1317,12 @@ impl SettingsObserver {
         file_path: PathBuf,
         cx: &mut Context<Self>,
     ) -> Task<()> {
-        let mut user_tasks_file_rx =
+        let (mut user_tasks_file_rx, watcher_task) =
             watch_config_file(cx.background_executor(), fs, file_path.clone());
         let user_tasks_content = cx.foreground_executor().block_on(user_tasks_file_rx.next());
         let weak_entry = cx.weak_entity();
         cx.spawn(async move |settings_observer, cx| {
+            let _watcher_task = watcher_task;
             let Ok(task_store) = settings_observer.read_with(cx, |settings_observer, _| {
                 settings_observer.task_store.clone()
             }) else {
@@ -1368,11 +1369,12 @@ impl SettingsObserver {
         file_path: PathBuf,
         cx: &mut Context<Self>,
     ) -> Task<()> {
-        let mut user_tasks_file_rx =
+        let (mut user_tasks_file_rx, watcher_task) =
             watch_config_file(cx.background_executor(), fs, file_path.clone());
         let user_tasks_content = cx.foreground_executor().block_on(user_tasks_file_rx.next());
         let weak_entry = cx.weak_entity();
         cx.spawn(async move |settings_observer, cx| {
+            let _watcher_task = watcher_task;
             let Ok(task_store) = settings_observer.read_with(cx, |settings_observer, _| {
                 settings_observer.task_store.clone()
             }) else {

crates/recent_projects/src/remote_servers.rs 🔗

@@ -2679,13 +2679,15 @@ impl RemoteServerProjects {
 }
 
 fn spawn_ssh_config_watch(fs: Arc<dyn Fs>, cx: &Context<RemoteServerProjects>) -> Task<()> {
-    let mut user_ssh_config_watcher =
+    let (mut user_ssh_config_watcher, user_watcher_task) =
         watch_config_file(cx.background_executor(), fs.clone(), user_ssh_config_file());
-    let mut global_ssh_config_watcher = global_ssh_config_file()
+    let (mut global_ssh_config_watcher, global_watcher_task) = global_ssh_config_file()
         .map(|it| watch_config_file(cx.background_executor(), fs, it.to_owned()))
-        .unwrap_or_else(|| futures::channel::mpsc::unbounded().1);
+        .unwrap_or_else(|| (futures::channel::mpsc::unbounded().1, gpui::Task::ready(())));
 
     cx.spawn(async move |remote_server_projects, cx| {
+        let _user_watcher_task = user_watcher_task;
+        let _global_watcher_task = global_watcher_task;
         let mut global_hosts = BTreeSet::default();
         let mut user_hosts = BTreeSet::default();
         let mut running_receivers = 2;

crates/remote_server/src/unix.rs 🔗

@@ -941,10 +941,10 @@ fn initialize_settings(
     fs: Arc<dyn Fs>,
     cx: &mut App,
 ) -> watch::Receiver<Option<NodeBinaryOptions>> {
-    let user_settings_file_rx =
+    let (user_settings_file_rx, watcher_task) =
         watch_config_file(cx.background_executor(), fs, paths::settings_file().clone());
 
-    handle_settings_file_changes(user_settings_file_rx, cx, {
+    handle_settings_file_changes(user_settings_file_rx, watcher_task, cx, {
         move |err, _cx| {
             if let Some(e) = err {
                 log::info!("Server settings failed to change: {}", e);
@@ -1007,6 +1007,7 @@ fn initialize_settings(
 
 pub fn handle_settings_file_changes(
     mut server_settings_file: mpsc::UnboundedReceiver<String>,
+    watcher_task: gpui::Task<()>,
     cx: &mut App,
     settings_changed: impl Fn(Option<anyhow::Error>, &mut App) + 'static,
 ) {
@@ -1020,6 +1021,7 @@ pub fn handle_settings_file_changes(
             .log_err();
     });
     cx.spawn(async move |cx| {
+        let _watcher_task = watcher_task;
         while let Some(server_settings_content) = server_settings_file.next().await {
             cx.update_global(|store: &mut SettingsStore, cx| {
                 let result = store.set_server_settings(&server_settings_content, cx);

crates/settings/src/editorconfig_store.rs 🔗

@@ -285,9 +285,11 @@ impl EditorconfigStore {
         cx: &mut Context<Self>,
     ) -> Task<()> {
         let config_path = dir_path.join(EDITORCONFIG_NAME);
-        let mut config_rx = watch_config_file(cx.background_executor(), fs, config_path);
+        let (mut config_rx, watcher_task) =
+            watch_config_file(cx.background_executor(), fs, config_path);
 
         cx.spawn(async move |this, cx| {
+            let _watcher_task = watcher_task;
             while let Some(content) = config_rx.next().await {
                 let content = Some(content).filter(|c| !c.is_empty());
                 let dir_path = dir_path.clone();

crates/settings/src/settings_file.rs 🔗

@@ -76,32 +76,30 @@ pub fn watch_config_file(
     executor: &BackgroundExecutor,
     fs: Arc<dyn Fs>,
     path: PathBuf,
-) -> mpsc::UnboundedReceiver<String> {
+) -> (mpsc::UnboundedReceiver<String>, gpui::Task<()>) {
     let (tx, rx) = mpsc::unbounded();
-    executor
-        .spawn(async move {
-            let (events, _) = fs.watch(&path, Duration::from_millis(100)).await;
-            futures::pin_mut!(events);
+    let task = executor.spawn(async move {
+        let (events, _) = fs.watch(&path, Duration::from_millis(100)).await;
+        futures::pin_mut!(events);
 
-            let contents = fs.load(&path).await.unwrap_or_default();
-            if tx.unbounded_send(contents).is_err() {
-                return;
-            }
+        let contents = fs.load(&path).await.unwrap_or_default();
+        if tx.unbounded_send(contents).is_err() {
+            return;
+        }
 
-            loop {
-                if events.next().await.is_none() {
-                    break;
-                }
+        loop {
+            if events.next().await.is_none() {
+                break;
+            }
 
-                if let Ok(contents) = fs.load(&path).await
-                    && tx.unbounded_send(contents).is_err()
-                {
-                    break;
-                }
+            if let Ok(contents) = fs.load(&path).await
+                && tx.unbounded_send(contents).is_err()
+            {
+                break;
             }
-        })
-        .detach();
-    rx
+        }
+    });
+    (rx, task)
 }
 
 pub fn watch_config_dir(

crates/zed/src/main.rs 🔗

@@ -362,17 +362,17 @@ fn main() {
     }
 
     let fs = Arc::new(RealFs::new(git_binary_path, app.background_executor()));
-    let user_settings_file_rx = watch_config_file(
+    let (user_settings_file_rx, user_settings_watcher) = watch_config_file(
         &app.background_executor(),
         fs.clone(),
         paths::settings_file().clone(),
     );
-    let global_settings_file_rx = watch_config_file(
+    let (global_settings_file_rx, global_settings_watcher) = watch_config_file(
         &app.background_executor(),
         fs.clone(),
         paths::global_settings_file().clone(),
     );
-    let user_keymap_file_rx = watch_config_file(
+    let (user_keymap_file_rx, user_keymap_watcher) = watch_config_file(
         &app.background_executor(),
         fs.clone(),
         paths::keymap_file().clone(),
@@ -435,8 +435,14 @@ fn main() {
         }
         settings::init(cx);
         zlog_settings::init(cx);
-        handle_settings_file_changes(user_settings_file_rx, global_settings_file_rx, cx);
-        handle_keymap_file_changes(user_keymap_file_rx, cx);
+        handle_settings_file_changes(
+            user_settings_file_rx,
+            user_settings_watcher,
+            global_settings_file_rx,
+            global_settings_watcher,
+            cx,
+        );
+        handle_keymap_file_changes(user_keymap_file_rx, user_keymap_watcher, cx);
 
         let user_agent = format!(
             "Zed/{} ({}; {})",

crates/zed/src/zed.rs 🔗

@@ -1542,7 +1542,9 @@ fn notify_settings_errors(result: settings::SettingsParseResult, is_user: bool,
 
 pub fn handle_settings_file_changes(
     mut user_settings_file_rx: mpsc::UnboundedReceiver<String>,
+    user_settings_watcher: gpui::Task<()>,
     mut global_settings_file_rx: mpsc::UnboundedReceiver<String>,
+    global_settings_watcher: gpui::Task<()>,
     cx: &mut App,
 ) {
     MigrationNotification::set_global(cx.new(|_| MigrationNotification), cx);
@@ -1564,6 +1566,8 @@ pub fn handle_settings_file_changes(
 
     // Watch for changes in both files
     cx.spawn(async move |cx| {
+        let _user_settings_watcher = user_settings_watcher;
+        let _global_settings_watcher = global_settings_watcher;
         let mut settings_streams = futures::stream::select(
             global_settings_file_rx.map(Either::Left),
             user_settings_file_rx.map(Either::Right),
@@ -1601,6 +1605,7 @@ pub fn handle_settings_file_changes(
 
 pub fn handle_keymap_file_changes(
     mut user_keymap_file_rx: mpsc::UnboundedReceiver<String>,
+    user_keymap_watcher: gpui::Task<()>,
     cx: &mut App,
 ) {
     let (base_keymap_tx, mut base_keymap_rx) = mpsc::unbounded();
@@ -1659,6 +1664,7 @@ pub fn handle_keymap_file_changes(
     let notification_id = NotificationId::unique::<KeymapParseErrorNotification>();
 
     cx.spawn(async move |cx| {
+        let _user_keymap_watcher = user_keymap_watcher;
         let mut user_keymap_content = String::new();
         let mut migrating_in_memory = false;
         loop {
@@ -4509,23 +4515,29 @@ mod tests {
             .unwrap();
         executor.run_until_parked();
         cx.update(|cx| {
-            let settings_rx = watch_config_file(
+            let (settings_rx, settings_watcher) = watch_config_file(
                 &executor,
                 app_state.fs.clone(),
                 PathBuf::from("/settings.json"),
             );
-            let keymap_rx = watch_config_file(
+            let (keymap_rx, keymap_watcher) = watch_config_file(
                 &executor,
                 app_state.fs.clone(),
                 PathBuf::from("/keymap.json"),
             );
-            let global_settings_rx = watch_config_file(
+            let (global_settings_rx, global_settings_watcher) = watch_config_file(
                 &executor,
                 app_state.fs.clone(),
                 PathBuf::from("/global_settings.json"),
             );
-            handle_settings_file_changes(settings_rx, global_settings_rx, cx);
-            handle_keymap_file_changes(keymap_rx, cx);
+            handle_settings_file_changes(
+                settings_rx,
+                settings_watcher,
+                global_settings_rx,
+                global_settings_watcher,
+                cx,
+            );
+            handle_keymap_file_changes(keymap_rx, keymap_watcher, cx);
         });
         workspace
             .update(cx, |workspace, _, cx| {
@@ -4626,24 +4638,30 @@ mod tests {
             .unwrap();
 
         cx.update(|cx| {
-            let settings_rx = watch_config_file(
+            let (settings_rx, settings_watcher) = watch_config_file(
                 &executor,
                 app_state.fs.clone(),
                 PathBuf::from("/settings.json"),
             );
-            let keymap_rx = watch_config_file(
+            let (keymap_rx, keymap_watcher) = watch_config_file(
                 &executor,
                 app_state.fs.clone(),
                 PathBuf::from("/keymap.json"),
             );
 
-            let global_settings_rx = watch_config_file(
+            let (global_settings_rx, global_settings_watcher) = watch_config_file(
                 &executor,
                 app_state.fs.clone(),
                 PathBuf::from("/global_settings.json"),
             );
-            handle_settings_file_changes(settings_rx, global_settings_rx, cx);
-            handle_keymap_file_changes(keymap_rx, cx);
+            handle_settings_file_changes(
+                settings_rx,
+                settings_watcher,
+                global_settings_rx,
+                global_settings_watcher,
+                cx,
+            );
+            handle_keymap_file_changes(keymap_rx, keymap_watcher, cx);
         });
 
         cx.background_executor.run_until_parked();