remote: Fix when .ssh config is missing (#47310)

Matthew and Gaff created

Closes #47309

Rewroked the file parsing loop so that if one file is missing it doesn't
cause the loop to exit prematurely.

Release Notes:
* Fixed issue where user .ssh/config file would not parse if global ssh
config file was not present.

---------

Co-authored-by: Gaff <226665+Gaff@users.noreply.github.com>

Change summary

crates/recent_projects/src/remote_servers.rs | 108 +++++++++++----------
1 file changed, 57 insertions(+), 51 deletions(-)

Detailed changes

crates/recent_projects/src/remote_servers.rs 🔗

@@ -8,14 +8,14 @@ use crate::{
 use dev_container::start_dev_container;
 use editor::Editor;
 use file_finder::OpenPathDelegate;
-use futures::{FutureExt, channel::oneshot, future::Shared, select};
+use futures::{FutureExt, channel::oneshot, future::Shared};
 use gpui::{
     AnyElement, App, ClickEvent, ClipboardItem, Context, DismissEvent, Entity, EventEmitter,
     FocusHandle, Focusable, PromptLevel, ScrollHandle, Subscription, Task, WeakEntity, Window,
     canvas,
 };
 use language::Point;
-use log::info;
+use log::{debug, info};
 use paths::{global_ssh_config_file, user_ssh_config_file};
 use picker::Picker;
 use project::{Fs, Project};
@@ -2679,59 +2679,65 @@ impl RemoteServerProjects {
 }
 
 fn spawn_ssh_config_watch(fs: Arc<dyn Fs>, cx: &Context<RemoteServerProjects>) -> Task<()> {
-    let (mut user_ssh_config_watcher, user_watcher_task) =
-        watch_config_file(cx.background_executor(), fs.clone(), user_ssh_config_file());
-    let (mut global_ssh_config_watcher, global_watcher_task) = global_ssh_config_file()
-        .map(|it| watch_config_file(cx.background_executor(), fs, it.to_owned()))
-        .unwrap_or_else(|| (futures::channel::mpsc::unbounded().1, gpui::Task::ready(())));
+    enum ConfigSource {
+        User(String),
+        Global(String),
+    }
+
+    let mut streams = Vec::new();
+    let mut tasks = Vec::new();
+
+    // Setup User Watcher
+    let user_path = user_ssh_config_file();
+    info!("SSH: Watching User Config at: {:?}", user_path);
+
+    // We clone 'fs' here because we might need it again for the global watcher.
+    let (user_s, user_t) = watch_config_file(cx.background_executor(), fs.clone(), user_path);
+    streams.push(user_s.map(ConfigSource::User).boxed());
+    tasks.push(user_t);
+
+    // Setup Global Watcher
+    if let Some(gp) = global_ssh_config_file() {
+        info!("SSH: Watching Global Config at: {:?}", gp);
+        let (global_s, global_t) =
+            watch_config_file(cx.background_executor(), fs, gp.to_path_buf());
+        streams.push(global_s.map(ConfigSource::Global).boxed());
+        tasks.push(global_t);
+    } else {
+        debug!("SSH: No Global Config defined.");
+    }
+
+    // Combine into a single stream so that only one is parsed at once.
+    let mut merged_stream = futures::stream::select_all(streams);
 
     cx.spawn(async move |remote_server_projects, cx| {
-        let _user_watcher_task = user_watcher_task;
-        let _global_watcher_task = global_watcher_task;
+        let _tasks = tasks; // Keeps the background watchers alive
         let mut global_hosts = BTreeSet::default();
         let mut user_hosts = BTreeSet::default();
-        let mut running_receivers = 2;
-
-        loop {
-            select! {
-                new_global_file_contents = global_ssh_config_watcher.next().fuse() => {
-                    match new_global_file_contents {
-                        Some(new_global_file_contents) => {
-                            global_hosts = parse_ssh_config_hosts(&new_global_file_contents);
-                            if remote_server_projects.update(cx, |remote_server_projects, cx| {
-                                remote_server_projects.ssh_config_servers = global_hosts.iter().chain(user_hosts.iter()).map(SharedString::from).collect();
-                                cx.notify();
-                            }).is_err() {
-                                return;
-                            }
-                        },
-                        None => {
-                            running_receivers -= 1;
-                            if running_receivers == 0 {
-                                return;
-                            }
-                        }
-                    }
-                },
-                new_user_file_contents = user_ssh_config_watcher.next().fuse() => {
-                    match new_user_file_contents {
-                        Some(new_user_file_contents) => {
-                            user_hosts = parse_ssh_config_hosts(&new_user_file_contents);
-                            if remote_server_projects.update(cx, |remote_server_projects, cx| {
-                                remote_server_projects.ssh_config_servers = global_hosts.iter().chain(user_hosts.iter()).map(SharedString::from).collect();
-                                cx.notify();
-                            }).is_err() {
-                                return;
-                            }
-                        },
-                        None => {
-                            running_receivers -= 1;
-                            if running_receivers == 0 {
-                                return;
-                            }
-                        }
-                    }
-                },
+
+        while let Some(event) = merged_stream.next().await {
+            match event {
+                ConfigSource::Global(content) => {
+                    global_hosts = parse_ssh_config_hosts(&content);
+                }
+                ConfigSource::User(content) => {
+                    user_hosts = parse_ssh_config_hosts(&content);
+                }
+            }
+
+            // Sync to Model
+            if remote_server_projects
+                .update(cx, |project, cx| {
+                    project.ssh_config_servers = global_hosts
+                        .iter()
+                        .chain(user_hosts.iter())
+                        .map(SharedString::from)
+                        .collect();
+                    cx.notify();
+                })
+                .is_err()
+            {
+                return;
             }
         }
     })