Refactor lsp store (#17435)

Mikayla Maki , Conrad Irwin , and Conrad created

This PR moves the local, remote, and ssh components of the LSP store
into their own types.

Release Notes:

- N/A

---------

Co-authored-by: Conrad Irwin <conrad.irwin@gmail.com>
Co-authored-by: Conrad <conrad@zed.dev>

Change summary

crates/language_tools/src/lsp_log.rs         |   2 
crates/project/src/lsp_store.rs              | 721 +++++++++++++--------
crates/project/src/project.rs                | 111 ++
crates/remote_server/src/headless_project.rs |  11 
4 files changed, 536 insertions(+), 309 deletions(-)

Detailed changes

crates/language_tools/src/lsp_log.rs 🔗

@@ -688,7 +688,7 @@ impl LspLogView {
                 self.project
                     .read(cx)
                     .supplementary_language_servers(cx)
-                    .filter_map(|(&server_id, name)| {
+                    .filter_map(|(server_id, name)| {
                         let state = log_store.language_servers.get(&server_id)?;
                         Some(LogMenuItem {
                             server_id,

crates/project/src/lsp_store.rs 🔗

@@ -85,27 +85,82 @@ const SERVER_REINSTALL_DEBOUNCE_TIMEOUT: Duration = Duration::from_secs(1);
 const SERVER_LAUNCHING_BEFORE_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
 pub const SERVER_PROGRESS_THROTTLE_TIMEOUT: Duration = Duration::from_millis(100);
 
+pub struct LocalLspStore {
+    http_client: Option<Arc<dyn HttpClient>>,
+    environment: Model<ProjectEnvironment>,
+    fs: Arc<dyn Fs>,
+    yarn: Model<YarnPathStore>,
+    language_servers: HashMap<LanguageServerId, LanguageServerState>,
+    last_workspace_edits_by_language_server: HashMap<LanguageServerId, ProjectTransaction>,
+    language_server_watched_paths: HashMap<LanguageServerId, HashMap<WorktreeId, GlobSet>>,
+    language_server_watcher_registrations:
+        HashMap<LanguageServerId, HashMap<String, Vec<FileSystemWatcher>>>,
+    supplementary_language_servers:
+        HashMap<LanguageServerId, (LanguageServerName, Arc<LanguageServer>)>,
+    _subscription: gpui::Subscription,
+}
+
+impl LocalLspStore {
+    fn shutdown_language_servers(
+        &mut self,
+        _cx: &mut ModelContext<LspStore>,
+    ) -> impl Future<Output = ()> {
+        let shutdown_futures = self
+            .language_servers
+            .drain()
+            .map(|(_, server_state)| async {
+                use LanguageServerState::*;
+                match server_state {
+                    Running { server, .. } => server.shutdown()?.await,
+                    Starting(task) => task.await?.shutdown()?.await,
+                }
+            })
+            .collect::<Vec<_>>();
+
+        async move {
+            futures::future::join_all(shutdown_futures).await;
+        }
+    }
+}
+
+pub struct RemoteLspStore {
+    upstream_client: AnyProtoClient,
+}
+
+impl RemoteLspStore {}
+
+pub struct SshLspStore {
+    upstream_client: AnyProtoClient,
+}
+
+#[allow(clippy::large_enum_variant)]
+pub enum LspStoreMode {
+    Local(LocalLspStore),   // ssh host and collab host
+    Remote(RemoteLspStore), // collab guest
+    Ssh(SshLspStore),       // ssh client
+}
+
+impl LspStoreMode {
+    fn is_local(&self) -> bool {
+        matches!(self, LspStoreMode::Local(_))
+    }
+
+    fn is_ssh(&self) -> bool {
+        matches!(self, LspStoreMode::Ssh(_))
+    }
+}
+
 pub struct LspStore {
+    mode: LspStoreMode,
     downstream_client: Option<AnyProtoClient>,
-    upstream_client: Option<AnyProtoClient>,
     project_id: u64,
-    http_client: Option<Arc<dyn HttpClient>>,
-    fs: Arc<dyn Fs>,
     nonce: u128,
     buffer_store: Model<BufferStore>,
     worktree_store: Model<WorktreeStore>,
     buffer_snapshots: HashMap<BufferId, HashMap<LanguageServerId, Vec<LspBufferSnapshot>>>, // buffer_id -> server_id -> vec of snapshots
-    environment: Option<Model<ProjectEnvironment>>,
-    supplementary_language_servers:
-        HashMap<LanguageServerId, (LanguageServerName, Arc<LanguageServer>)>,
     languages: Arc<LanguageRegistry>,
-    language_servers: HashMap<LanguageServerId, LanguageServerState>,
     language_server_ids: HashMap<(WorktreeId, LanguageServerName), LanguageServerId>,
     language_server_statuses: BTreeMap<LanguageServerId, LanguageServerStatus>,
-    last_workspace_edits_by_language_server: HashMap<LanguageServerId, ProjectTransaction>,
-    language_server_watched_paths: HashMap<LanguageServerId, HashMap<WorktreeId, GlobSet>>,
-    language_server_watcher_registrations:
-        HashMap<LanguageServerId, HashMap<String, Vec<FileSystemWatcher>>>,
     active_entry: Option<ProjectEntryId>,
     _maintain_workspace_config: Task<Result<()>>,
     _maintain_buffer_languages: Task<()>,
@@ -122,8 +177,6 @@ pub struct LspStore {
             )>,
         >,
     >,
-    yarn: Model<YarnPathStore>,
-    _subscription: gpui::Subscription,
 }
 
 pub enum LspStoreEvent {
@@ -209,17 +262,53 @@ impl LspStore {
         client.add_model_request_handler(Self::handle_lsp_command::<LinkedEditingRange>);
     }
 
-    #[allow(clippy::too_many_arguments)]
-    pub fn new(
+    pub fn as_remote(&self) -> Option<&RemoteLspStore> {
+        match &self.mode {
+            LspStoreMode::Remote(remote_lsp_store) => Some(remote_lsp_store),
+            _ => None,
+        }
+    }
+
+    pub fn as_ssh(&self) -> Option<&SshLspStore> {
+        match &self.mode {
+            LspStoreMode::Ssh(ssh_lsp_store) => Some(ssh_lsp_store),
+            _ => None,
+        }
+    }
+
+    pub fn as_local(&self) -> Option<&LocalLspStore> {
+        match &self.mode {
+            LspStoreMode::Local(local_lsp_store) => Some(local_lsp_store),
+            _ => None,
+        }
+    }
+
+    pub fn as_local_mut(&mut self) -> Option<&mut LocalLspStore> {
+        match &mut self.mode {
+            LspStoreMode::Local(local_lsp_store) => Some(local_lsp_store),
+            _ => None,
+        }
+    }
+
+    pub fn upstream_client(&self) -> Option<AnyProtoClient> {
+        match &self.mode {
+            LspStoreMode::Ssh(SshLspStore {
+                upstream_client, ..
+            })
+            | LspStoreMode::Remote(RemoteLspStore {
+                upstream_client, ..
+            }) => Some(upstream_client.clone()),
+            LspStoreMode::Local(_) => None,
+        }
+    }
+
+    pub fn new_local(
         buffer_store: Model<BufferStore>,
         worktree_store: Model<WorktreeStore>,
-        environment: Option<Model<ProjectEnvironment>>,
+        environment: Model<ProjectEnvironment>,
         languages: Arc<LanguageRegistry>,
         http_client: Option<Arc<dyn HttpClient>>,
         fs: Arc<dyn Fs>,
-        downstream_client: Option<AnyProtoClient>,
-        upstream_client: Option<AnyProtoClient>,
-        remote_id: Option<u64>,
         cx: &mut ModelContext<Self>,
     ) -> Self {
         let yarn = YarnPathStore::new(fs.clone(), cx);
@@ -229,32 +318,85 @@ impl LspStore {
             .detach();
 
         Self {
-            downstream_client,
-            upstream_client,
-            http_client,
-            fs,
-            project_id: remote_id.unwrap_or(0),
+            mode: LspStoreMode::Local(LocalLspStore {
+                supplementary_language_servers: Default::default(),
+                language_servers: Default::default(),
+                last_workspace_edits_by_language_server: Default::default(),
+                language_server_watched_paths: Default::default(),
+                language_server_watcher_registrations: Default::default(),
+                environment,
+                http_client,
+                fs,
+                yarn,
+                _subscription: cx.on_app_quit(|this, cx| {
+                    this.as_local_mut().unwrap().shutdown_language_servers(cx)
+                }),
+            }),
+            downstream_client: None,
+            project_id: 0,
             buffer_store,
             worktree_store,
             languages: languages.clone(),
-            environment,
+            language_server_ids: Default::default(),
+            language_server_statuses: Default::default(),
             nonce: StdRng::from_entropy().gen(),
             buffer_snapshots: Default::default(),
-            supplementary_language_servers: Default::default(),
-            language_servers: Default::default(),
+            next_diagnostic_group_id: Default::default(),
+            diagnostic_summaries: Default::default(),
+            diagnostics: Default::default(),
+            active_entry: None,
+            _maintain_workspace_config: Self::maintain_workspace_config(cx),
+            _maintain_buffer_languages: Self::maintain_buffer_languages(languages.clone(), cx),
+        }
+    }
+
+    fn send_lsp_proto_request<R: LspCommand>(
+        &self,
+        buffer: Model<Buffer>,
+        client: AnyProtoClient,
+        request: R,
+        cx: &mut ModelContext<'_, LspStore>,
+    ) -> Task<anyhow::Result<<R as LspCommand>::Response>> {
+        let message = request.to_proto(self.project_id, buffer.read(cx));
+        cx.spawn(move |this, cx| async move {
+            let response = client.request(message).await?;
+            let this = this.upgrade().context("project dropped")?;
+            request
+                .response_from_proto(response, this, buffer, cx)
+                .await
+        })
+    }
+
+    pub fn new_remote(
+        buffer_store: Model<BufferStore>,
+        worktree_store: Model<WorktreeStore>,
+        languages: Arc<LanguageRegistry>,
+        upstream_client: AnyProtoClient,
+        project_id: u64,
+        cx: &mut ModelContext<Self>,
+    ) -> Self {
+        cx.subscribe(&buffer_store, Self::on_buffer_store_event)
+            .detach();
+        cx.subscribe(&worktree_store, Self::on_worktree_store_event)
+            .detach();
+
+        Self {
+            mode: LspStoreMode::Remote(RemoteLspStore { upstream_client }),
+            downstream_client: None,
+            project_id,
+            buffer_store,
+            worktree_store,
+            languages: languages.clone(),
             language_server_ids: Default::default(),
             language_server_statuses: Default::default(),
-            last_workspace_edits_by_language_server: Default::default(),
-            language_server_watched_paths: Default::default(),
-            language_server_watcher_registrations: Default::default(),
+            nonce: StdRng::from_entropy().gen(),
+            buffer_snapshots: Default::default(),
             next_diagnostic_group_id: Default::default(),
             diagnostic_summaries: Default::default(),
             diagnostics: Default::default(),
             active_entry: None,
-            yarn,
             _maintain_workspace_config: Self::maintain_workspace_config(cx),
             _maintain_buffer_languages: Self::maintain_buffer_languages(languages.clone(), cx),
-            _subscription: cx.on_app_quit(Self::shutdown_language_servers),
         }
     }
 
@@ -494,27 +636,6 @@ impl LspStore {
         self.active_entry = active_entry;
     }
 
-    fn shutdown_language_servers(
-        &mut self,
-        _cx: &mut ModelContext<Self>,
-    ) -> impl Future<Output = ()> {
-        let shutdown_futures = self
-            .language_servers
-            .drain()
-            .map(|(_, server_state)| async {
-                use LanguageServerState::*;
-                match server_state {
-                    Running { server, .. } => server.shutdown()?.await,
-                    Starting(task) => task.await?.shutdown()?.await,
-                }
-            })
-            .collect::<Vec<_>>();
-
-        async move {
-            futures::future::join_all(shutdown_futures).await;
-        }
-    }
-
     pub(crate) fn send_diagnostic_summaries(
         &self,
         worktree: &mut Worktree,
@@ -547,9 +668,11 @@ impl LspStore {
         <R::LspRequest as lsp::request::Request>::Params: Send,
     {
         let buffer = buffer_handle.read(cx);
-        if self.upstream_client.is_some() {
-            return self.send_lsp_proto_request(buffer_handle, self.project_id, request, cx);
+
+        if let Some(upstream_client) = self.upstream_client() {
+            return self.send_lsp_proto_request(buffer_handle, upstream_client, request, cx);
         }
+
         let language_server = match server {
             LanguageServerToQuery::Primary => {
                 match self.primary_language_server_for_buffer(buffer, cx) {
@@ -635,26 +758,6 @@ impl LspStore {
         Task::ready(Ok(Default::default()))
     }
 
-    fn send_lsp_proto_request<R: LspCommand>(
-        &self,
-        buffer: Model<Buffer>,
-        project_id: u64,
-        request: R,
-        cx: &mut ModelContext<'_, Self>,
-    ) -> Task<anyhow::Result<<R as LspCommand>::Response>> {
-        let Some(upstream_client) = self.upstream_client.clone() else {
-            return Task::ready(Err(anyhow!("disconnected before completing request")));
-        };
-        let message = request.to_proto(project_id, buffer.read(cx));
-        cx.spawn(move |this, cx| async move {
-            let response = upstream_client.request(message).await?;
-            let this = this.upgrade().context("project dropped")?;
-            request
-                .response_from_proto(response, this, buffer, cx)
-                .await
-        })
-    }
-
     pub async fn execute_code_actions_on_servers(
         this: &WeakModel<LspStore>,
         adapters_and_servers: &Vec<(Arc<CachedLspAdapter>, Arc<LanguageServer>)>,
@@ -702,8 +805,10 @@ impl LspStore {
 
                 if let Some(command) = action.lsp_action.command {
                     this.update(cx, |this, _| {
-                        this.last_workspace_edits_by_language_server
-                            .remove(&language_server.server_id());
+                        if let LspStoreMode::Local(mode) = &mut this.mode {
+                            mode.last_workspace_edits_by_language_server
+                                .remove(&language_server.server_id());
+                        }
                     })?;
 
                     language_server
@@ -715,12 +820,14 @@ impl LspStore {
                         .await?;
 
                     this.update(cx, |this, _| {
-                        project_transaction.0.extend(
-                            this.last_workspace_edits_by_language_server
-                                .remove(&language_server.server_id())
-                                .unwrap_or_default()
-                                .0,
-                        )
+                        if let LspStoreMode::Local(mode) = &mut this.mode {
+                            project_transaction.0.extend(
+                                mode.last_workspace_edits_by_language_server
+                                    .remove(&language_server.server_id())
+                                    .unwrap_or_default()
+                                    .0,
+                            )
+                        }
                     })?;
                 }
             }
@@ -753,7 +860,7 @@ impl LspStore {
         push_to_history: bool,
         cx: &mut ModelContext<Self>,
     ) -> Task<Result<ProjectTransaction>> {
-        if let Some(upstream_client) = self.upstream_client.clone() {
+        if let Some(upstream_client) = self.upstream_client() {
             let request = proto::ApplyCodeAction {
                 project_id: self.project_id,
                 buffer_id: buffer_handle.read(cx).remote_id().into(),
@@ -802,7 +909,9 @@ impl LspStore {
 
                 if let Some(command) = action.lsp_action.command {
                     this.update(&mut cx, |this, _| {
-                        this.last_workspace_edits_by_language_server
+                        this.as_local_mut()
+                            .unwrap()
+                            .last_workspace_edits_by_language_server
                             .remove(&lang_server.server_id());
                     })?;
 
@@ -820,7 +929,9 @@ impl LspStore {
                     }
 
                     return this.update(&mut cx, |this, _| {
-                        this.last_workspace_edits_by_language_server
+                        this.as_local_mut()
+                            .unwrap()
+                            .last_workspace_edits_by_language_server
                             .remove(&lang_server.server_id())
                             .unwrap_or_default()
                     });
@@ -838,7 +949,7 @@ impl LspStore {
         server_id: LanguageServerId,
         cx: &mut ModelContext<Self>,
     ) -> Task<anyhow::Result<InlayHint>> {
-        if let Some(upstream_client) = self.upstream_client.clone() {
+        if let Some(upstream_client) = self.upstream_client() {
             let request = proto::ResolveInlayHint {
                 project_id: self.project_id,
                 buffer_id: buffer_handle.read(cx).remote_id().into(),
@@ -916,7 +1027,7 @@ impl LspStore {
             .map(|(_, server)| LanguageServerToQuery::Other(server.server_id()))
             .next()
             .or_else(|| {
-                self.upstream_client
+                self.upstream_client()
                     .is_some()
                     .then_some(LanguageServerToQuery::Primary)
             })
@@ -949,7 +1060,7 @@ impl LspStore {
         trigger: String,
         cx: &mut ModelContext<Self>,
     ) -> Task<Result<Option<Transaction>>> {
-        if let Some(client) = self.upstream_client.clone() {
+        if let Some(client) = self.upstream_client() {
             let request = proto::OnTypeFormatting {
                 project_id: self.project_id,
                 buffer_id: buffer.read(cx).remote_id().into(),
@@ -1099,7 +1210,7 @@ impl LspStore {
         range: Range<Anchor>,
         cx: &mut ModelContext<Self>,
     ) -> Task<Vec<CodeAction>> {
-        if let Some(upstream_client) = self.upstream_client.as_ref() {
+        if let Some(upstream_client) = self.upstream_client() {
             let request_task = upstream_client.request(proto::MultiLspQuery {
                 buffer_id: buffer_handle.read(cx).remote_id().into(),
                 version: serialize_version(&buffer_handle.read(cx).version()),
@@ -1179,10 +1290,10 @@ impl LspStore {
     ) -> Task<Result<Vec<Completion>>> {
         let language_registry = self.languages.clone();
 
-        if let Some(_) = self.upstream_client.clone() {
+        if let Some(upstream_client) = self.upstream_client() {
             let task = self.send_lsp_proto_request(
                 buffer.clone(),
-                self.project_id,
+                upstream_client,
                 GetCompletions { position, context },
                 cx,
             );
@@ -1273,7 +1384,7 @@ impl LspStore {
         completions: Arc<RwLock<Box<[Completion]>>>,
         cx: &mut ModelContext<Self>,
     ) -> Task<Result<bool>> {
-        let client = self.upstream_client.clone();
+        let client = self.upstream_client();
         let language_registry = self.languages.clone();
         let project_id = self.project_id;
 
@@ -1482,7 +1593,7 @@ impl LspStore {
         let buffer = buffer_handle.read(cx);
         let buffer_id = buffer.remote_id();
 
-        if let Some(client) = self.upstream_client.clone() {
+        if let Some(client) = self.upstream_client() {
             let project_id = self.project_id;
             cx.spawn(move |_, mut cx| async move {
                 let response = client
@@ -1598,7 +1709,7 @@ impl LspStore {
         let buffer_id = buffer.remote_id().into();
         let lsp_request = InlayHints { range };
 
-        if let Some(client) = self.upstream_client.clone() {
+        if let Some(client) = self.upstream_client() {
             let request = proto::InlayHints {
                 project_id: self.project_id,
                 buffer_id,
@@ -1648,7 +1759,7 @@ impl LspStore {
     ) -> Task<Vec<SignatureHelp>> {
         let position = position.to_point_utf16(buffer.read(cx));
 
-        if let Some(client) = self.upstream_client.clone() {
+        if let Some(client) = self.upstream_client() {
             let request_task = client.request(proto::MultiLspQuery {
                 buffer_id: buffer.read(cx).remote_id().into(),
                 version: serialize_version(&buffer.read(cx).version()),
@@ -1720,7 +1831,7 @@ impl LspStore {
         position: PointUtf16,
         cx: &mut ModelContext<Self>,
     ) -> Task<Vec<Hover>> {
-        if let Some(client) = self.upstream_client.clone() {
+        if let Some(client) = self.upstream_client() {
             let request_task = client.request(proto::MultiLspQuery {
                 buffer_id: buffer.read(cx).remote_id().into(),
                 version: serialize_version(&buffer.read(cx).version()),
@@ -1794,7 +1905,7 @@ impl LspStore {
     pub fn symbols(&self, query: &str, cx: &mut ModelContext<Self>) -> Task<Result<Vec<Symbol>>> {
         let language_registry = self.languages.clone();
 
-        if let Some(upstream_client) = self.upstream_client.as_ref() {
+        if let Some(upstream_client) = self.upstream_client().as_ref() {
             let request = upstream_client.request(proto::GetProjectSymbols {
                 project_id: self.project_id,
                 query: query.to_string(),
@@ -1841,16 +1952,17 @@ impl LspStore {
                 }
                 let worktree_abs_path = worktree.abs_path().clone();
 
-                let (lsp_adapter, language, server) = match self.language_servers.get(server_id) {
-                    Some(LanguageServerState::Running {
-                        adapter,
-                        language,
-                        server,
-                        ..
-                    }) => (adapter.clone(), language.clone(), server),
+                let (lsp_adapter, language, server) =
+                    match self.as_local().unwrap().language_servers.get(server_id) {
+                        Some(LanguageServerState::Running {
+                            adapter,
+                            language,
+                            server,
+                            ..
+                        }) => (adapter.clone(), language.clone(), server),
 
-                    _ => continue,
-                };
+                        _ => continue,
+                    };
 
                 requests.push(
                     server
@@ -2152,7 +2264,7 @@ impl LspStore {
                                 .worktree_store
                                 .read(cx)
                                 .worktree_for_id(*worktree_id, cx)?;
-                            let state = this.language_servers.get(server_id)?;
+                            let state = this.as_local()?.language_servers.get(server_id)?;
                             let delegate = ProjectLspAdapterDelegate::new(this, &worktree, cx);
                             match state {
                                 LanguageServerState::Starting(_) => None,
@@ -2218,7 +2330,7 @@ impl LspStore {
                         language,
                         server,
                         ..
-                    }) = self.language_servers.get(id)
+                    }) = self.as_local()?.language_servers.get(id)
                     {
                         return Some((adapter, language, server));
                     }
@@ -2245,11 +2357,17 @@ impl LspStore {
             self.language_server_ids
                 .remove(&(id_to_remove, server_name));
             self.language_server_statuses.remove(&server_id_to_remove);
-            self.language_server_watched_paths
-                .remove(&server_id_to_remove);
-            self.last_workspace_edits_by_language_server
-                .remove(&server_id_to_remove);
-            self.language_servers.remove(&server_id_to_remove);
+            if let Some(local_lsp_store) = self.as_local_mut() {
+                local_lsp_store
+                    .language_server_watched_paths
+                    .remove(&server_id_to_remove);
+                local_lsp_store
+                    .last_workspace_edits_by_language_server
+                    .remove(&server_id_to_remove);
+                local_lsp_store
+                    .language_servers
+                    .remove(&server_id_to_remove);
+            }
             cx.emit(LspStoreEvent::LanguageServerRemoved(server_id_to_remove));
         }
     }
@@ -2345,7 +2463,7 @@ impl LspStore {
                     let server = self
                         .language_server_ids
                         .get(&(worktree_id, adapter.name.clone()))
-                        .and_then(|id| self.language_servers.get(id))
+                        .and_then(|id| self.as_local()?.language_servers.get(id))
                         .and_then(|server_state| {
                             if let LanguageServerState::Running { server, .. } = server_state {
                                 Some(server.clone())
@@ -2541,7 +2659,7 @@ impl LspStore {
         symbol: &Symbol,
         cx: &mut ModelContext<Self>,
     ) -> Task<Result<Model<Buffer>>> {
-        if let Some(client) = self.upstream_client.clone() {
+        if let Some(client) = self.upstream_client() {
             let request = client.request(proto::OpenBufferForSymbol {
                 project_id: self.project_id,
                 symbol: Some(Self::serialize_symbol(symbol)),
@@ -2609,7 +2727,7 @@ impl LspStore {
             let p = abs_path.clone();
             let yarn_worktree = this
                 .update(&mut cx, move |this, cx| {
-                    this.yarn.update(cx, |_, cx| {
+                    this.as_local().unwrap().yarn.update(cx, |_, cx| {
                         cx.spawn(|this, mut cx| async move {
                             let t = this
                                 .update(&mut cx, |this, cx| {
@@ -2759,7 +2877,7 @@ impl LspStore {
         <R::LspRequest as lsp::request::Request>::Result: Send,
         <R::LspRequest as lsp::request::Request>::Params: Send,
     {
-        debug_assert!(self.upstream_client.is_none());
+        debug_assert!(self.upstream_client().is_none());
 
         let snapshot = buffer.read(cx).snapshot();
         let scope = position.and_then(|position| snapshot.language_scope_at(position));
@@ -3188,7 +3306,9 @@ impl LspStore {
             simulate_disk_based_diagnostics_completion,
             adapter,
             ..
-        }) = self.language_servers.get_mut(&language_server_id)
+        }) = self
+            .as_local_mut()
+            .and_then(|local_store| local_store.language_servers.get_mut(&language_server_id))
         else {
             return;
         };
@@ -3209,8 +3329,9 @@ impl LspStore {
                     if let Some(LanguageServerState::Running {
                         simulate_disk_based_diagnostics_completion,
                         ..
-                    }) = this.language_servers.get_mut(&language_server_id)
-                    {
+                    }) = this.as_local_mut().and_then(|local_store| {
+                        local_store.language_servers.get_mut(&language_server_id)
+                    }) {
                         *simulate_disk_based_diagnostics_completion = None;
                     }
                 })
@@ -3236,21 +3357,24 @@ impl LspStore {
         language_server_id: LanguageServerId,
         cx: &mut ModelContext<Self>,
     ) {
-        let Some(watchers) = self
+        let worktrees = self.worktree_store.read(cx).worktrees().collect::<Vec<_>>();
+        let local_lsp_store = self.as_local_mut().unwrap();
+
+        let Some(watchers) = local_lsp_store
             .language_server_watcher_registrations
             .get(&language_server_id)
         else {
             return;
         };
 
-        let watched_paths = self
+        let watched_paths = local_lsp_store
             .language_server_watched_paths
             .entry(language_server_id)
             .or_default();
 
         let mut builders = HashMap::default();
         for watcher in watchers.values().flatten() {
-            for worktree in self.worktree_store.read(cx).worktrees().collect::<Vec<_>>() {
+            for worktree in worktrees.iter() {
                 let glob_is_inside_worktree = worktree.update(cx, |tree, _| {
                     if let Some(abs_path) = tree.abs_path().to_str() {
                         let relative_glob_pattern = match &watcher.glob_pattern {
@@ -3304,10 +3428,18 @@ impl LspStore {
     }
 
     pub fn language_server_for_id(&self, id: LanguageServerId) -> Option<Arc<LanguageServer>> {
-        if let Some(LanguageServerState::Running { server, .. }) = self.language_servers.get(&id) {
-            Some(server.clone())
-        } else if let Some((_, server)) = self.supplementary_language_servers.get(&id) {
-            Some(Arc::clone(server))
+        if let Some(local_lsp_store) = self.as_local() {
+            if let Some(LanguageServerState::Running { server, .. }) =
+                local_lsp_store.language_servers.get(&id)
+            {
+                Some(server.clone())
+            } else if let Some((_, server)) =
+                local_lsp_store.supplementary_language_servers.get(&id)
+            {
+                Some(Arc::clone(server))
+            } else {
+                None
+            }
         } else {
             None
         }
@@ -3338,7 +3470,9 @@ impl LspStore {
         .log_err();
         this.update(&mut cx, |this, _| {
             if let Some(transaction) = transaction {
-                this.last_workspace_edits_by_language_server
+                this.as_local_mut()
+                    .unwrap()
+                    .last_workspace_edits_by_language_server
                     .insert(server_id, transaction);
             }
         })?;
@@ -3523,14 +3657,16 @@ impl LspStore {
         params: DidChangeWatchedFilesRegistrationOptions,
         cx: &mut ModelContext<Self>,
     ) {
-        let registrations = self
-            .language_server_watcher_registrations
-            .entry(language_server_id)
-            .or_default();
+        if let Some(local) = self.as_local_mut() {
+            let registrations = local
+                .language_server_watcher_registrations
+                .entry(language_server_id)
+                .or_default();
 
-        registrations.insert(registration_id.to_string(), params.watchers);
+            registrations.insert(registration_id.to_string(), params.watchers);
 
-        self.rebuild_watched_paths(language_server_id, cx);
+            self.rebuild_watched_paths(language_server_id, cx);
+        }
     }
 
     fn on_lsp_unregister_did_change_watched_files(
@@ -3539,26 +3675,28 @@ impl LspStore {
         registration_id: &str,
         cx: &mut ModelContext<Self>,
     ) {
-        let registrations = self
-            .language_server_watcher_registrations
-            .entry(language_server_id)
-            .or_default();
+        if let Some(local) = self.as_local_mut() {
+            let registrations = local
+                .language_server_watcher_registrations
+                .entry(language_server_id)
+                .or_default();
 
-        if registrations.remove(registration_id).is_some() {
-            log::info!(
+            if registrations.remove(registration_id).is_some() {
+                log::info!(
                 "language server {}: unregistered workspace/DidChangeWatchedFiles capability with id {}",
                 language_server_id,
                 registration_id
             );
-        } else {
-            log::warn!(
+            } else {
+                log::warn!(
                 "language server {}: failed to unregister workspace/DidChangeWatchedFiles capability with id {}. not registered.",
                 language_server_id,
                 registration_id
             );
-        }
+            }
 
-        self.rebuild_watched_paths(language_server_id, cx);
+            self.rebuild_watched_paths(language_server_id, cx);
+        }
     }
 
     #[allow(clippy::type_complexity)]
@@ -4075,120 +4213,133 @@ impl LspStore {
         language: Arc<Language>,
         cx: &mut ModelContext<Self>,
     ) {
-        if adapter.reinstall_attempt_count.load(SeqCst) > MAX_SERVER_REINSTALL_ATTEMPT_COUNT {
-            return;
-        }
-
-        let worktree = worktree_handle.read(cx);
-        let worktree_id = worktree.id();
-        let worktree_path = worktree.abs_path();
-        let key = (worktree_id, adapter.name.clone());
-        if self.language_server_ids.contains_key(&key) {
-            return;
-        }
-
-        let stderr_capture = Arc::new(Mutex::new(Some(String::new())));
-        let lsp_adapter_delegate = ProjectLspAdapterDelegate::new(self, worktree_handle, cx);
-        let cli_environment = self
-            .environment
-            .as_ref()
-            .and_then(|environment| environment.read(cx).get_cli_environment());
-        let pending_server = match self.languages.create_pending_language_server(
-            stderr_capture.clone(),
-            language.clone(),
-            adapter.clone(),
-            Arc::clone(&worktree_path),
-            lsp_adapter_delegate.clone(),
-            cli_environment,
-            cx,
-        ) {
-            Some(pending_server) => pending_server,
-            None => return,
-        };
-
-        let project_settings = ProjectSettings::get(
-            Some(SettingsLocation {
-                worktree_id: worktree_id.to_proto() as usize,
-                path: Path::new(""),
-            }),
-            cx,
-        );
-        let lsp = project_settings.lsp.get(&adapter.name.0);
-        let override_options = lsp.and_then(|s| s.initialization_options.clone());
+        if self.mode.is_local() {
+            if adapter.reinstall_attempt_count.load(SeqCst) > MAX_SERVER_REINSTALL_ATTEMPT_COUNT {
+                return;
+            }
 
-        let server_id = pending_server.server_id;
-        let container_dir = pending_server.container_dir.clone();
-        let state = LanguageServerState::Starting({
-            let adapter = adapter.clone();
-            let server_name = adapter.name.0.clone();
-            let language = language.clone();
-            let key = key.clone();
+            let worktree = worktree_handle.read(cx);
+            let worktree_id = worktree.id();
+            let worktree_path = worktree.abs_path();
+            let key = (worktree_id, adapter.name.clone());
+            if self.language_server_ids.contains_key(&key) {
+                return;
+            }
 
-            cx.spawn(move |this, mut cx| async move {
-                let result = Self::setup_and_insert_language_server(
-                    this.clone(),
-                    lsp_adapter_delegate,
-                    override_options,
-                    pending_server,
-                    adapter.clone(),
-                    language.clone(),
-                    server_id,
-                    key,
-                    &mut cx,
-                )
-                .await;
+            let stderr_capture = Arc::new(Mutex::new(Some(String::new())));
+            let lsp_adapter_delegate = ProjectLspAdapterDelegate::new(self, worktree_handle, cx);
+            let cli_environment = self
+                .as_local()
+                .unwrap()
+                .environment
+                .read(cx)
+                .get_cli_environment();
+
+            let pending_server = match self.languages.create_pending_language_server(
+                stderr_capture.clone(),
+                language.clone(),
+                adapter.clone(),
+                Arc::clone(&worktree_path),
+                lsp_adapter_delegate.clone(),
+                cli_environment,
+                cx,
+            ) {
+                Some(pending_server) => pending_server,
+                None => return,
+            };
 
-                match result {
-                    Ok(server) => {
-                        stderr_capture.lock().take();
-                        server
-                    }
+            let project_settings = ProjectSettings::get(
+                Some(SettingsLocation {
+                    worktree_id: worktree_id.to_proto() as usize,
+                    path: Path::new(""),
+                }),
+                cx,
+            );
+            let lsp = project_settings.lsp.get(&adapter.name.0);
+            let override_options = lsp.and_then(|s| s.initialization_options.clone());
 
-                    Err(err) => {
-                        log::error!("failed to start language server {server_name:?}: {err}");
-                        log::error!("server stderr: {:?}", stderr_capture.lock().take());
+            let server_id = pending_server.server_id;
+            let container_dir = pending_server.container_dir.clone();
+            let state = LanguageServerState::Starting({
+                let adapter = adapter.clone();
+                let server_name = adapter.name.0.clone();
+                let language = language.clone();
+                let key = key.clone();
 
-                        let this = this.upgrade()?;
-                        let container_dir = container_dir?;
+                cx.spawn(move |this, mut cx| async move {
+                    let result = Self::setup_and_insert_language_server(
+                        this.clone(),
+                        lsp_adapter_delegate,
+                        override_options,
+                        pending_server,
+                        adapter.clone(),
+                        language.clone(),
+                        server_id,
+                        key,
+                        &mut cx,
+                    )
+                    .await;
 
-                        let attempt_count = adapter.reinstall_attempt_count.fetch_add(1, SeqCst);
-                        if attempt_count >= MAX_SERVER_REINSTALL_ATTEMPT_COUNT {
-                            let max = MAX_SERVER_REINSTALL_ATTEMPT_COUNT;
-                            log::error!("Hit {max} reinstallation attempts for {server_name:?}");
-                            return None;
+                    match result {
+                        Ok(server) => {
+                            stderr_capture.lock().take();
+                            server
                         }
 
-                        log::info!(
-                            "retrying installation of language server {server_name:?} in {}s",
-                            SERVER_REINSTALL_DEBOUNCE_TIMEOUT.as_secs()
-                        );
-                        cx.background_executor()
-                            .timer(SERVER_REINSTALL_DEBOUNCE_TIMEOUT)
-                            .await;
+                        Err(err) => {
+                            log::error!("failed to start language server {server_name:?}: {err}");
+                            log::error!("server stderr: {:?}", stderr_capture.lock().take());
 
-                        let installation_test_binary = adapter
-                            .installation_test_binary(container_dir.to_path_buf())
-                            .await;
+                            let this = this.upgrade()?;
+                            let container_dir = container_dir?;
 
-                        this.update(&mut cx, |_, cx| {
-                            Self::check_errored_server(
-                                language,
-                                adapter,
-                                server_id,
-                                installation_test_binary,
-                                cx,
-                            )
-                        })
-                        .ok();
+                            let attempt_count =
+                                adapter.reinstall_attempt_count.fetch_add(1, SeqCst);
+                            if attempt_count >= MAX_SERVER_REINSTALL_ATTEMPT_COUNT {
+                                let max = MAX_SERVER_REINSTALL_ATTEMPT_COUNT;
+                                log::error!(
+                                    "Hit {max} reinstallation attempts for {server_name:?}"
+                                );
+                                return None;
+                            }
 
-                        None
+                            log::info!(
+                                "retrying installation of language server {server_name:?} in {}s",
+                                SERVER_REINSTALL_DEBOUNCE_TIMEOUT.as_secs()
+                            );
+                            cx.background_executor()
+                                .timer(SERVER_REINSTALL_DEBOUNCE_TIMEOUT)
+                                .await;
+
+                            let installation_test_binary = adapter
+                                .installation_test_binary(container_dir.to_path_buf())
+                                .await;
+
+                            this.update(&mut cx, |_, cx| {
+                                Self::check_errored_server(
+                                    language,
+                                    adapter,
+                                    server_id,
+                                    installation_test_binary,
+                                    cx,
+                                )
+                            })
+                            .ok();
+
+                            None
+                        }
                     }
-                }
-            })
-        });
+                })
+            });
 
-        self.language_servers.insert(server_id, state);
-        self.language_server_ids.insert(key, server_id);
+            self.as_local_mut()
+                .unwrap()
+                .language_servers
+                .insert(server_id, state);
+            self.language_server_ids.insert(key, server_id);
+        } else if self.mode.is_ssh() {
+            // TODO ssh
+        }
     }
 
     #[allow(clippy::too_many_arguments)]
@@ -4242,44 +4393,56 @@ impl LspStore {
     ) -> Option<Task<()>> {
         log::info!("beginning to reinstall server");
 
-        let existing_server = match self.language_servers.remove(&server_id) {
-            Some(LanguageServerState::Running { server, .. }) => Some(server),
-            _ => None,
-        };
+        if let Some(local) = self.as_local_mut() {
+            let existing_server = match local.language_servers.remove(&server_id) {
+                Some(LanguageServerState::Running { server, .. }) => Some(server),
+                _ => None,
+            };
 
-        self.worktree_store.update(cx, |store, cx| {
-            for worktree in store.worktrees() {
-                let key = (worktree.read(cx).id(), adapter.name.clone());
-                self.language_server_ids.remove(&key);
-            }
-        });
+            self.worktree_store.update(cx, |store, cx| {
+                for worktree in store.worktrees() {
+                    let key = (worktree.read(cx).id(), adapter.name.clone());
+                    self.language_server_ids.remove(&key);
+                }
+            });
+
+            Some(cx.spawn(move |this, mut cx| async move {
+                if let Some(task) = existing_server.and_then(|server| server.shutdown()) {
+                    log::info!("shutting down existing server");
+                    task.await;
+                }
 
-        Some(cx.spawn(move |this, mut cx| async move {
-            if let Some(task) = existing_server.and_then(|server| server.shutdown()) {
-                log::info!("shutting down existing server");
+                // TODO: This is race-safe with regards to preventing new instances from
+                // starting while deleting, but existing instances in other projects are going
+                // to be very confused and messed up
+                let Some(task) = this
+                    .update(&mut cx, |this, cx| {
+                        this.languages.delete_server_container(adapter.clone(), cx)
+                    })
+                    .log_err()
+                else {
+                    return;
+                };
                 task.await;
-            }
 
-            // TODO: This is race-safe with regards to preventing new instances from
-            // starting while deleting, but existing instances in other projects are going
-            // to be very confused and messed up
-            let Some(task) = this
-                .update(&mut cx, |this, cx| {
-                    this.languages.delete_server_container(adapter.clone(), cx)
+                this.update(&mut cx, |this, cx| {
+                    for worktree in this.worktree_store.read(cx).worktrees().collect::<Vec<_>>() {
+                        this.start_language_server(
+                            &worktree,
+                            adapter.clone(),
+                            language.clone(),
+                            cx,
+                        );
+                    }
                 })
-                .log_err()
-            else {
-                return;
-            };
-            task.await;
-
-            this.update(&mut cx, |this, cx| {
-                for worktree in this.worktree_store.read(cx).worktrees().collect::<Vec<_>>() {
-                    this.start_language_server(&worktree, adapter.clone(), language.clone(), cx);
-                }
-            })
-            .ok();
-        }))
+                .ok();
+            }))
+        } else if let Some(_ssh_store) = self.as_ssh() {
+            // TODO
+            None
+        } else {
+            None
+        }
     }
 
     async fn shutdown_language_server(

crates/project/src/project.rs 🔗

@@ -643,16 +643,13 @@ impl Project {
 
             let environment = ProjectEnvironment::new(&worktree_store, env, cx);
             let lsp_store = cx.new_model(|cx| {
-                LspStore::new(
+                LspStore::new_local(
                     buffer_store.clone(),
                     worktree_store.clone(),
-                    Some(environment.clone()),
+                    environment.clone(),
                     languages.clone(),
                     Some(client.http_client()),
                     fs.clone(),
-                    None,
-                    None,
-                    None,
                     cx,
                 )
             });
@@ -712,16 +709,89 @@ impl Project {
         fs: Arc<dyn Fs>,
         cx: &mut AppContext,
     ) -> Model<Self> {
-        let this = Self::local(client, node, user_store, languages, fs, None, cx);
-        this.update(cx, |this, cx| {
-            let client: AnyProtoClient = ssh.clone().into();
+        cx.new_model(|cx: &mut ModelContext<Self>| {
+            let (tx, rx) = mpsc::unbounded();
+            cx.spawn(move |this, cx| Self::send_buffer_ordered_messages(this, rx, cx))
+                .detach();
+            let tasks = Inventory::new(cx);
+            let global_snippets_dir = paths::config_dir().join("snippets");
+            let snippets =
+                SnippetProvider::new(fs.clone(), BTreeSet::from_iter([global_snippets_dir]), cx);
+
+            let worktree_store = cx.new_model(|_| {
+                let mut worktree_store = WorktreeStore::new(false, fs.clone());
+                worktree_store.set_upstream_client(ssh.clone().into());
+                worktree_store
+            });
+            cx.subscribe(&worktree_store, Self::on_worktree_store_event)
+                .detach();
+
+            let buffer_store =
+                cx.new_model(|cx| BufferStore::new(worktree_store.clone(), None, cx));
+            cx.subscribe(&buffer_store, Self::on_buffer_store_event)
+                .detach();
 
-            this.worktree_store.update(cx, |store, _cx| {
-                store.set_upstream_client(client.clone());
+            let settings_observer = cx.new_model(|cx| {
+                SettingsObserver::new_ssh(ssh.clone().into(), worktree_store.clone(), cx)
             });
-            this.settings_observer = cx.new_model(|cx| {
-                SettingsObserver::new_ssh(ssh.clone().into(), this.worktree_store.clone(), cx)
+
+            let environment = ProjectEnvironment::new(&worktree_store, None, cx);
+            let lsp_store = cx.new_model(|cx| {
+                LspStore::new_remote(
+                    buffer_store.clone(),
+                    worktree_store.clone(),
+                    languages.clone(),
+                    ssh.clone().into(),
+                    0,
+                    cx,
+                )
             });
+            cx.subscribe(&lsp_store, Self::on_lsp_store_event).detach();
+
+            let this = Self {
+                buffer_ordered_messages_tx: tx,
+                collaborators: Default::default(),
+                worktree_store,
+                buffer_store,
+                lsp_store,
+                current_lsp_settings: ProjectSettings::get_global(cx).lsp.clone(),
+                join_project_response_message_id: 0,
+                client_state: ProjectClientState::Local,
+                client_subscriptions: Vec::new(),
+                _subscriptions: vec![
+                    cx.observe_global::<SettingsStore>(Self::on_settings_changed),
+                    cx.on_release(Self::release),
+                ],
+                active_entry: None,
+                snippets,
+                languages,
+                client,
+                user_store,
+                settings_observer,
+                fs,
+                ssh_session: Some(ssh.clone()),
+                buffers_needing_diff: Default::default(),
+                git_diff_debouncer: DebouncedDelay::new(),
+                terminals: Terminals {
+                    local_handles: Vec::new(),
+                },
+                node: Some(node),
+                default_prettier: DefaultPrettier::default(),
+                prettiers_per_worktree: HashMap::default(),
+                prettier_instances: HashMap::default(),
+                tasks,
+                hosted_project_id: None,
+                dev_server_project_id: None,
+                search_history: Self::new_search_history(),
+                environment,
+                remotely_created_buffers: Default::default(),
+                last_formatting_failure: None,
+                buffers_being_formatted: Default::default(),
+                search_included_history: Self::new_search_history(),
+                search_excluded_history: Self::new_search_history(),
+            };
+
+            let client: AnyProtoClient = ssh.clone().into();
 
             ssh.subscribe_to_entity(SSH_PROJECT_ID, &cx.handle());
             ssh.subscribe_to_entity(SSH_PROJECT_ID, &this.buffer_store);
@@ -735,9 +805,8 @@ impl Project {
             LspStore::init(&client);
             SettingsObserver::init(&client);
 
-            this.ssh_session = Some(ssh);
-        });
-        this
+            this
+        })
     }
 
     pub async fn remote(
@@ -820,16 +889,12 @@ impl Project {
             cx.new_model(|cx| BufferStore::new(worktree_store.clone(), Some(remote_id), cx))?;
 
         let lsp_store = cx.new_model(|cx| {
-            let mut lsp_store = LspStore::new(
+            let mut lsp_store = LspStore::new_remote(
                 buffer_store.clone(),
                 worktree_store.clone(),
-                None,
                 languages.clone(),
-                Some(client.http_client()),
-                fs.clone(),
-                None,
-                Some(client.clone().into()),
-                Some(remote_id),
+                client.clone().into(),
+                remote_id,
                 cx,
             );
             lsp_store.set_language_server_statuses_from_proto(response.payload.language_servers);
@@ -4785,7 +4850,7 @@ impl Project {
     pub fn supplementary_language_servers<'a>(
         &'a self,
         cx: &'a AppContext,
-    ) -> impl '_ + Iterator<Item = (&'a LanguageServerId, &'a LanguageServerName)> {
+    ) -> impl '_ + Iterator<Item = (LanguageServerId, LanguageServerName)> {
         self.lsp_store.read(cx).supplementary_language_servers()
     }
 

crates/remote_server/src/headless_project.rs 🔗

@@ -57,18 +57,17 @@ impl HeadlessProject {
         });
         let environment = project::ProjectEnvironment::new(&worktree_store, None, cx);
         let lsp_store = cx.new_model(|cx| {
-            LspStore::new(
+            let mut lsp_store = LspStore::new_local(
                 buffer_store.clone(),
                 worktree_store.clone(),
-                Some(environment),
+                environment,
                 languages,
                 None,
                 fs.clone(),
-                Some(session.clone().into()),
-                None,
-                Some(0),
                 cx,
-            )
+            );
+            lsp_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
+            lsp_store
         });
 
         let client: AnyProtoClient = session.clone().into();