Handle LSP apply workspace edit request fully before responding

Max Brunsfeld created

Change summary

crates/editor/src/editor.rs   |   2 
crates/project/src/project.rs | 366 +++++++++++++++++-------------------
2 files changed, 172 insertions(+), 196 deletions(-)

Detailed changes

crates/project/src/project.rs 🔗

@@ -132,21 +132,6 @@ pub enum Event {
     CollaboratorLeft(PeerId),
 }
 
-enum LanguageServerEvent {
-    WorkStart {
-        token: String,
-    },
-    WorkProgress {
-        token: String,
-        progress: LanguageServerProgress,
-    },
-    WorkEnd {
-        token: String,
-    },
-    DiagnosticsUpdate(lsp::PublishDiagnosticsParams),
-    WorkspaceEdit(lsp::ApplyWorkspaceEditParams),
-}
-
 pub struct LanguageServerStatus {
     pub name: String,
     pub pending_work: BTreeMap<String, LanguageServerProgress>,
@@ -1330,17 +1315,30 @@ impl Project {
                 );
                 cx.spawn_weak(|this, mut cx| async move {
                     let language_server = language_server?.await.log_err()?;
+                    let language_server = language_server
+                        .initialize(adapter.initialization_options())
+                        .await
+                        .log_err()?;
                     let this = this.upgrade(&cx)?;
-                    let (language_server_events_tx, language_server_events_rx) =
-                        smol::channel::unbounded();
+                    let disk_based_diagnostics_progress_token =
+                        adapter.disk_based_diagnostics_progress_token();
 
                     language_server
                         .on_notification::<lsp::notification::PublishDiagnostics, _>({
-                            let language_server_events_tx = language_server_events_tx.clone();
-                            move |params, _| {
-                                language_server_events_tx
-                                    .try_send(LanguageServerEvent::DiagnosticsUpdate(params))
-                                    .ok();
+                            let this = this.downgrade();
+                            let adapter = adapter.clone();
+                            move |params, mut cx| {
+                                if let Some(this) = this.upgrade(&cx) {
+                                    this.update(&mut cx, |this, cx| {
+                                        this.on_lsp_diagnostics_published(
+                                            server_id,
+                                            params,
+                                            &adapter,
+                                            disk_based_diagnostics_progress_token,
+                                            cx,
+                                        );
+                                    });
+                                }
                             }
                         })
                         .detach();
@@ -1373,94 +1371,40 @@ impl Project {
 
                     language_server
                         .on_request::<lsp::request::ApplyWorkspaceEdit, _, _>({
-                            let language_server_events_tx = language_server_events_tx.clone();
-                            move |params, _| {
-                                language_server_events_tx
-                                    .try_send(LanguageServerEvent::WorkspaceEdit(params))
-                                    .ok();
-                                async move {
-                                    Ok(lsp::ApplyWorkspaceEditResponse {
-                                        applied: true,
-                                        failed_change: None,
-                                        failure_reason: None,
-                                    })
-                                }
+                            let this = this.downgrade();
+                            let adapter = adapter.clone();
+                            let language_server = language_server.clone();
+                            move |params, cx| {
+                                Self::on_lsp_workspace_edit(
+                                    this,
+                                    params,
+                                    server_id,
+                                    adapter.clone(),
+                                    language_server.clone(),
+                                    cx,
+                                )
                             }
                         })
                         .detach();
 
                     language_server
-                        .on_notification::<lsp::notification::Progress, _>(move |params, _| {
-                            let token = match params.token {
-                                lsp::NumberOrString::String(token) => token,
-                                lsp::NumberOrString::Number(token) => {
-                                    log::info!("skipping numeric progress token {}", token);
-                                    return;
+                        .on_notification::<lsp::notification::Progress, _>({
+                            let this = this.downgrade();
+                            move |params, mut cx| {
+                                if let Some(this) = this.upgrade(&cx) {
+                                    this.update(&mut cx, |this, cx| {
+                                        this.on_lsp_progress(
+                                            params,
+                                            server_id,
+                                            disk_based_diagnostics_progress_token,
+                                            cx,
+                                        );
+                                    });
                                 }
-                            };
-
-                            match params.value {
-                                lsp::ProgressParamsValue::WorkDone(progress) => match progress {
-                                    lsp::WorkDoneProgress::Begin(_) => {
-                                        language_server_events_tx
-                                            .try_send(LanguageServerEvent::WorkStart { token })
-                                            .ok();
-                                    }
-                                    lsp::WorkDoneProgress::Report(report) => {
-                                        language_server_events_tx
-                                            .try_send(LanguageServerEvent::WorkProgress {
-                                                token,
-                                                progress: LanguageServerProgress {
-                                                    message: report.message,
-                                                    percentage: report
-                                                        .percentage
-                                                        .map(|p| p as usize),
-                                                    last_update_at: Instant::now(),
-                                                },
-                                            })
-                                            .ok();
-                                    }
-                                    lsp::WorkDoneProgress::End(_) => {
-                                        language_server_events_tx
-                                            .try_send(LanguageServerEvent::WorkEnd { token })
-                                            .ok();
-                                    }
-                                },
                             }
                         })
                         .detach();
 
-                    let language_server = language_server
-                        .initialize(adapter.initialization_options())
-                        .await
-                        .log_err()?;
-
-                    // Process all the LSP events.
-                    cx.spawn(|mut cx| {
-                        let this = this.downgrade();
-                        let adapter = adapter.clone();
-                        let language_server = language_server.clone();
-                        async move {
-                            while let Ok(event) = language_server_events_rx.recv().await {
-                                let this = this.upgrade(&cx)?;
-                                Self::on_lsp_event(
-                                    this,
-                                    server_id,
-                                    &adapter,
-                                    &language_server,
-                                    event,
-                                    &mut cx,
-                                )
-                                .await;
-
-                                // Don't starve the main thread when lots of events arrive all at once.
-                                smol::future::yield_now().await;
-                            }
-                            Some(())
-                        }
-                    })
-                    .detach();
-
                     this.update(&mut cx, |this, cx| {
                         this.language_servers
                             .insert(key.clone(), (adapter, language_server.clone()));
@@ -1615,75 +1559,111 @@ impl Project {
         .detach();
     }
 
-    async fn on_lsp_event(
-        this: ModelHandle<Self>,
-        language_server_id: usize,
+    fn on_lsp_diagnostics_published(
+        &mut self,
+        server_id: usize,
+        mut params: lsp::PublishDiagnosticsParams,
         adapter: &Arc<dyn LspAdapter>,
-        language_server: &Arc<LanguageServer>,
-        event: LanguageServerEvent,
-        cx: &mut AsyncAppContext,
+        disk_based_diagnostics_progress_token: Option<&str>,
+        cx: &mut ModelContext<Self>,
     ) {
-        let disk_based_diagnostics_progress_token = adapter.disk_based_diagnostics_progress_token();
-        match event {
-            LanguageServerEvent::WorkStart { token } => {
-                this.update(cx, |this, cx| {
-                    let language_server_status = if let Some(status) =
-                        this.language_server_statuses.get_mut(&language_server_id)
-                    {
-                        status
-                    } else {
-                        return;
-                    };
+        adapter.process_diagnostics(&mut params);
+        if disk_based_diagnostics_progress_token.is_none() {
+            self.disk_based_diagnostics_started(cx);
+            self.broadcast_language_server_update(
+                server_id,
+                proto::update_language_server::Variant::DiskBasedDiagnosticsUpdating(
+                    proto::LspDiskBasedDiagnosticsUpdating {},
+                ),
+            );
+        }
+        self.update_diagnostics(params, adapter.disk_based_diagnostic_sources(), cx)
+            .log_err();
+        if disk_based_diagnostics_progress_token.is_none() {
+            self.disk_based_diagnostics_finished(cx);
+            self.broadcast_language_server_update(
+                server_id,
+                proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
+                    proto::LspDiskBasedDiagnosticsUpdated {},
+                ),
+            );
+        }
+    }
+
+    fn on_lsp_progress(
+        &mut self,
+        progress: lsp::ProgressParams,
+        server_id: usize,
+        disk_based_diagnostics_progress_token: Option<&str>,
+        cx: &mut ModelContext<Self>,
+    ) {
+        let token = match progress.token {
+            lsp::NumberOrString::String(token) => token,
+            lsp::NumberOrString::Number(token) => {
+                log::info!("skipping numeric progress token {}", token);
+                return;
+            }
+        };
+
+        match progress.value {
+            lsp::ProgressParamsValue::WorkDone(progress) => match progress {
+                lsp::WorkDoneProgress::Begin(_) => {
+                    let language_server_status =
+                        if let Some(status) = self.language_server_statuses.get_mut(&server_id) {
+                            status
+                        } else {
+                            return;
+                        };
 
                     if Some(token.as_str()) == disk_based_diagnostics_progress_token {
                         language_server_status.pending_diagnostic_updates += 1;
                         if language_server_status.pending_diagnostic_updates == 1 {
-                            this.disk_based_diagnostics_started(cx);
-                            this.broadcast_language_server_update(
-                            language_server_id,
-                            proto::update_language_server::Variant::DiskBasedDiagnosticsUpdating(
-                                proto::LspDiskBasedDiagnosticsUpdating {},
-                            ),
-                        );
+                            self.disk_based_diagnostics_started(cx);
+                            self.broadcast_language_server_update(
+                                                            server_id,
+                                                            proto::update_language_server::Variant::DiskBasedDiagnosticsUpdating(
+                                                                proto::LspDiskBasedDiagnosticsUpdating {},
+                                                            ),
+                                                        );
                         }
                     } else {
-                        this.on_lsp_work_start(language_server_id, token.clone(), cx);
-                        this.broadcast_language_server_update(
-                            language_server_id,
+                        self.on_lsp_work_start(server_id, token.clone(), cx);
+                        self.broadcast_language_server_update(
+                            server_id,
                             proto::update_language_server::Variant::WorkStart(
                                 proto::LspWorkStart { token },
                             ),
                         );
                     }
-                });
-            }
-            LanguageServerEvent::WorkProgress { token, progress } => {
-                this.update(cx, |this, cx| {
+                }
+                lsp::WorkDoneProgress::Report(report) => {
                     if Some(token.as_str()) != disk_based_diagnostics_progress_token {
-                        this.on_lsp_work_progress(
-                            language_server_id,
+                        self.on_lsp_work_progress(
+                            server_id,
                             token.clone(),
-                            progress.clone(),
+                            LanguageServerProgress {
+                                message: report.message.clone(),
+                                percentage: report.percentage.map(|p| p as usize),
+                                last_update_at: Instant::now(),
+                            },
                             cx,
                         );
-                        this.broadcast_language_server_update(
-                            language_server_id,
+                        self.broadcast_language_server_update(
+                            server_id,
                             proto::update_language_server::Variant::WorkProgress(
                                 proto::LspWorkProgress {
                                     token,
-                                    message: progress.message,
-                                    percentage: progress.percentage.map(|p| p as u32),
+                                    message: report.message,
+                                    percentage: report.percentage.map(|p| p as u32),
                                 },
                             ),
                         );
                     }
-                });
-            }
-            LanguageServerEvent::WorkEnd { token } => {
-                this.update(cx, |this, cx| {
+                }
+                lsp::WorkDoneProgress::End(_) => {
                     if Some(token.as_str()) == disk_based_diagnostics_progress_token {
                         let language_server_status = if let Some(status) =
-                            this.language_server_statuses.get_mut(&language_server_id)
+                            self.language_server_statuses.get_mut(&server_id)
                         {
                             status
                         } else {
@@ -1692,69 +1672,25 @@ impl Project {
 
                         language_server_status.pending_diagnostic_updates -= 1;
                         if language_server_status.pending_diagnostic_updates == 0 {
-                            this.disk_based_diagnostics_finished(cx);
-                            this.broadcast_language_server_update(
-                                language_server_id,
+                            self.disk_based_diagnostics_finished(cx);
+                            self.broadcast_language_server_update(
+                                server_id,
                                 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
                                     proto::LspDiskBasedDiagnosticsUpdated {},
                                 ),
                             );
                         }
                     } else {
-                        this.on_lsp_work_end(language_server_id, token.clone(), cx);
-                        this.broadcast_language_server_update(
-                            language_server_id,
+                        self.on_lsp_work_end(server_id, token.clone(), cx);
+                        self.broadcast_language_server_update(
+                            server_id,
                             proto::update_language_server::Variant::WorkEnd(proto::LspWorkEnd {
                                 token,
                             }),
                         );
                     }
-                });
-            }
-            LanguageServerEvent::DiagnosticsUpdate(mut params) => {
-                this.update(cx, |this, cx| {
-                    adapter.process_diagnostics(&mut params);
-
-                    if disk_based_diagnostics_progress_token.is_none() {
-                        this.disk_based_diagnostics_started(cx);
-                        this.broadcast_language_server_update(
-                            language_server_id,
-                            proto::update_language_server::Variant::DiskBasedDiagnosticsUpdating(
-                                proto::LspDiskBasedDiagnosticsUpdating {},
-                            ),
-                        );
-                    }
-                    this.update_diagnostics(params, adapter.disk_based_diagnostic_sources(), cx)
-                        .log_err();
-                    if disk_based_diagnostics_progress_token.is_none() {
-                        this.disk_based_diagnostics_finished(cx);
-                        this.broadcast_language_server_update(
-                            language_server_id,
-                            proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
-                                proto::LspDiskBasedDiagnosticsUpdated {},
-                            ),
-                        );
-                    }
-                });
-            }
-            LanguageServerEvent::WorkspaceEdit(params) => {
-                let transaction = Self::deserialize_workspace_edit(
-                    this.clone(),
-                    params.edit,
-                    true,
-                    adapter.clone(),
-                    language_server.clone(),
-                    cx,
-                )
-                .await
-                .log_err();
-                this.update(cx, |this, _| {
-                    if let Some(transaction) = transaction {
-                        this.last_workspace_edits_by_language_server
-                            .insert(language_server_id, transaction);
-                    }
-                });
-            }
+                }
+            },
         }
     }
 
@@ -1802,6 +1738,40 @@ impl Project {
         }
     }
 
+    async fn on_lsp_workspace_edit(
+        this: WeakModelHandle<Self>,
+        params: lsp::ApplyWorkspaceEditParams,
+        server_id: usize,
+        adapter: Arc<dyn LspAdapter>,
+        language_server: Arc<LanguageServer>,
+        mut cx: AsyncAppContext,
+    ) -> Result<lsp::ApplyWorkspaceEditResponse> {
+        let this = this
+            .upgrade(&cx)
+            .ok_or_else(|| anyhow!("project project closed"))?;
+        let transaction = Self::deserialize_workspace_edit(
+            this.clone(),
+            params.edit,
+            true,
+            adapter.clone(),
+            language_server.clone(),
+            &mut cx,
+        )
+        .await
+        .log_err();
+        this.update(&mut cx, |this, _| {
+            if let Some(transaction) = transaction {
+                this.last_workspace_edits_by_language_server
+                    .insert(server_id, transaction);
+            }
+        });
+        Ok(lsp::ApplyWorkspaceEditResponse {
+            applied: true,
+            failed_change: None,
+            failure_reason: None,
+        })
+    }
+
     fn broadcast_language_server_update(
         &self,
         language_server_id: usize,
@@ -2746,6 +2716,10 @@ impl Project {
                     )
                     .await
                 } else if let Some(command) = action.lsp_action.command {
+                    this.update(&mut cx, |this, _| {
+                        this.last_workspace_edits_by_language_server
+                            .remove(&lang_server.server_id());
+                    });
                     lang_server
                         .request::<lsp::request::ExecuteCommand>(lsp::ExecuteCommandParams {
                             command: command.command,
@@ -6071,7 +6045,7 @@ mod tests {
         }
     }
 
-    #[gpui::test]
+    #[gpui::test(iterations = 100)]
     async fn test_apply_code_action(cx: &mut gpui::TestAppContext) {
         let mut language = Language::new(
             LanguageConfig {