Fix remote extension syncing (#42918)

Julia Ryan and Conrad Irwin created

Closes #40906
Closes #39729

SFTP uploads weren't quoting the install directory which was causing
extension syncing to fail. We were also only running `install_extension`
once per remote-connection instead of once per project (thx @feeiyu for
pointing this out) so extension weren't being loaded in subsequently
opened remote projects.

Release Notes:

- N/A

---------

Co-authored-by: Conrad Irwin <conrad.irwin@gmail.com>

Change summary

crates/extension_host/src/extension_host.rs | 56 +++++++++++++---------
crates/extension_host/src/headless_host.rs  |  3 
crates/remote/src/transport/ssh.rs          |  2 
3 files changed, 35 insertions(+), 26 deletions(-)

Detailed changes

crates/extension_host/src/extension_host.rs 🔗

@@ -11,7 +11,7 @@ use async_compression::futures::bufread::GzipDecoder;
 use async_tar::Archive;
 use client::ExtensionProvides;
 use client::{Client, ExtensionMetadata, GetExtensionsResponse, proto, telemetry::Telemetry};
-use collections::{BTreeMap, BTreeSet, HashMap, HashSet, btree_map};
+use collections::{BTreeMap, BTreeSet, HashSet, btree_map};
 pub use extension::ExtensionManifest;
 use extension::extension_builder::{CompileExtensionOptions, ExtensionBuilder};
 use extension::{
@@ -43,7 +43,7 @@ use language::{
 use node_runtime::NodeRuntime;
 use project::ContextProviderWithTasks;
 use release_channel::ReleaseChannel;
-use remote::{RemoteClient, RemoteConnectionOptions};
+use remote::RemoteClient;
 use semantic_version::SemanticVersion;
 use serde::{Deserialize, Serialize};
 use settings::Settings;
@@ -123,7 +123,7 @@ pub struct ExtensionStore {
     pub wasm_host: Arc<WasmHost>,
     pub wasm_extensions: Vec<(Arc<ExtensionManifest>, WasmExtension)>,
     pub tasks: Vec<Task<()>>,
-    pub remote_clients: HashMap<RemoteConnectionOptions, WeakEntity<RemoteClient>>,
+    pub remote_clients: Vec<WeakEntity<RemoteClient>>,
     pub ssh_registered_tx: UnboundedSender<()>,
 }
 
@@ -274,7 +274,7 @@ impl ExtensionStore {
             reload_tx,
             tasks: Vec::new(),
 
-            remote_clients: HashMap::default(),
+            remote_clients: Default::default(),
             ssh_registered_tx: connection_registered_tx,
         };
 
@@ -348,7 +348,7 @@ impl ExtensionStore {
                                 index_changed = false;
                             }
 
-                            Self::update_ssh_clients(&this, cx).await?;
+                            Self::update_remote_clients(&this, cx).await?;
                         }
                         _ = connection_registered_rx.next() => {
                             debounce_timer = cx
@@ -1725,7 +1725,7 @@ impl ExtensionStore {
         })
     }
 
-    async fn sync_extensions_over_ssh(
+    async fn sync_extensions_to_remotes(
         this: &WeakEntity<Self>,
         client: WeakEntity<RemoteClient>,
         cx: &mut AsyncApp,
@@ -1778,7 +1778,11 @@ impl ExtensionStore {
                     })?,
                 path_style,
             );
-            log::info!("Uploading extension {}", missing_extension.clone().id);
+            log::info!(
+                "Uploading extension {} to {:?}",
+                missing_extension.clone().id,
+                dest_dir
+            );
 
             client
                 .update(cx, |client, cx| {
@@ -1791,27 +1795,35 @@ impl ExtensionStore {
                 missing_extension.clone().id
             );
 
-            client
+            let result = client
                 .update(cx, |client, _cx| {
                     client.proto_client().request(proto::InstallExtension {
                         tmp_dir: dest_dir.to_proto(),
-                        extension: Some(missing_extension),
+                        extension: Some(missing_extension.clone()),
                     })
                 })?
-                .await?;
+                .await;
+
+            if let Err(e) = result {
+                log::error!(
+                    "Failed to install extension {}: {}",
+                    missing_extension.id,
+                    e
+                );
+            }
         }
 
         anyhow::Ok(())
     }
 
-    pub async fn update_ssh_clients(this: &WeakEntity<Self>, cx: &mut AsyncApp) -> Result<()> {
+    pub async fn update_remote_clients(this: &WeakEntity<Self>, cx: &mut AsyncApp) -> Result<()> {
         let clients = this.update(cx, |this, _cx| {
-            this.remote_clients.retain(|_k, v| v.upgrade().is_some());
-            this.remote_clients.values().cloned().collect::<Vec<_>>()
+            this.remote_clients.retain(|v| v.upgrade().is_some());
+            this.remote_clients.clone()
         })?;
 
         for client in clients {
-            Self::sync_extensions_over_ssh(this, client, cx)
+            Self::sync_extensions_to_remotes(this, client, cx)
                 .await
                 .log_err();
         }
@@ -1819,16 +1831,12 @@ impl ExtensionStore {
         anyhow::Ok(())
     }
 
-    pub fn register_remote_client(&mut self, client: Entity<RemoteClient>, cx: &mut Context<Self>) {
-        let options = client.read(cx).connection_options();
-
-        if let Some(existing_client) = self.remote_clients.get(&options)
-            && existing_client.upgrade().is_some()
-        {
-            return;
-        }
-
-        self.remote_clients.insert(options, client.downgrade());
+    pub fn register_remote_client(
+        &mut self,
+        client: Entity<RemoteClient>,
+        _cx: &mut Context<Self>,
+    ) {
+        self.remote_clients.push(client.downgrade());
         self.ssh_registered_tx.unbounded_send(()).ok();
     }
 }

crates/extension_host/src/headless_host.rs 🔗

@@ -279,7 +279,8 @@ impl HeadlessExtensionStore {
             }
 
             fs.rename(&tmp_path, &path, RenameOptions::default())
-                .await?;
+                .await
+                .context("Failed to rename {tmp_path:?} to {path:?}")?;
 
             Self::load_extension(this, extension, cx).await
         })

crates/remote/src/transport/ssh.rs 🔗

@@ -304,7 +304,7 @@ impl RemoteConnection for SshRemoteConnection {
                 let mut child = sftp_command.spawn()?;
                 if let Some(mut stdin) = child.stdin.take() {
                     use futures::AsyncWriteExt;
-                    let sftp_batch = format!("put -r {src_path_display} {dest_path_str}\n");
+                    let sftp_batch = format!("put -r \"{src_path_display}\" \"{dest_path_str}\"\n");
                     stdin.write_all(sftp_batch.as_bytes()).await?;
                     stdin.flush().await?;
                 }