Get tests passing, centralize more diagnostic logic in Project

Max Brunsfeld created

Change summary

crates/diagnostics/src/diagnostics.rs |  18 +
crates/project/src/project.rs         | 266 +++++++++++++++-------------
crates/project/src/worktree.rs        |  45 +---
crates/server/src/rpc.rs              |  10 
4 files changed, 177 insertions(+), 162 deletions(-)

Detailed changes

crates/diagnostics/src/diagnostics.rs 🔗

@@ -680,7 +680,6 @@ mod tests {
     use editor::{display_map::BlockContext, DisplayPoint, EditorSnapshot};
     use gpui::TestAppContext;
     use language::{Diagnostic, DiagnosticEntry, DiagnosticSeverity, PointUtf16};
-    use project::worktree;
     use serde_json::json;
     use std::sync::Arc;
     use unindent::Unindent as _;
@@ -727,6 +726,7 @@ mod tests {
             })
             .await
             .unwrap();
+        let worktree_id = worktree.read_with(&cx, |tree, _| tree.id());
 
         // Create some diagnostics
         worktree.update(&mut cx, |worktree, cx| {
@@ -903,7 +903,13 @@ mod tests {
                     cx,
                 )
                 .unwrap();
-            cx.emit(worktree::Event::DiskBasedDiagnosticsUpdated);
+        });
+        project.update(&mut cx, |_, cx| {
+            cx.emit(project::Event::DiagnosticsUpdated(ProjectPath {
+                worktree_id,
+                path: Arc::from("/test/consts.rs".as_ref()),
+            }));
+            cx.emit(project::Event::DiskBasedDiagnosticsUpdated { worktree_id });
         });
 
         view.next_notification(&cx).await;
@@ -1017,7 +1023,13 @@ mod tests {
                     cx,
                 )
                 .unwrap();
-            cx.emit(worktree::Event::DiskBasedDiagnosticsUpdated);
+        });
+        project.update(&mut cx, |_, cx| {
+            cx.emit(project::Event::DiagnosticsUpdated(ProjectPath {
+                worktree_id,
+                path: Arc::from("/test/consts.rs".as_ref()),
+            }));
+            cx.emit(project::Event::DiskBasedDiagnosticsUpdated { worktree_id });
         });
 
         view.next_notification(&cx).await;

crates/project/src/project.rs 🔗

@@ -14,6 +14,7 @@ use gpui::{
 use language::{Buffer, DiagnosticEntry, Language, LanguageRegistry};
 use lsp::{DiagnosticSeverity, LanguageServer};
 use postage::{prelude::Stream, watch};
+use smol::block_on;
 use std::{
     path::{Path, PathBuf},
     sync::{atomic::AtomicBool, Arc},
@@ -34,7 +35,7 @@ pub struct Project {
     client_state: ProjectClientState,
     collaborators: HashMap<PeerId, Collaborator>,
     subscriptions: Vec<client::Subscription>,
-    pending_disk_based_diagnostics: isize,
+    language_servers_with_diagnostics_running: isize,
 }
 
 enum ProjectClientState {
@@ -58,7 +59,7 @@ pub struct Collaborator {
     pub replica_id: ReplicaId,
 }
 
-#[derive(Debug)]
+#[derive(Clone, Debug, PartialEq)]
 pub enum Event {
     ActiveEntryChanged(Option<ProjectEntry>),
     WorktreeRemoved(WorktreeId),
@@ -191,7 +192,7 @@ impl Project {
                 client,
                 user_store,
                 fs,
-                pending_disk_based_diagnostics: 0,
+                language_servers_with_diagnostics_running: 0,
                 language_servers: Default::default(),
             }
         })
@@ -283,7 +284,7 @@ impl Project {
                     remote_id,
                     replica_id,
                 },
-                pending_disk_based_diagnostics: 0,
+                language_servers_with_diagnostics_running: 0,
                 language_servers: Default::default(),
             };
             for worktree in worktrees {
@@ -473,7 +474,7 @@ impl Project {
             let (buffer, buffer_is_new) = buffer_task.await?;
             if buffer_is_new {
                 this.update(&mut cx, |this, cx| {
-                    this.buffer_added(worktree, buffer.clone(), cx)
+                    this.assign_language_to_buffer(worktree, buffer.clone(), cx)
                 });
             }
             Ok(buffer)
@@ -497,12 +498,14 @@ impl Project {
                         .save_buffer_as(buffer.clone(), path, cx)
                 })
                 .await?;
-            this.update(&mut cx, |this, cx| this.buffer_added(worktree, buffer, cx));
+            this.update(&mut cx, |this, cx| {
+                this.assign_language_to_buffer(worktree, buffer, cx)
+            });
             Ok(())
         })
     }
 
-    fn buffer_added(
+    fn assign_language_to_buffer(
         &mut self,
         worktree: ModelHandle<Worktree>,
         buffer: ModelHandle<Buffer>,
@@ -548,17 +551,16 @@ impl Project {
         worktree_path: &Path,
         cx: &mut ModelContext<Self>,
     ) -> Option<Arc<LanguageServer>> {
+        enum LspEvent {
+            DiagnosticsStart,
+            DiagnosticsUpdate(lsp::PublishDiagnosticsParams),
+            DiagnosticsFinish,
+        }
+
         let language_server = language
             .start_server(worktree_path, cx)
             .log_err()
             .flatten()?;
-
-        enum DiagnosticProgress {
-            Updating,
-            Publish(lsp::PublishDiagnosticsParams),
-            Updated,
-        }
-
         let disk_based_sources = language
             .disk_based_diagnostic_sources()
             .cloned()
@@ -569,22 +571,25 @@ impl Project {
             disk_based_diagnostics_progress_token.is_some();
         let (diagnostics_tx, diagnostics_rx) = smol::channel::unbounded();
 
+        // Listen for `PublishDiagnostics` notifications.
         language_server
             .on_notification::<lsp::notification::PublishDiagnostics, _>({
                 let diagnostics_tx = diagnostics_tx.clone();
                 move |params| {
                     if !has_disk_based_diagnostic_progress_token {
-                        smol::block_on(diagnostics_tx.send(DiagnosticProgress::Updating)).ok();
+                        block_on(diagnostics_tx.send(LspEvent::DiagnosticsStart)).ok();
                     }
-                    smol::block_on(diagnostics_tx.send(DiagnosticProgress::Publish(params))).ok();
+                    block_on(diagnostics_tx.send(LspEvent::DiagnosticsUpdate(params))).ok();
                     if !has_disk_based_diagnostic_progress_token {
-                        smol::block_on(diagnostics_tx.send(DiagnosticProgress::Updated)).ok();
+                        block_on(diagnostics_tx.send(LspEvent::DiagnosticsFinish)).ok();
                     }
                 }
             })
             .detach();
 
-        let mut pending_disk_based_diagnostics: i32 = 0;
+        // Listen for `Progress` notifications. Send an event when the language server
+        // transitions between running jobs and not running any jobs.
+        let mut running_jobs_for_this_server: i32 = 0;
         language_server
             .on_notification::<lsp::notification::Progress, _>(move |params| {
                 let token = match params.token {
@@ -596,21 +601,15 @@ impl Project {
                     match params.value {
                         lsp::ProgressParamsValue::WorkDone(progress) => match progress {
                             lsp::WorkDoneProgress::Begin(_) => {
-                                if pending_disk_based_diagnostics == 0 {
-                                    smol::block_on(
-                                        diagnostics_tx.send(DiagnosticProgress::Updating),
-                                    )
-                                    .ok();
+                                running_jobs_for_this_server += 1;
+                                if running_jobs_for_this_server == 1 {
+                                    block_on(diagnostics_tx.send(LspEvent::DiagnosticsStart)).ok();
                                 }
-                                pending_disk_based_diagnostics += 1;
                             }
                             lsp::WorkDoneProgress::End(_) => {
-                                pending_disk_based_diagnostics -= 1;
-                                if pending_disk_based_diagnostics == 0 {
-                                    smol::block_on(
-                                        diagnostics_tx.send(DiagnosticProgress::Updated),
-                                    )
-                                    .ok();
+                                running_jobs_for_this_server -= 1;
+                                if running_jobs_for_this_server == 0 {
+                                    block_on(diagnostics_tx.send(LspEvent::DiagnosticsFinish)).ok();
                                 }
                             }
                             _ => {}
@@ -620,42 +619,43 @@ impl Project {
             })
             .detach();
 
+        // Process all the LSP events.
         cx.spawn_weak(|this, mut cx| async move {
             while let Ok(message) = diagnostics_rx.recv().await {
                 let this = cx.read(|cx| this.upgrade(cx))?;
                 match message {
-                    DiagnosticProgress::Updating => {
-                        let project_id = this.update(&mut cx, |this, cx| {
-                            cx.emit(Event::DiskBasedDiagnosticsStarted);
-                            this.remote_id()
-                        });
-                        if let Some(project_id) = project_id {
-                            rpc.send(proto::DiskBasedDiagnosticsUpdating {
-                                project_id,
-                                worktree_id: worktree_id.to_proto(),
+                    LspEvent::DiagnosticsStart => {
+                        let send = this.update(&mut cx, |this, cx| {
+                            this.disk_based_diagnostics_started(worktree_id, cx);
+                            this.remote_id().map(|project_id| {
+                                rpc.send(proto::DiskBasedDiagnosticsUpdating {
+                                    project_id,
+                                    worktree_id: worktree_id.to_proto(),
+                                })
                             })
-                            .await
-                            .log_err();
+                        });
+                        if let Some(send) = send {
+                            send.await.log_err();
                         }
                     }
-                    DiagnosticProgress::Publish(params) => {
+                    LspEvent::DiagnosticsUpdate(params) => {
                         this.update(&mut cx, |this, cx| {
                             this.update_diagnostics(params, &disk_based_sources, cx)
                                 .log_err();
                         });
                     }
-                    DiagnosticProgress::Updated => {
-                        let project_id = this.update(&mut cx, |this, cx| {
-                            cx.emit(Event::DiskBasedDiagnosticsFinished);
-                            this.remote_id()
-                        });
-                        if let Some(project_id) = project_id {
-                            rpc.send(proto::DiskBasedDiagnosticsUpdated {
-                                project_id,
-                                worktree_id: worktree_id.to_proto(),
+                    LspEvent::DiagnosticsFinish => {
+                        let send = this.update(&mut cx, |this, cx| {
+                            this.disk_based_diagnostics_finished(worktree_id, cx);
+                            this.remote_id().map(|project_id| {
+                                rpc.send(proto::DiskBasedDiagnosticsUpdated {
+                                    project_id,
+                                    worktree_id: worktree_id.to_proto(),
+                                })
                             })
-                            .await
-                            .log_err();
+                        });
+                        if let Some(send) = send {
+                            send.await.log_err();
                         }
                     }
                 }
@@ -682,17 +682,24 @@ impl Project {
                 path.strip_prefix(tree.as_local()?.abs_path()).ok()
             });
             if let Some(relative_path) = relative_path {
-                return tree.update(cx, |tree, cx| {
+                let worktree_id = tree.read(cx).id();
+                let project_path = ProjectPath {
+                    worktree_id,
+                    path: relative_path.into(),
+                };
+                tree.update(cx, |tree, cx| {
                     tree.as_local_mut().unwrap().update_diagnostics(
-                        relative_path.into(),
+                        project_path.path.clone(),
                         diagnostics,
                         disk_based_sources,
                         cx,
                     )
-                });
+                })?;
+                cx.emit(Event::DiagnosticsUpdated(project_path));
+                break;
             }
         }
-        todo!()
+        Ok(())
     }
 
     pub fn worktree_for_abs_path(
@@ -769,32 +776,6 @@ impl Project {
 
     fn add_worktree(&mut self, worktree: ModelHandle<Worktree>, cx: &mut ModelContext<Self>) {
         cx.observe(&worktree, |_, _, cx| cx.notify()).detach();
-        cx.subscribe(&worktree, move |this, worktree, event, cx| match event {
-            worktree::Event::DiagnosticsUpdated(path) => {
-                cx.emit(Event::DiagnosticsUpdated(ProjectPath {
-                    worktree_id: worktree.read(cx).id(),
-                    path: path.clone(),
-                }));
-            }
-            worktree::Event::DiskBasedDiagnosticsUpdating => {
-                if this.pending_disk_based_diagnostics == 0 {
-                    cx.emit(Event::DiskBasedDiagnosticsStarted);
-                }
-                this.pending_disk_based_diagnostics += 1;
-            }
-            worktree::Event::DiskBasedDiagnosticsUpdated => {
-                this.pending_disk_based_diagnostics -= 1;
-                cx.emit(Event::DiskBasedDiagnosticsUpdated {
-                    worktree_id: worktree.read(cx).id(),
-                });
-                if this.pending_disk_based_diagnostics == 0 {
-                    if this.pending_disk_based_diagnostics == 0 {
-                        cx.emit(Event::DiskBasedDiagnosticsFinished);
-                    }
-                }
-            }
-        })
-        .detach();
         self.worktrees.push(worktree);
         cx.notify();
     }
@@ -823,7 +804,7 @@ impl Project {
     }
 
     pub fn is_running_disk_based_diagnostics(&self) -> bool {
-        self.pending_disk_based_diagnostics > 0
+        self.language_servers_with_diagnostics_running > 0
     }
 
     pub fn diagnostic_summary(&self, cx: &AppContext) -> DiagnosticSummary {
@@ -850,6 +831,25 @@ impl Project {
         })
     }
 
+    fn disk_based_diagnostics_started(&mut self, _: WorktreeId, cx: &mut ModelContext<Self>) {
+        self.language_servers_with_diagnostics_running += 1;
+        if self.language_servers_with_diagnostics_running == 1 {
+            cx.emit(Event::DiskBasedDiagnosticsStarted);
+        }
+    }
+
+    fn disk_based_diagnostics_finished(
+        &mut self,
+        worktree_id: WorktreeId,
+        cx: &mut ModelContext<Self>,
+    ) {
+        cx.emit(Event::DiskBasedDiagnosticsUpdated { worktree_id });
+        self.language_servers_with_diagnostics_running -= 1;
+        if self.language_servers_with_diagnostics_running == 0 {
+            cx.emit(Event::DiskBasedDiagnosticsFinished);
+        }
+    }
+
     pub fn active_entry(&self) -> Option<ProjectEntry> {
         self.active_entry
     }
@@ -991,12 +991,19 @@ impl Project {
     ) -> Result<()> {
         let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
         if let Some(worktree) = self.worktree_for_id(worktree_id, cx) {
-            worktree.update(cx, |worktree, cx| {
-                worktree
-                    .as_remote_mut()
-                    .unwrap()
-                    .update_diagnostic_summary(envelope, cx);
-            });
+            if let Some(summary) = envelope.payload.summary {
+                let project_path = ProjectPath {
+                    worktree_id,
+                    path: Path::new(&summary.path).into(),
+                };
+                worktree.update(cx, |worktree, _| {
+                    worktree
+                        .as_remote_mut()
+                        .unwrap()
+                        .update_diagnostic_summary(project_path.path.clone(), &summary);
+                });
+                cx.emit(Event::DiagnosticsUpdated(project_path));
+            }
         }
         Ok(())
     }
@@ -1007,15 +1014,10 @@ impl Project {
         _: Arc<Client>,
         cx: &mut ModelContext<Self>,
     ) -> Result<()> {
-        let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
-        if let Some(worktree) = self.worktree_for_id(worktree_id, cx) {
-            worktree.update(cx, |worktree, cx| {
-                worktree
-                    .as_remote()
-                    .unwrap()
-                    .disk_based_diagnostics_updating(cx);
-            });
-        }
+        self.disk_based_diagnostics_started(
+            WorktreeId::from_proto(envelope.payload.worktree_id),
+            cx,
+        );
         Ok(())
     }
 
@@ -1025,15 +1027,10 @@ impl Project {
         _: Arc<Client>,
         cx: &mut ModelContext<Self>,
     ) -> Result<()> {
-        let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
-        if let Some(worktree) = self.worktree_for_id(worktree_id, cx) {
-            worktree.update(cx, |worktree, cx| {
-                worktree
-                    .as_remote()
-                    .unwrap()
-                    .disk_based_diagnostics_updated(cx);
-            });
-        }
+        self.disk_based_diagnostics_finished(
+            WorktreeId::from_proto(envelope.payload.worktree_id),
+            cx,
+        );
         Ok(())
     }
 
@@ -1318,7 +1315,7 @@ impl Collaborator {
 
 #[cfg(test)]
 mod tests {
-    use super::*;
+    use super::{Event, *};
     use client::test::FakeHttpClient;
     use fs::RealFs;
     use futures::StreamExt;
@@ -1402,6 +1399,7 @@ mod tests {
             .disk_based_diagnostics_progress_token
             .clone()
             .unwrap();
+
         let mut languages = LanguageRegistry::new();
         languages.add(Arc::new(Language::new(
             LanguageConfig {
@@ -1422,30 +1420,47 @@ mod tests {
         let client = Client::new(http_client.clone());
         let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
 
-        let tree = Worktree::open_local(
-            client,
-            user_store,
-            dir.path(),
-            Arc::new(RealFs),
-            &mut cx.to_async(),
-        )
-        .await
-        .unwrap();
+        let project = cx.update(|cx| {
+            Project::local(
+                client,
+                user_store,
+                Arc::new(languages),
+                Arc::new(RealFs),
+                cx,
+            )
+        });
+
+        let tree = project
+            .update(&mut cx, |project, cx| {
+                project.add_local_worktree(dir.path(), cx)
+            })
+            .await
+            .unwrap();
+        let worktree_id = tree.read_with(&cx, |tree, _| tree.id());
+
         cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
             .await;
 
         // Cause worktree to start the fake language server
-        let _buffer = tree
-            .update(&mut cx, |tree, cx| tree.open_buffer("b.rs", cx))
+        let _buffer = project
+            .update(&mut cx, |project, cx| {
+                project.open_buffer(
+                    ProjectPath {
+                        worktree_id,
+                        path: Path::new("b.rs").into(),
+                    },
+                    cx,
+                )
+            })
             .await
             .unwrap();
 
-        let mut events = subscribe(&tree, &mut cx);
+        let mut events = subscribe(&project, &mut cx);
 
         fake_server.start_progress(&progress_token).await;
         assert_eq!(
             events.next().await.unwrap(),
-            Event::DiskBasedDiagnosticsUpdating
+            Event::DiskBasedDiagnosticsStarted
         );
 
         fake_server.start_progress(&progress_token).await;
@@ -1466,14 +1481,17 @@ mod tests {
             .await;
         assert_eq!(
             events.next().await.unwrap(),
-            Event::DiagnosticsUpdated(Arc::from(Path::new("a.rs")))
+            Event::DiagnosticsUpdated(ProjectPath {
+                worktree_id,
+                path: Arc::from(Path::new("a.rs"))
+            })
         );
 
         fake_server.end_progress(&progress_token).await;
         fake_server.end_progress(&progress_token).await;
         assert_eq!(
             events.next().await.unwrap(),
-            Event::DiskBasedDiagnosticsUpdated
+            Event::DiskBasedDiagnosticsUpdated { worktree_id }
         );
 
         let (buffer, _) = tree

crates/project/src/worktree.rs 🔗

@@ -64,15 +64,8 @@ pub enum Worktree {
     Remote(RemoteWorktree),
 }
 
-#[derive(Clone, Debug, Eq, PartialEq)]
-pub enum Event {
-    DiskBasedDiagnosticsUpdating,
-    DiskBasedDiagnosticsUpdated,
-    DiagnosticsUpdated(Arc<Path>),
-}
-
 impl Entity for Worktree {
-    type Event = Event;
+    type Event = ();
 }
 
 impl Worktree {
@@ -1139,8 +1132,6 @@ impl LocalWorktree {
             .insert(PathKey(worktree_path.clone()), summary.clone());
         self.diagnostics.insert(worktree_path.clone(), diagnostics);
 
-        cx.emit(Event::DiagnosticsUpdated(worktree_path.clone()));
-
         if let Some(share) = self.share.as_ref() {
             cx.foreground()
                 .spawn({
@@ -1535,30 +1526,18 @@ impl RemoteWorktree {
 
     pub fn update_diagnostic_summary(
         &mut self,
-        envelope: TypedEnvelope<proto::UpdateDiagnosticSummary>,
-        cx: &mut ModelContext<Worktree>,
+        path: Arc<Path>,
+        summary: &proto::DiagnosticSummary,
     ) {
-        if let Some(summary) = envelope.payload.summary {
-            let path: Arc<Path> = Path::new(&summary.path).into();
-            self.diagnostic_summaries.insert(
-                PathKey(path.clone()),
-                DiagnosticSummary {
-                    error_count: summary.error_count as usize,
-                    warning_count: summary.warning_count as usize,
-                    info_count: summary.info_count as usize,
-                    hint_count: summary.hint_count as usize,
-                },
-            );
-            cx.emit(Event::DiagnosticsUpdated(path));
-        }
-    }
-
-    pub fn disk_based_diagnostics_updating(&self, cx: &mut ModelContext<Worktree>) {
-        cx.emit(Event::DiskBasedDiagnosticsUpdating);
-    }
-
-    pub fn disk_based_diagnostics_updated(&self, cx: &mut ModelContext<Worktree>) {
-        cx.emit(Event::DiskBasedDiagnosticsUpdated);
+        self.diagnostic_summaries.insert(
+            PathKey(path.clone()),
+            DiagnosticSummary {
+                error_count: summary.error_count as usize,
+                warning_count: summary.warning_count as usize,
+                info_count: summary.info_count as usize,
+                hint_count: summary.hint_count as usize,
+            },
+        );
     }
 
     pub fn remove_collaborator(&mut self, replica_id: ReplicaId, cx: &mut ModelContext<Worktree>) {

crates/server/src/rpc.rs 🔗

@@ -1906,8 +1906,14 @@ mod tests {
         // Cause the language server to start.
         let _ = cx_a
             .background()
-            .spawn(worktree_a.update(&mut cx_a, |worktree, cx| {
-                worktree.open_buffer("other.rs", cx)
+            .spawn(project_a.update(&mut cx_a, |project, cx| {
+                project.open_buffer(
+                    ProjectPath {
+                        worktree_id,
+                        path: Path::new("other.rs").into(),
+                    },
+                    cx,
+                )
             }))
             .await
             .unwrap();