Refactor language server startup

Julia and Max Brunsfeld created

Avoid parallel vecs

Co-Authored-By: Max Brunsfeld <max@zed.dev>

Change summary

crates/language/src/language.rs | 109 +++---
crates/project/src/project.rs   | 547 +++++++++++++++++-----------------
2 files changed, 324 insertions(+), 332 deletions(-)

Detailed changes

crates/language/src/language.rs 🔗

@@ -782,13 +782,14 @@ impl LanguageRegistry {
         self.state.read().languages.iter().cloned().collect()
     }
 
-    pub fn start_language_servers(
+    pub fn start_language_server(
         self: &Arc<Self>,
         language: Arc<Language>,
+        adapter: Arc<CachedLspAdapter>,
         root_path: Arc<Path>,
         http_client: Arc<dyn HttpClient>,
         cx: &mut AppContext,
-    ) -> Vec<PendingLanguageServer> {
+    ) -> Option<PendingLanguageServer> {
         #[cfg(any(test, feature = "test-support"))]
         if language.fake_adapter.is_some() {
             let task = cx.spawn(|cx| async move {
@@ -819,70 +820,60 @@ impl LanguageRegistry {
             });
 
             let server_id = post_inc(&mut self.state.write().next_language_server_id);
-            return vec![PendingLanguageServer { server_id, task }];
+            return Some(PendingLanguageServer { server_id, task });
         }
 
         let download_dir = self
             .language_server_download_dir
             .clone()
             .ok_or_else(|| anyhow!("language server download directory has not been assigned"))
-            .log_err();
-        let download_dir = match download_dir {
-            Some(download_dir) => download_dir,
-            None => return Vec::new(),
-        };
-
-        let mut results = Vec::new();
-
-        for adapter in &language.adapters {
-            let this = self.clone();
-            let language = language.clone();
-            let http_client = http_client.clone();
-            let download_dir = download_dir.clone();
-            let root_path = root_path.clone();
-            let adapter = adapter.clone();
-            let lsp_binary_statuses = self.lsp_binary_statuses_tx.clone();
-            let login_shell_env_loaded = self.login_shell_env_loaded.clone();
-            let server_id = post_inc(&mut self.state.write().next_language_server_id);
-
-            let task = cx.spawn(|cx| async move {
-                login_shell_env_loaded.await;
-
-                let mut lock = this.lsp_binary_paths.lock();
-                let entry = lock
-                    .entry(adapter.name.clone())
-                    .or_insert_with(|| {
-                        get_binary(
-                            adapter.clone(),
-                            language.clone(),
-                            http_client,
-                            download_dir,
-                            lsp_binary_statuses,
-                        )
-                        .map_err(Arc::new)
-                        .boxed()
-                        .shared()
-                    })
-                    .clone();
-                drop(lock);
-                let binary = entry.clone().map_err(|e| anyhow!(e)).await?;
-
-                let server = lsp::LanguageServer::new(
-                    server_id,
-                    &binary.path,
-                    &binary.arguments,
-                    &root_path,
-                    adapter.code_action_kinds(),
-                    cx,
-                )?;
-
-                Ok(server)
-            });
-
-            results.push(PendingLanguageServer { server_id, task });
-        }
+            .log_err()?;
+
+        let this = self.clone();
+        let language = language.clone();
+        let http_client = http_client.clone();
+        let download_dir = download_dir.clone();
+        let root_path = root_path.clone();
+        let adapter = adapter.clone();
+        let lsp_binary_statuses = self.lsp_binary_statuses_tx.clone();
+        let login_shell_env_loaded = self.login_shell_env_loaded.clone();
+        let server_id = post_inc(&mut self.state.write().next_language_server_id);
+
+        let task = cx.spawn(|cx| async move {
+            login_shell_env_loaded.await;
+
+            let mut lock = this.lsp_binary_paths.lock();
+            let entry = lock
+                .entry(adapter.name.clone())
+                .or_insert_with(|| {
+                    get_binary(
+                        adapter.clone(),
+                        language.clone(),
+                        http_client,
+                        download_dir,
+                        lsp_binary_statuses,
+                    )
+                    .map_err(Arc::new)
+                    .boxed()
+                    .shared()
+                })
+                .clone();
+            drop(lock);
+            let binary = entry.clone().map_err(|e| anyhow!(e)).await?;
+
+            let server = lsp::LanguageServer::new(
+                server_id,
+                &binary.path,
+                &binary.arguments,
+                &root_path,
+                adapter.code_action_kinds(),
+                cx,
+            )?;
+
+            Ok(server)
+        });
 
-        results
+        Some(PendingLanguageServer { server_id, task })
     }
 
     pub fn language_server_binary_statuses(

crates/project/src/project.rs 🔗

@@ -2137,17 +2137,23 @@ impl Project {
             return;
         }
 
-        let adapters = language.lsp_adapters();
-        let language_servers = self.languages.start_language_servers(
-            language.clone(),
-            worktree_path.clone(),
-            self.client.http_client(),
-            cx,
-        );
-        debug_assert_eq!(adapters.len(), language_servers.len());
-
-        for (adapter, pending_server) in adapters.into_iter().zip(language_servers.into_iter()) {
+        for adapter in language.lsp_adapters() {
             let key = (worktree_id, adapter.name.clone());
+            if self.language_server_ids.contains_key(&key) {
+                continue;
+            }
+
+            let pending_server = match self.languages.start_language_server(
+                language.clone(),
+                adapter.clone(),
+                worktree_path.clone(),
+                self.client.http_client(),
+                cx,
+            ) {
+                Some(pending_server) => pending_server,
+                None => continue,
+            };
+
             let lsp = &cx.global::<Settings>().lsp.get(&adapter.name.0);
             let override_options = lsp.map(|s| s.initialization_options.clone()).flatten();
 
@@ -2160,17 +2166,17 @@ impl Project {
                 _ => {}
             }
 
-            if !self.language_server_ids.contains_key(&key) {
-                let adapter = self.setup_pending_language_server(
-                    initialization_options,
-                    pending_server,
-                    adapter.clone(),
-                    language.clone(),
-                    key.clone(),
-                    cx,
-                );
-                self.language_server_ids.insert(key.clone(), adapter);
-            }
+            let server_id = pending_server.server_id;
+            let state = self.setup_pending_language_server(
+                initialization_options,
+                pending_server,
+                adapter.clone(),
+                language.clone(),
+                key.clone(),
+                cx,
+            );
+            self.language_servers.insert(server_id, state);
+            self.language_server_ids.insert(key.clone(), server_id);
         }
     }
 
@@ -2182,286 +2188,281 @@ impl Project {
         language: Arc<Language>,
         key: (WorktreeId, LanguageServerName),
         cx: &mut ModelContext<Project>,
-    ) -> usize {
+    ) -> LanguageServerState {
         let server_id = pending_server.server_id;
         let languages = self.languages.clone();
 
-        self.language_servers.insert(
-            server_id,
-            LanguageServerState::Starting(cx.spawn_weak(|this, mut cx| async move {
-                let workspace_config = cx.update(|cx| languages.workspace_configuration(cx)).await;
-                let language_server = pending_server.task.await.log_err()?;
-                let language_server = language_server
-                    .initialize(initialization_options)
-                    .await
-                    .log_err()?;
-                let this = this.upgrade(&cx)?;
+        LanguageServerState::Starting(cx.spawn_weak(|this, mut cx| async move {
+            let workspace_config = cx.update(|cx| languages.workspace_configuration(cx)).await;
+            let language_server = pending_server.task.await.log_err()?;
+            let language_server = language_server
+                .initialize(initialization_options)
+                .await
+                .log_err()?;
+            let this = this.upgrade(&cx)?;
 
-                language_server
-                    .on_notification::<lsp::notification::PublishDiagnostics, _>({
-                        let this = this.downgrade();
+            language_server
+                .on_notification::<lsp::notification::PublishDiagnostics, _>({
+                    let this = this.downgrade();
+                    let adapter = adapter.clone();
+                    move |mut params, cx| {
+                        let this = this;
                         let adapter = adapter.clone();
-                        move |mut params, cx| {
-                            let this = this;
-                            let adapter = adapter.clone();
-                            cx.spawn(|mut cx| async move {
-                                adapter.process_diagnostics(&mut params).await;
-                                if let Some(this) = this.upgrade(&cx) {
-                                    this.update(&mut cx, |this, cx| {
-                                        this.update_diagnostics(
-                                            server_id,
-                                            params,
-                                            &adapter.disk_based_diagnostic_sources,
-                                            cx,
-                                        )
-                                        .log_err();
-                                    });
-                                }
-                            })
-                            .detach();
-                        }
-                    })
-                    .detach();
+                        cx.spawn(|mut cx| async move {
+                            adapter.process_diagnostics(&mut params).await;
+                            if let Some(this) = this.upgrade(&cx) {
+                                this.update(&mut cx, |this, cx| {
+                                    this.update_diagnostics(
+                                        server_id,
+                                        params,
+                                        &adapter.disk_based_diagnostic_sources,
+                                        cx,
+                                    )
+                                    .log_err();
+                                });
+                            }
+                        })
+                        .detach();
+                    }
+                })
+                .detach();
 
-                language_server
-                    .on_request::<lsp::request::WorkspaceConfiguration, _, _>({
+            language_server
+                .on_request::<lsp::request::WorkspaceConfiguration, _, _>({
+                    let languages = languages.clone();
+                    move |params, mut cx| {
                         let languages = languages.clone();
-                        move |params, mut cx| {
-                            let languages = languages.clone();
-                            async move {
-                                dbg!(&params.items);
-                                let workspace_config =
-                                    cx.update(|cx| languages.workspace_configuration(cx)).await;
-                                Ok(params
-                                    .items
-                                    .into_iter()
-                                    .map(|item| {
-                                        if let Some(section) = &item.section {
-                                            workspace_config
-                                                .get(section)
-                                                .cloned()
-                                                .unwrap_or(serde_json::Value::Null)
-                                        } else {
-                                            workspace_config.clone()
-                                        }
-                                    })
-                                    .collect())
-                            }
+                        async move {
+                            let workspace_config =
+                                cx.update(|cx| languages.workspace_configuration(cx)).await;
+                            Ok(params
+                                .items
+                                .into_iter()
+                                .map(|item| {
+                                    if let Some(section) = &item.section {
+                                        workspace_config
+                                            .get(section)
+                                            .cloned()
+                                            .unwrap_or(serde_json::Value::Null)
+                                    } else {
+                                        workspace_config.clone()
+                                    }
+                                })
+                                .collect())
                         }
-                    })
-                    .detach();
+                    }
+                })
+                .detach();
 
-                // Even though we don't have handling for these requests, respond to them to
-                // avoid stalling any language server like `gopls` which waits for a response
-                // to these requests when initializing.
-                language_server
-                    .on_request::<lsp::request::WorkDoneProgressCreate, _, _>({
-                        let this = this.downgrade();
-                        move |params, mut cx| async move {
-                            if let Some(this) = this.upgrade(&cx) {
-                                this.update(&mut cx, |this, _| {
-                                    if let Some(status) =
-                                        this.language_server_statuses.get_mut(&server_id)
-                                    {
-                                        if let lsp::NumberOrString::String(token) = params.token {
-                                            status.progress_tokens.insert(token);
-                                        }
+            // Even though we don't have handling for these requests, respond to them to
+            // avoid stalling any language server like `gopls` which waits for a response
+            // to these requests when initializing.
+            language_server
+                .on_request::<lsp::request::WorkDoneProgressCreate, _, _>({
+                    let this = this.downgrade();
+                    move |params, mut cx| async move {
+                        if let Some(this) = this.upgrade(&cx) {
+                            this.update(&mut cx, |this, _| {
+                                if let Some(status) =
+                                    this.language_server_statuses.get_mut(&server_id)
+                                {
+                                    if let lsp::NumberOrString::String(token) = params.token {
+                                        status.progress_tokens.insert(token);
                                     }
-                                });
-                            }
-                            Ok(())
+                                }
+                            });
                         }
-                    })
-                    .detach();
-                language_server
-                    .on_request::<lsp::request::RegisterCapability, _, _>({
-                        let this = this.downgrade();
-                        move |params, mut cx| async move {
-                            let this = this
-                                .upgrade(&cx)
-                                .ok_or_else(|| anyhow!("project dropped"))?;
-                            for reg in params.registrations {
-                                if reg.method == "workspace/didChangeWatchedFiles" {
-                                    if let Some(options) = reg.register_options {
-                                        let options = serde_json::from_value(options)?;
-                                        this.update(&mut cx, |this, cx| {
-                                            this.on_lsp_did_change_watched_files(
-                                                server_id, options, cx,
-                                            );
-                                        });
-                                    }
+                        Ok(())
+                    }
+                })
+                .detach();
+            language_server
+                .on_request::<lsp::request::RegisterCapability, _, _>({
+                    let this = this.downgrade();
+                    move |params, mut cx| async move {
+                        let this = this
+                            .upgrade(&cx)
+                            .ok_or_else(|| anyhow!("project dropped"))?;
+                        for reg in params.registrations {
+                            if reg.method == "workspace/didChangeWatchedFiles" {
+                                if let Some(options) = reg.register_options {
+                                    let options = serde_json::from_value(options)?;
+                                    this.update(&mut cx, |this, cx| {
+                                        this.on_lsp_did_change_watched_files(
+                                            server_id, options, cx,
+                                        );
+                                    });
                                 }
                             }
-                            Ok(())
                         }
-                    })
-                    .detach();
+                        Ok(())
+                    }
+                })
+                .detach();
 
-                language_server
-                    .on_request::<lsp::request::ApplyWorkspaceEdit, _, _>({
-                        let this = this.downgrade();
-                        let adapter = adapter.clone();
-                        let language_server = language_server.clone();
-                        move |params, cx| {
-                            Self::on_lsp_workspace_edit(
-                                this,
-                                params,
-                                server_id,
-                                adapter.clone(),
-                                language_server.clone(),
-                                cx,
-                            )
-                        }
-                    })
-                    .detach();
+            language_server
+                .on_request::<lsp::request::ApplyWorkspaceEdit, _, _>({
+                    let this = this.downgrade();
+                    let adapter = adapter.clone();
+                    let language_server = language_server.clone();
+                    move |params, cx| {
+                        Self::on_lsp_workspace_edit(
+                            this,
+                            params,
+                            server_id,
+                            adapter.clone(),
+                            language_server.clone(),
+                            cx,
+                        )
+                    }
+                })
+                .detach();
 
-                let disk_based_diagnostics_progress_token =
-                    adapter.disk_based_diagnostics_progress_token.clone();
+            let disk_based_diagnostics_progress_token =
+                adapter.disk_based_diagnostics_progress_token.clone();
 
-                language_server
-                    .on_notification::<lsp::notification::Progress, _>({
-                        let this = this.downgrade();
-                        move |params, mut cx| {
-                            if let Some(this) = this.upgrade(&cx) {
-                                this.update(&mut cx, |this, cx| {
-                                    this.on_lsp_progress(
-                                        params,
-                                        server_id,
-                                        disk_based_diagnostics_progress_token.clone(),
-                                        cx,
-                                    );
-                                });
-                            }
+            language_server
+                .on_notification::<lsp::notification::Progress, _>({
+                    let this = this.downgrade();
+                    move |params, mut cx| {
+                        if let Some(this) = this.upgrade(&cx) {
+                            this.update(&mut cx, |this, cx| {
+                                this.on_lsp_progress(
+                                    params,
+                                    server_id,
+                                    disk_based_diagnostics_progress_token.clone(),
+                                    cx,
+                                );
+                            });
                         }
-                    })
-                    .detach();
-
-                language_server
-                    .notify::<lsp::notification::DidChangeConfiguration>(
-                        lsp::DidChangeConfigurationParams {
-                            settings: workspace_config,
-                        },
-                    )
-                    .ok();
-
-                this.update(&mut cx, |this, cx| {
-                    // If the language server for this key doesn't match the server id, don't store the
-                    // server. Which will cause it to be dropped, killing the process
-                    if this
-                        .language_server_ids
-                        .get(&key)
-                        .map(|id| id != &server_id)
-                        .unwrap_or(false)
-                    {
-                        return None;
                     }
+                })
+                .detach();
 
-                    // Update language_servers collection with Running variant of LanguageServerState
-                    // indicating that the server is up and running and ready
-                    this.language_servers.insert(
-                        server_id,
-                        LanguageServerState::Running {
-                            adapter: adapter.clone(),
-                            language: language.clone(),
-                            watched_paths: Default::default(),
-                            server: language_server.clone(),
-                            simulate_disk_based_diagnostics_completion: None,
-                        },
-                    );
-                    this.language_server_statuses.insert(
-                        server_id,
-                        LanguageServerStatus {
-                            name: language_server.name().to_string(),
-                            pending_work: Default::default(),
-                            has_pending_diagnostic_updates: false,
-                            progress_tokens: Default::default(),
-                        },
-                    );
+            language_server
+                .notify::<lsp::notification::DidChangeConfiguration>(
+                    lsp::DidChangeConfigurationParams {
+                        settings: workspace_config,
+                    },
+                )
+                .ok();
 
-                    if let Some(project_id) = this.remote_id() {
-                        this.client
-                            .send(proto::StartLanguageServer {
-                                project_id,
-                                server: Some(proto::LanguageServer {
-                                    id: server_id as u64,
-                                    name: language_server.name().to_string(),
-                                }),
-                            })
-                            .log_err();
-                    }
+            this.update(&mut cx, |this, cx| {
+                // If the language server for this key doesn't match the server id, don't store the
+                // server. Which will cause it to be dropped, killing the process
+                if this
+                    .language_server_ids
+                    .get(&key)
+                    .map(|id| id != &server_id)
+                    .unwrap_or(false)
+                {
+                    return None;
+                }
 
-                    // Tell the language server about every open buffer in the worktree that matches the language.
-                    for buffer in this.opened_buffers.values() {
-                        if let Some(buffer_handle) = buffer.upgrade(cx) {
-                            let buffer = buffer_handle.read(cx);
-                            let file = match File::from_dyn(buffer.file()) {
-                                Some(file) => file,
-                                None => continue,
-                            };
-                            let language = match buffer.language() {
-                                Some(language) => language,
-                                None => continue,
-                            };
+                // Update language_servers collection with Running variant of LanguageServerState
+                // indicating that the server is up and running and ready
+                this.language_servers.insert(
+                    server_id,
+                    LanguageServerState::Running {
+                        adapter: adapter.clone(),
+                        language: language.clone(),
+                        watched_paths: Default::default(),
+                        server: language_server.clone(),
+                        simulate_disk_based_diagnostics_completion: None,
+                    },
+                );
+                this.language_server_statuses.insert(
+                    server_id,
+                    LanguageServerStatus {
+                        name: language_server.name().to_string(),
+                        pending_work: Default::default(),
+                        has_pending_diagnostic_updates: false,
+                        progress_tokens: Default::default(),
+                    },
+                );
 
-                            if file.worktree.read(cx).id() != key.0
-                                || !language.lsp_adapters().iter().any(|a| a.name == key.1)
-                            {
-                                continue;
-                            }
+                if let Some(project_id) = this.remote_id() {
+                    this.client
+                        .send(proto::StartLanguageServer {
+                            project_id,
+                            server: Some(proto::LanguageServer {
+                                id: server_id as u64,
+                                name: language_server.name().to_string(),
+                            }),
+                        })
+                        .log_err();
+                }
 
-                            let file = file.as_local()?;
-                            let versions = this
-                                .buffer_snapshots
-                                .entry(buffer.remote_id())
-                                .or_default()
-                                .entry(server_id)
-                                .or_insert_with(|| {
-                                    vec![LspBufferSnapshot {
-                                        version: 0,
-                                        snapshot: buffer.text_snapshot(),
-                                    }]
-                                });
+                // Tell the language server about every open buffer in the worktree that matches the language.
+                for buffer in this.opened_buffers.values() {
+                    if let Some(buffer_handle) = buffer.upgrade(cx) {
+                        let buffer = buffer_handle.read(cx);
+                        let file = match File::from_dyn(buffer.file()) {
+                            Some(file) => file,
+                            None => continue,
+                        };
+                        let language = match buffer.language() {
+                            Some(language) => language,
+                            None => continue,
+                        };
 
-                            let snapshot = versions.last().unwrap();
-                            let version = snapshot.version;
-                            let initial_snapshot = &snapshot.snapshot;
-                            let uri = lsp::Url::from_file_path(file.abs_path(cx)).unwrap();
-                            language_server
-                                .notify::<lsp::notification::DidOpenTextDocument>(
-                                    lsp::DidOpenTextDocumentParams {
-                                        text_document: lsp::TextDocumentItem::new(
-                                            uri,
-                                            adapter
-                                                .language_ids
-                                                .get(language.name().as_ref())
-                                                .cloned()
-                                                .unwrap_or_default(),
-                                            version,
-                                            initial_snapshot.text(),
-                                        ),
-                                    },
-                                )
-                                .log_err()?;
-                            buffer_handle.update(cx, |buffer, cx| {
-                                buffer.set_completion_triggers(
-                                    language_server
-                                        .capabilities()
-                                        .completion_provider
-                                        .as_ref()
-                                        .and_then(|provider| provider.trigger_characters.clone())
-                                        .unwrap_or_default(),
-                                    cx,
-                                )
-                            });
+                        if file.worktree.read(cx).id() != key.0
+                            || !language.lsp_adapters().iter().any(|a| a.name == key.1)
+                        {
+                            continue;
                         }
+
+                        let file = file.as_local()?;
+                        let versions = this
+                            .buffer_snapshots
+                            .entry(buffer.remote_id())
+                            .or_default()
+                            .entry(server_id)
+                            .or_insert_with(|| {
+                                vec![LspBufferSnapshot {
+                                    version: 0,
+                                    snapshot: buffer.text_snapshot(),
+                                }]
+                            });
+
+                        let snapshot = versions.last().unwrap();
+                        let version = snapshot.version;
+                        let initial_snapshot = &snapshot.snapshot;
+                        let uri = lsp::Url::from_file_path(file.abs_path(cx)).unwrap();
+                        language_server
+                            .notify::<lsp::notification::DidOpenTextDocument>(
+                                lsp::DidOpenTextDocumentParams {
+                                    text_document: lsp::TextDocumentItem::new(
+                                        uri,
+                                        adapter
+                                            .language_ids
+                                            .get(language.name().as_ref())
+                                            .cloned()
+                                            .unwrap_or_default(),
+                                        version,
+                                        initial_snapshot.text(),
+                                    ),
+                                },
+                            )
+                            .log_err()?;
+                        buffer_handle.update(cx, |buffer, cx| {
+                            buffer.set_completion_triggers(
+                                language_server
+                                    .capabilities()
+                                    .completion_provider
+                                    .as_ref()
+                                    .and_then(|provider| provider.trigger_characters.clone())
+                                    .unwrap_or_default(),
+                                cx,
+                            )
+                        });
                     }
+                }
 
-                    cx.notify();
-                    Some(language_server)
-                })
-            })),
-        );
-        server_id
+                cx.notify();
+                Some(language_server)
+            })
+        }))
     }
 
     // Returns a list of all of the worktrees which no longer have a language server and the root path