Add extensions to the remote server (#20049)

Mikayla Maki and Conrad Irwin created

TODO:

- [x] Double check strange PHP env detection
- [x] Clippy & etc.

Release Notes:

- Added support for extension languages on the remote server

---------

Co-authored-by: Conrad Irwin <conrad.irwin@gmail.com>

Change summary

Cargo.lock                                    |   4 
crates/extension_host/Cargo.toml              |   2 
crates/extension_host/src/extension_host.rs   | 158 ++++++++
crates/extension_host/src/headless_host.rs    | 379 +++++++++++++++++++++
crates/proto/proto/zed.proto                  |  28 +
crates/proto/src/proto.rs                     |   5 
crates/recent_projects/Cargo.toml             |   1 
crates/recent_projects/src/ssh_connections.rs |  10 
crates/remote/src/ssh_session.rs              |   1 
crates/remote_server/Cargo.toml               |   1 
crates/remote_server/src/headless_project.rs  |  21 +
11 files changed, 606 insertions(+), 4 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -4170,6 +4170,7 @@ dependencies = [
  "paths",
  "project",
  "release_channel",
+ "remote",
  "reqwest_client",
  "schemars",
  "semantic_version",
@@ -4178,6 +4179,7 @@ dependencies = [
  "serde_json_lenient",
  "settings",
  "task",
+ "tempfile",
  "theme",
  "toml 0.8.19",
  "url",
@@ -9677,6 +9679,7 @@ dependencies = [
  "anyhow",
  "auto_update",
  "editor",
+ "extension_host",
  "file_finder",
  "futures 0.3.31",
  "fuzzy",
@@ -9852,6 +9855,7 @@ dependencies = [
  "client",
  "clock",
  "env_logger 0.11.5",
+ "extension_host",
  "fork",
  "fs",
  "futures 0.3.31",

crates/extension_host/Cargo.toml 🔗

@@ -34,6 +34,7 @@ lsp.workspace = true
 node_runtime.workspace = true
 paths.workspace = true
 project.workspace = true
+remote.workspace = true
 release_channel.workspace = true
 schemars.workspace = true
 semantic_version.workspace = true
@@ -42,6 +43,7 @@ serde_json.workspace = true
 serde_json_lenient.workspace = true
 settings.workspace = true
 task.workspace = true
+tempfile.workspace = true
 toml.workspace = true
 url.workspace = true
 util.workspace = true

crates/extension_host/src/extension_host.rs 🔗

@@ -1,5 +1,6 @@
 pub mod extension_lsp_adapter;
 pub mod extension_settings;
+pub mod headless_host;
 pub mod wasm_host;
 
 #[cfg(test)]
@@ -9,8 +10,8 @@ use crate::extension_lsp_adapter::ExtensionLspAdapter;
 use anyhow::{anyhow, bail, Context as _, Result};
 use async_compression::futures::bufread::GzipDecoder;
 use async_tar::Archive;
-use client::{telemetry::Telemetry, Client, ExtensionMetadata, GetExtensionsResponse};
-use collections::{btree_map, BTreeMap, HashSet};
+use client::{proto, telemetry::Telemetry, Client, ExtensionMetadata, GetExtensionsResponse};
+use collections::{btree_map, BTreeMap, HashMap, HashSet};
 use extension::extension_builder::{CompileExtensionOptions, ExtensionBuilder};
 use extension::Extension;
 pub use extension::ExtensionManifest;
@@ -36,6 +37,7 @@ use lsp::LanguageServerName;
 use node_runtime::NodeRuntime;
 use project::ContextProviderWithTasks;
 use release_channel::ReleaseChannel;
+use remote::SshRemoteClient;
 use semantic_version::SemanticVersion;
 use serde::{Deserialize, Serialize};
 use settings::Settings;
@@ -178,6 +180,8 @@ pub struct ExtensionStore {
     pub wasm_host: Arc<WasmHost>,
     pub wasm_extensions: Vec<(Arc<ExtensionManifest>, WasmExtension)>,
     pub tasks: Vec<Task<()>>,
+    pub ssh_clients: HashMap<String, WeakModel<SshRemoteClient>>,
+    pub ssh_registered_tx: UnboundedSender<()>,
 }
 
 #[derive(Clone, Copy)]
@@ -289,6 +293,7 @@ impl ExtensionStore {
         let index_path = extensions_dir.join("index.json");
 
         let (reload_tx, mut reload_rx) = unbounded();
+        let (connection_registered_tx, mut connection_registered_rx) = unbounded();
         let mut this = Self {
             registration_hooks: extension_api.clone(),
             extension_index: Default::default(),
@@ -312,6 +317,9 @@ impl ExtensionStore {
             telemetry,
             reload_tx,
             tasks: Vec::new(),
+
+            ssh_clients: HashMap::default(),
+            ssh_registered_tx: connection_registered_tx,
         };
 
         // The extensions store maintains an index file, which contains a complete
@@ -386,6 +394,14 @@ impl ExtensionStore {
                                     .await;
                                 index_changed = false;
                             }
+
+                            Self::update_ssh_clients(&this, &mut cx).await?;
+                        }
+                        _ = connection_registered_rx.next() => {
+                            debounce_timer = cx
+                                .background_executor()
+                                .timer(RELOAD_DEBOUNCE_DURATION)
+                                .fuse();
                         }
                         extension_id = reload_rx.next() => {
                             let Some(extension_id) = extension_id else { break; };
@@ -1431,6 +1447,144 @@ impl ExtensionStore {
 
         Ok(())
     }
+
+    fn prepare_remote_extension(
+        &mut self,
+        extension_id: Arc<str>,
+        tmp_dir: PathBuf,
+        cx: &mut ModelContext<Self>,
+    ) -> Task<Result<()>> {
+        let src_dir = self.extensions_dir().join(extension_id.as_ref());
+        let Some(loaded_extension) = self.extension_index.extensions.get(&extension_id).cloned()
+        else {
+            return Task::ready(Err(anyhow!("extension no longer installed")));
+        };
+        let fs = self.fs.clone();
+        cx.background_executor().spawn(async move {
+            for well_known_path in ["extension.toml", "extension.json", "extension.wasm"] {
+                if fs.is_file(&src_dir.join(well_known_path)).await {
+                    fs.copy_file(
+                        &src_dir.join(well_known_path),
+                        &tmp_dir.join(well_known_path),
+                        fs::CopyOptions::default(),
+                    )
+                    .await?
+                }
+            }
+
+            for language_path in loaded_extension.manifest.languages.iter() {
+                if fs
+                    .is_file(&src_dir.join(language_path).join("config.toml"))
+                    .await
+                {
+                    fs.create_dir(&tmp_dir.join(language_path)).await?;
+                    fs.copy_file(
+                        &src_dir.join(language_path).join("config.toml"),
+                        &tmp_dir.join(language_path).join("config.toml"),
+                        fs::CopyOptions::default(),
+                    )
+                    .await?
+                }
+            }
+
+            Ok(())
+        })
+    }
+
+    async fn sync_extensions_over_ssh(
+        this: &WeakModel<Self>,
+        client: WeakModel<SshRemoteClient>,
+        cx: &mut AsyncAppContext,
+    ) -> Result<()> {
+        let extensions = this.update(cx, |this, _cx| {
+            this.extension_index
+                .extensions
+                .iter()
+                .filter_map(|(id, entry)| {
+                    if entry.manifest.language_servers.is_empty() {
+                        return None;
+                    }
+                    Some(proto::Extension {
+                        id: id.to_string(),
+                        version: entry.manifest.version.to_string(),
+                        dev: entry.dev,
+                    })
+                })
+                .collect()
+        })?;
+
+        let response = client
+            .update(cx, |client, _cx| {
+                client
+                    .proto_client()
+                    .request(proto::SyncExtensions { extensions })
+            })?
+            .await?;
+
+        for missing_extension in response.missing_extensions.into_iter() {
+            let tmp_dir = tempfile::tempdir()?;
+            this.update(cx, |this, cx| {
+                this.prepare_remote_extension(
+                    missing_extension.id.clone().into(),
+                    tmp_dir.path().to_owned(),
+                    cx,
+                )
+            })?
+            .await?;
+            let dest_dir = PathBuf::from(&response.tmp_dir).join(missing_extension.clone().id);
+            log::info!("Uploading extension {}", missing_extension.clone().id);
+
+            client
+                .update(cx, |client, cx| {
+                    client.upload_directory(tmp_dir.path().to_owned(), dest_dir.clone(), cx)
+                })?
+                .await?;
+
+            client
+                .update(cx, |client, _cx| {
+                    client.proto_client().request(proto::InstallExtension {
+                        tmp_dir: dest_dir.to_string_lossy().to_string(),
+                        extension: Some(missing_extension),
+                    })
+                })?
+                .await?;
+        }
+
+        anyhow::Ok(())
+    }
+
+    pub async fn update_ssh_clients(
+        this: &WeakModel<Self>,
+        cx: &mut AsyncAppContext,
+    ) -> Result<()> {
+        let clients = this.update(cx, |this, _cx| {
+            this.ssh_clients.retain(|_k, v| v.upgrade().is_some());
+            this.ssh_clients.values().cloned().collect::<Vec<_>>()
+        })?;
+
+        for client in clients {
+            Self::sync_extensions_over_ssh(&this, client, cx)
+                .await
+                .log_err();
+        }
+
+        anyhow::Ok(())
+    }
+
+    pub fn register_ssh_client(
+        &mut self,
+        client: Model<SshRemoteClient>,
+        cx: &mut ModelContext<Self>,
+    ) {
+        let connection_options = client.read(cx).connection_options();
+        if self.ssh_clients.contains_key(&connection_options.ssh_url()) {
+            return;
+        }
+
+        self.ssh_clients
+            .insert(connection_options.ssh_url(), client.downgrade());
+        self.ssh_registered_tx.unbounded_send(()).ok();
+    }
 }
 
 fn load_plugin_queries(root_path: &Path) -> LanguageQueries {

crates/extension_host/src/headless_host.rs 🔗

@@ -0,0 +1,379 @@
+use std::{path::PathBuf, sync::Arc};
+
+use anyhow::{anyhow, Context as _, Result};
+use client::{proto, TypedEnvelope};
+use collections::{HashMap, HashSet};
+use extension::{Extension, ExtensionManifest};
+use fs::{Fs, RemoveOptions, RenameOptions};
+use gpui::{AppContext, AsyncAppContext, Context, Model, ModelContext, Task, WeakModel};
+use http_client::HttpClient;
+use language::{LanguageConfig, LanguageName, LanguageQueries, LanguageRegistry, LoadedLanguage};
+use lsp::LanguageServerName;
+use node_runtime::NodeRuntime;
+
+use crate::{
+    extension_lsp_adapter::ExtensionLspAdapter,
+    wasm_host::{WasmExtension, WasmHost},
+    ExtensionRegistrationHooks,
+};
+
+pub struct HeadlessExtensionStore {
+    pub registration_hooks: Arc<dyn ExtensionRegistrationHooks>,
+    pub fs: Arc<dyn Fs>,
+    pub extension_dir: PathBuf,
+    pub wasm_host: Arc<WasmHost>,
+    pub loaded_extensions: HashMap<Arc<str>, Arc<str>>,
+    pub loaded_languages: HashMap<Arc<str>, Vec<LanguageName>>,
+    pub loaded_language_servers: HashMap<Arc<str>, Vec<(LanguageServerName, LanguageName)>>,
+}
+
+#[derive(Clone, Debug)]
+pub struct ExtensionVersion {
+    pub id: String,
+    pub version: String,
+    pub dev: bool,
+}
+
+impl HeadlessExtensionStore {
+    pub fn new(
+        fs: Arc<dyn Fs>,
+        http_client: Arc<dyn HttpClient>,
+        languages: Arc<LanguageRegistry>,
+        extension_dir: PathBuf,
+        node_runtime: NodeRuntime,
+        cx: &mut AppContext,
+    ) -> Model<Self> {
+        let registration_hooks = Arc::new(HeadlessRegistrationHooks::new(languages.clone()));
+        cx.new_model(|cx| Self {
+            registration_hooks: registration_hooks.clone(),
+            fs: fs.clone(),
+            wasm_host: WasmHost::new(
+                fs.clone(),
+                http_client.clone(),
+                node_runtime,
+                registration_hooks,
+                extension_dir.join("work"),
+                cx,
+            ),
+            extension_dir,
+            loaded_extensions: Default::default(),
+            loaded_languages: Default::default(),
+            loaded_language_servers: Default::default(),
+        })
+    }
+
+    pub fn sync_extensions(
+        &mut self,
+        extensions: Vec<ExtensionVersion>,
+        cx: &ModelContext<Self>,
+    ) -> Task<Result<Vec<ExtensionVersion>>> {
+        let on_client = HashSet::from_iter(extensions.iter().map(|e| e.id.as_str()));
+        let to_remove: Vec<Arc<str>> = self
+            .loaded_extensions
+            .keys()
+            .filter(|id| !on_client.contains(id.as_ref()))
+            .cloned()
+            .collect();
+        let to_load: Vec<ExtensionVersion> = extensions
+            .into_iter()
+            .filter(|e| {
+                if e.dev {
+                    return true;
+                }
+                !self
+                    .loaded_extensions
+                    .get(e.id.as_str())
+                    .is_some_and(|loaded| loaded.as_ref() == e.version.as_str())
+            })
+            .collect();
+
+        cx.spawn(|this, mut cx| async move {
+            let mut missing = Vec::new();
+
+            for extension_id in to_remove {
+                log::info!("removing extension: {}", extension_id);
+                this.update(&mut cx, |this, cx| {
+                    this.uninstall_extension(&extension_id, cx)
+                })?
+                .await?;
+            }
+
+            for extension in to_load {
+                if let Err(e) = Self::load_extension(this.clone(), extension.clone(), &mut cx).await
+                {
+                    log::info!("failed to load extension: {}, {:?}", extension.id, e);
+                    missing.push(extension)
+                } else if extension.dev {
+                    missing.push(extension)
+                }
+            }
+
+            Ok(missing)
+        })
+    }
+
+    pub async fn load_extension(
+        this: WeakModel<Self>,
+        extension: ExtensionVersion,
+        cx: &mut AsyncAppContext,
+    ) -> Result<()> {
+        let (fs, wasm_host, extension_dir) = this.update(cx, |this, _cx| {
+            this.loaded_extensions.insert(
+                extension.id.clone().into(),
+                extension.version.clone().into(),
+            );
+            (
+                this.fs.clone(),
+                this.wasm_host.clone(),
+                this.extension_dir.join(&extension.id),
+            )
+        })?;
+
+        let manifest = Arc::new(ExtensionManifest::load(fs.clone(), &extension_dir).await?);
+
+        debug_assert!(!manifest.languages.is_empty() || !manifest.language_servers.is_empty());
+
+        if manifest.version.as_ref() != extension.version.as_str() {
+            anyhow::bail!(
+                "mismatched versions: ({}) != ({})",
+                manifest.version,
+                extension.version
+            )
+        }
+
+        for language_path in &manifest.languages {
+            let language_path = extension_dir.join(language_path);
+            let config = fs.load(&language_path.join("config.toml")).await?;
+            let mut config = ::toml::from_str::<LanguageConfig>(&config)?;
+
+            this.update(cx, |this, _cx| {
+                this.loaded_languages
+                    .entry(manifest.id.clone())
+                    .or_default()
+                    .push(config.name.clone());
+
+                config.grammar = None;
+
+                this.registration_hooks.register_language(
+                    config.name.clone(),
+                    None,
+                    config.matcher.clone(),
+                    Arc::new(move || {
+                        Ok(LoadedLanguage {
+                            config: config.clone(),
+                            queries: LanguageQueries::default(),
+                            context_provider: None,
+                            toolchain_provider: None,
+                        })
+                    }),
+                );
+            })?;
+        }
+
+        if manifest.language_servers.is_empty() {
+            return Ok(());
+        }
+
+        let wasm_extension: Arc<dyn Extension> =
+            Arc::new(WasmExtension::load(extension_dir, &manifest, wasm_host.clone(), &cx).await?);
+
+        for (language_server_name, language_server_config) in &manifest.language_servers {
+            for language in language_server_config.languages() {
+                this.update(cx, |this, _cx| {
+                    this.loaded_language_servers
+                        .entry(manifest.id.clone())
+                        .or_default()
+                        .push((language_server_name.clone(), language.clone()));
+                    this.registration_hooks.register_lsp_adapter(
+                        language.clone(),
+                        ExtensionLspAdapter {
+                            extension: wasm_extension.clone(),
+                            language_server_id: language_server_name.clone(),
+                            language_name: language,
+                        },
+                    );
+                })?;
+            }
+        }
+
+        Ok(())
+    }
+
+    fn uninstall_extension(
+        &mut self,
+        extension_id: &Arc<str>,
+        cx: &mut ModelContext<Self>,
+    ) -> Task<Result<()>> {
+        self.loaded_extensions.remove(extension_id);
+        let languages_to_remove = self
+            .loaded_languages
+            .remove(extension_id)
+            .unwrap_or_default();
+        self.registration_hooks
+            .remove_languages(&languages_to_remove, &[]);
+        for (language_server_name, language) in self
+            .loaded_language_servers
+            .remove(extension_id)
+            .unwrap_or_default()
+        {
+            self.registration_hooks
+                .remove_lsp_adapter(&language, &language_server_name);
+        }
+
+        let path = self.extension_dir.join(&extension_id.to_string());
+        let fs = self.fs.clone();
+        cx.spawn(|_, _| async move {
+            fs.remove_dir(
+                &path,
+                RemoveOptions {
+                    recursive: true,
+                    ignore_if_not_exists: true,
+                },
+            )
+            .await
+        })
+    }
+
+    pub fn install_extension(
+        &mut self,
+        extension: ExtensionVersion,
+        tmp_path: PathBuf,
+        cx: &mut ModelContext<Self>,
+    ) -> Task<Result<()>> {
+        let path = self.extension_dir.join(&extension.id);
+        let fs = self.fs.clone();
+
+        cx.spawn(|this, mut cx| async move {
+            if fs.is_dir(&path).await {
+                this.update(&mut cx, |this, cx| {
+                    this.uninstall_extension(&extension.id.clone().into(), cx)
+                })?
+                .await?;
+            }
+
+            fs.rename(&tmp_path, &path, RenameOptions::default())
+                .await?;
+
+            Self::load_extension(this, extension, &mut cx).await
+        })
+    }
+
+    pub async fn handle_sync_extensions(
+        extension_store: Model<HeadlessExtensionStore>,
+        envelope: TypedEnvelope<proto::SyncExtensions>,
+        mut cx: AsyncAppContext,
+    ) -> Result<proto::SyncExtensionsResponse> {
+        let requested_extensions =
+            envelope
+                .payload
+                .extensions
+                .into_iter()
+                .map(|p| ExtensionVersion {
+                    id: p.id,
+                    version: p.version,
+                    dev: p.dev,
+                });
+        let missing_extensions = extension_store
+            .update(&mut cx, |extension_store, cx| {
+                extension_store.sync_extensions(requested_extensions.collect(), cx)
+            })?
+            .await?;
+
+        Ok(proto::SyncExtensionsResponse {
+            missing_extensions: missing_extensions
+                .into_iter()
+                .map(|e| proto::Extension {
+                    id: e.id,
+                    version: e.version,
+                    dev: e.dev,
+                })
+                .collect(),
+            tmp_dir: paths::remote_extensions_uploads_dir()
+                .to_string_lossy()
+                .to_string(),
+        })
+    }
+
+    pub async fn handle_install_extension(
+        extensions: Model<HeadlessExtensionStore>,
+        envelope: TypedEnvelope<proto::InstallExtension>,
+        mut cx: AsyncAppContext,
+    ) -> Result<proto::Ack> {
+        let extension = envelope
+            .payload
+            .extension
+            .with_context(|| anyhow!("Invalid InstallExtension request"))?;
+
+        extensions
+            .update(&mut cx, |extensions, cx| {
+                extensions.install_extension(
+                    ExtensionVersion {
+                        id: extension.id,
+                        version: extension.version,
+                        dev: extension.dev,
+                    },
+                    PathBuf::from(envelope.payload.tmp_dir),
+                    cx,
+                )
+            })?
+            .await?;
+
+        Ok(proto::Ack {})
+    }
+}
+
+struct HeadlessRegistrationHooks {
+    language_registry: Arc<LanguageRegistry>,
+}
+
+impl HeadlessRegistrationHooks {
+    fn new(language_registry: Arc<LanguageRegistry>) -> Self {
+        Self { language_registry }
+    }
+}
+
+impl ExtensionRegistrationHooks for HeadlessRegistrationHooks {
+    fn register_language(
+        &self,
+        language: LanguageName,
+        _grammar: Option<Arc<str>>,
+        matcher: language::LanguageMatcher,
+        load: Arc<dyn Fn() -> Result<LoadedLanguage> + 'static + Send + Sync>,
+    ) {
+        log::info!("registering language: {:?}", language);
+        self.language_registry
+            .register_language(language, None, matcher, load)
+    }
+    fn register_lsp_adapter(&self, language: LanguageName, adapter: ExtensionLspAdapter) {
+        log::info!("registering lsp adapter {:?}", language);
+        self.language_registry
+            .register_lsp_adapter(language, Arc::new(adapter) as _);
+    }
+
+    fn register_wasm_grammars(&self, grammars: Vec<(Arc<str>, PathBuf)>) {
+        self.language_registry.register_wasm_grammars(grammars)
+    }
+
+    fn remove_lsp_adapter(&self, language: &LanguageName, server_name: &LanguageServerName) {
+        self.language_registry
+            .remove_lsp_adapter(language, server_name)
+    }
+
+    fn remove_languages(
+        &self,
+        languages_to_remove: &[LanguageName],
+        _grammars_to_remove: &[Arc<str>],
+    ) {
+        self.language_registry
+            .remove_languages(languages_to_remove, &[])
+    }
+
+    fn update_lsp_status(
+        &self,
+        server_name: LanguageServerName,
+        status: language::LanguageServerBinaryStatus,
+    ) {
+        self.language_registry
+            .update_lsp_status(server_name, status)
+    }
+}

crates/proto/proto/zed.proto 🔗

@@ -295,9 +295,13 @@ message Envelope {
         GetPanicFilesResponse get_panic_files_response = 281;
 
         CancelLanguageServerWork cancel_language_server_work = 282;
-        
+
         LspExtOpenDocs lsp_ext_open_docs = 283;
-        LspExtOpenDocsResponse lsp_ext_open_docs_response = 284; // current max
+        LspExtOpenDocsResponse lsp_ext_open_docs_response = 284;
+
+        SyncExtensions sync_extensions = 285;
+        SyncExtensionsResponse sync_extensions_response = 286;
+        InstallExtension install_extension = 287; // current max
     }
 
     reserved 87 to 88;
@@ -2544,3 +2548,23 @@ message CancelLanguageServerWork {
         optional string token = 2;
     }
 }
+
+message Extension {
+    string id = 1;
+    string version = 2;
+    bool dev = 3;
+}
+
+message SyncExtensions {
+    repeated Extension extensions = 1;
+}
+
+message SyncExtensionsResponse {
+    string tmp_dir = 1;
+    repeated Extension missing_extensions = 2;
+}
+
+message InstallExtension {
+    Extension extension = 1;
+    string tmp_dir = 2;
+}

crates/proto/src/proto.rs 🔗

@@ -368,6 +368,9 @@ messages!(
     (GetPanicFiles, Background),
     (GetPanicFilesResponse, Background),
     (CancelLanguageServerWork, Foreground),
+    (SyncExtensions, Background),
+    (SyncExtensionsResponse, Background),
+    (InstallExtension, Background),
 );
 
 request_messages!(
@@ -491,6 +494,8 @@ request_messages!(
     (GetPathMetadata, GetPathMetadataResponse),
     (GetPanicFiles, GetPanicFilesResponse),
     (CancelLanguageServerWork, Ack),
+    (SyncExtensions, SyncExtensionsResponse),
+    (InstallExtension, Ack),
 );
 
 entity_messages!(

crates/recent_projects/Cargo.toml 🔗

@@ -17,6 +17,7 @@ anyhow.workspace = true
 auto_update.workspace = true
 release_channel.workspace = true
 editor.workspace = true
+extension_host.workspace = true
 file_finder.workspace = true
 futures.workspace = true
 fuzzy.workspace = true

crates/recent_projects/src/ssh_connections.rs 🔗

@@ -4,6 +4,7 @@ use std::{path::PathBuf, sync::Arc, time::Duration};
 use anyhow::{anyhow, Result};
 use auto_update::AutoUpdater;
 use editor::Editor;
+use extension_host::ExtensionStore;
 use futures::channel::oneshot;
 use gpui::{
     percentage, Animation, AnimationExt, AnyWindowHandle, AsyncAppContext, DismissEvent,
@@ -630,6 +631,15 @@ pub async fn open_ssh_project(
             }
         }
 
+        window
+            .update(cx, |workspace, cx| {
+                if let Some(client) = workspace.project().read(cx).ssh_client().clone() {
+                    ExtensionStore::global(cx)
+                        .update(cx, |store, cx| store.register_ssh_client(client, cx));
+                }
+            })
+            .ok();
+
         break;
     }
 

crates/remote/src/ssh_session.rs 🔗

@@ -1269,6 +1269,7 @@ impl RemoteConnection for SshRemoteConnection {
                     .map(|port| vec!["-P".to_string(), port.to_string()])
                     .unwrap_or_default(),
             )
+            .arg("-C")
             .arg("-r")
             .arg(&src_path)
             .arg(format!(

crates/remote_server/Cargo.toml 🔗

@@ -29,6 +29,7 @@ chrono.workspace = true
 clap.workspace = true
 client.workspace = true
 env_logger.workspace = true
+extension_host.workspace = true
 fs.workspace = true
 futures.workspace = true
 git.workspace = true

crates/remote_server/src/headless_project.rs 🔗

@@ -1,4 +1,5 @@
 use anyhow::{anyhow, Result};
+use extension_host::headless_host::HeadlessExtensionStore;
 use fs::Fs;
 use gpui::{AppContext, AsyncAppContext, Context as _, Model, ModelContext, PromptLevel};
 use http_client::HttpClient;
@@ -37,6 +38,7 @@ pub struct HeadlessProject {
     pub settings_observer: Model<SettingsObserver>,
     pub next_entry_id: Arc<AtomicUsize>,
     pub languages: Arc<LanguageRegistry>,
+    pub extensions: Model<HeadlessExtensionStore>,
 }
 
 pub struct HeadlessAppState {
@@ -147,6 +149,15 @@ impl HeadlessProject {
         )
         .detach();
 
+        let extensions = HeadlessExtensionStore::new(
+            fs.clone(),
+            http_client.clone(),
+            languages.clone(),
+            paths::remote_extensions_dir().to_path_buf(),
+            node_runtime,
+            cx,
+        );
+
         let client: AnyProtoClient = session.clone().into();
 
         session.subscribe_to_entity(SSH_PROJECT_ID, &worktree_store);
@@ -173,6 +184,15 @@ impl HeadlessProject {
         client.add_model_request_handler(BufferStore::handle_update_buffer);
         client.add_model_message_handler(BufferStore::handle_close_buffer);
 
+        client.add_request_handler(
+            extensions.clone().downgrade(),
+            HeadlessExtensionStore::handle_sync_extensions,
+        );
+        client.add_request_handler(
+            extensions.clone().downgrade(),
+            HeadlessExtensionStore::handle_install_extension,
+        );
+
         BufferStore::init(&client);
         WorktreeStore::init(&client);
         SettingsObserver::init(&client);
@@ -190,6 +210,7 @@ impl HeadlessProject {
             task_store,
             next_entry_id: Default::default(),
             languages,
+            extensions,
         }
     }