Replicate diagnostic summaries

Max Brunsfeld and Antonio Scandurra created

Co-Authored-By: Antonio Scandurra <me@as-cii.com>

Change summary

crates/gpui/src/app.rs         |  4 +
crates/gpui/src/executor.rs    | 20 +++++++-
crates/project/src/project.rs  | 44 ++++++++++++++++++++
crates/project/src/worktree.rs | 77 ++++++++++++++++++++++++++++++++---
crates/rpc/proto/zed.proto     | 60 +++++++++++++++------------
crates/rpc/src/proto.rs        | 10 +++-
crates/server/src/rpc.rs       | 74 ++++++++++++++++++++++++++++------
7 files changed, 235 insertions(+), 54 deletions(-)

Detailed changes

crates/gpui/src/app.rs 🔗

@@ -2672,6 +2672,10 @@ impl<T: Entity> ModelHandle<T> {
                         }
                     }
 
+                    if cx.borrow_mut().foreground().would_park() {
+                        panic!("parked while waiting on condition");
+                    }
+
                     rx.recv()
                         .await
                         .expect("model dropped with pending condition");

crates/gpui/src/executor.rs 🔗

@@ -206,10 +206,7 @@ impl Deterministic {
                 }
 
                 let state = self.state.lock();
-                if state.scheduled_from_foreground.is_empty()
-                    && state.scheduled_from_background.is_empty()
-                    && state.spawned_from_foreground.is_empty()
-                {
+                if state.would_park() {
                     return None;
                 }
             }
@@ -261,6 +258,14 @@ impl Deterministic {
     }
 }
 
+impl DeterministicState {
+    fn would_park(&self) -> bool {
+        self.scheduled_from_foreground.is_empty()
+            && self.scheduled_from_background.is_empty()
+            && self.spawned_from_foreground.is_empty()
+    }
+}
+
 #[derive(Default)]
 struct Trace {
     executed: Vec<Backtrace>,
@@ -433,6 +438,13 @@ impl Foreground {
         *any_value.downcast().unwrap()
     }
 
+    pub fn would_park(&self) -> bool {
+        match self {
+            Self::Deterministic(executor) => executor.state.lock().would_park(),
+            _ => panic!("this method can only be called on a deterministic executor"),
+        }
+    }
+
     pub fn forbid_parking(&self) {
         match self {
             Self::Deterministic(executor) => {

crates/project/src/project.rs 🔗

@@ -70,7 +70,7 @@ pub struct ProjectPath {
     pub path: Arc<Path>,
 }
 
-#[derive(Clone)]
+#[derive(Clone, Debug, Default, PartialEq)]
 pub struct DiagnosticSummary {
     pub error_count: usize,
     pub warning_count: usize,
@@ -243,6 +243,12 @@ impl Project {
                 client.subscribe_to_entity(remote_id, cx, Self::handle_share_worktree),
                 client.subscribe_to_entity(remote_id, cx, Self::handle_unregister_worktree),
                 client.subscribe_to_entity(remote_id, cx, Self::handle_update_worktree),
+                client.subscribe_to_entity(remote_id, cx, Self::handle_update_diagnostic_summary),
+                client.subscribe_to_entity(
+                    remote_id,
+                    cx,
+                    Self::handle_disk_based_diagnostics_updated,
+                ),
                 client.subscribe_to_entity(remote_id, cx, Self::handle_update_buffer),
                 client.subscribe_to_entity(remote_id, cx, Self::handle_buffer_saved),
             ],
@@ -661,6 +667,42 @@ impl Project {
         Ok(())
     }
 
+    fn handle_update_diagnostic_summary(
+        &mut self,
+        envelope: TypedEnvelope<proto::UpdateDiagnosticSummary>,
+        _: Arc<Client>,
+        cx: &mut ModelContext<Self>,
+    ) -> Result<()> {
+        let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
+        if let Some(worktree) = self.worktree_for_id(worktree_id, cx) {
+            worktree.update(cx, |worktree, cx| {
+                worktree
+                    .as_remote_mut()
+                    .unwrap()
+                    .update_diagnostic_summary(envelope, cx);
+            });
+        }
+        Ok(())
+    }
+
+    fn handle_disk_based_diagnostics_updated(
+        &mut self,
+        envelope: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
+        _: Arc<Client>,
+        cx: &mut ModelContext<Self>,
+    ) -> Result<()> {
+        let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
+        if let Some(worktree) = self.worktree_for_id(worktree_id, cx) {
+            worktree.update(cx, |worktree, cx| {
+                worktree
+                    .as_remote()
+                    .unwrap()
+                    .disk_based_diagnostics_updated(cx);
+            });
+        }
+        Ok(())
+    }
+
     pub fn handle_update_buffer(
         &mut self,
         envelope: TypedEnvelope<proto::UpdateBuffer>,

crates/project/src/worktree.rs 🔗

@@ -777,10 +777,38 @@ impl Worktree {
         }
 
         let this = self.as_local_mut().unwrap();
+        let summary = DiagnosticSummary::new(&diagnostics);
         this.diagnostic_summaries
-            .insert(worktree_path.clone(), DiagnosticSummary::new(&diagnostics));
+            .insert(worktree_path.clone(), summary.clone());
         this.diagnostics.insert(worktree_path.clone(), diagnostics);
+
         cx.emit(Event::DiagnosticsUpdated(worktree_path.clone()));
+
+        if let Some(share) = this.share.as_ref() {
+            cx.foreground()
+                .spawn({
+                    let client = this.client.clone();
+                    let project_id = share.project_id;
+                    let worktree_id = this.id().to_proto();
+                    let path = worktree_path.to_string_lossy().to_string();
+                    async move {
+                        client
+                            .send(proto::UpdateDiagnosticSummary {
+                                project_id,
+                                worktree_id,
+                                path,
+                                error_count: summary.error_count as u32,
+                                warning_count: summary.warning_count as u32,
+                                info_count: summary.info_count as u32,
+                                hint_count: summary.hint_count as u32,
+                            })
+                            .await
+                            .log_err()
+                    }
+                })
+                .detach();
+        }
+
         Ok(())
     }
 
@@ -1063,6 +1091,8 @@ impl LocalWorktree {
             let disk_based_diagnostics_progress_token =
                 language.disk_based_diagnostics_progress_token().cloned();
             let (diagnostics_tx, diagnostics_rx) = smol::channel::unbounded();
+            let (disk_based_diagnostics_done_tx, disk_based_diagnostics_done_rx) =
+                smol::channel::unbounded();
             language_server
                 .on_notification::<lsp::notification::PublishDiagnostics, _>(move |params| {
                     smol::block_on(diagnostics_tx.send(params)).ok();
@@ -1071,6 +1101,7 @@ impl LocalWorktree {
             cx.spawn_weak(|this, mut cx| {
                 let has_disk_based_diagnostic_progress_token =
                     disk_based_diagnostics_progress_token.is_some();
+                let disk_based_diagnostics_done_tx = disk_based_diagnostics_done_tx.clone();
                 async move {
                     while let Ok(diagnostics) = diagnostics_rx.recv().await {
                         if let Some(handle) = cx.read(|cx| this.upgrade(cx)) {
@@ -1078,9 +1109,9 @@ impl LocalWorktree {
                                 this.update_diagnostics(diagnostics, &disk_based_sources, cx)
                                     .log_err();
                                 if !has_disk_based_diagnostic_progress_token {
-                                    cx.emit(Event::DiskBasedDiagnosticsUpdated);
+                                    smol::block_on(disk_based_diagnostics_done_tx.send(())).ok();
                                 }
-                            });
+                            })
                         } else {
                             break;
                         }
@@ -1089,8 +1120,6 @@ impl LocalWorktree {
             })
             .detach();
 
-            let (mut disk_based_diagnostics_done_tx, mut disk_based_diagnostics_done_rx) =
-                watch::channel_with(());
             language_server
                 .on_notification::<lsp::notification::Progress, _>(move |params| {
                     let token = match params.token {
@@ -1110,12 +1139,24 @@ impl LocalWorktree {
                     }
                 })
                 .detach();
+            let rpc = self.client.clone();
             cx.spawn_weak(|this, mut cx| async move {
-                while let Some(()) = disk_based_diagnostics_done_rx.recv().await {
+                while let Ok(()) = disk_based_diagnostics_done_rx.recv().await {
                     if let Some(handle) = cx.read(|cx| this.upgrade(cx)) {
-                        handle.update(&mut cx, |_, cx| {
+                        let message = handle.update(&mut cx, |this, cx| {
                             cx.emit(Event::DiskBasedDiagnosticsUpdated);
+                            let this = this.as_local().unwrap();
+                            this.share
+                                .as_ref()
+                                .map(|share| proto::DiskBasedDiagnosticsUpdated {
+                                    project_id: share.project_id,
+                                    worktree_id: this.id().to_proto(),
+                                })
                         });
+
+                        if let Some(message) = message {
+                            rpc.send(message).await.log_err();
+                        }
                     } else {
                         break;
                     }
@@ -1572,6 +1613,28 @@ impl RemoteWorktree {
         Ok(())
     }
 
+    pub fn update_diagnostic_summary(
+        &mut self,
+        envelope: TypedEnvelope<proto::UpdateDiagnosticSummary>,
+        cx: &mut ModelContext<Worktree>,
+    ) {
+        let path: Arc<Path> = Path::new(&envelope.payload.path).into();
+        self.diagnostic_summaries.insert(
+            path.clone(),
+            DiagnosticSummary {
+                error_count: envelope.payload.error_count as usize,
+                warning_count: envelope.payload.warning_count as usize,
+                info_count: envelope.payload.info_count as usize,
+                hint_count: envelope.payload.hint_count as usize,
+            },
+        );
+        cx.emit(Event::DiagnosticsUpdated(path));
+    }
+
+    pub fn disk_based_diagnostics_updated(&self, cx: &mut ModelContext<Worktree>) {
+        cx.emit(Event::DiskBasedDiagnosticsUpdated);
+    }
+
     pub fn remove_collaborator(&mut self, replica_id: ReplicaId, cx: &mut ModelContext<Worktree>) {
         for (_, buffer) in &self.open_buffers {
             if let Some(buffer) = buffer.upgrade(cx) {

crates/rpc/proto/zed.proto 🔗

@@ -23,32 +23,33 @@ message Envelope {
 
         RegisterWorktree register_worktree = 17;
         UnregisterWorktree unregister_worktree = 18;
-        ShareWorktree share_worktree = 100;
-        UpdateWorktree update_worktree = 19;
-        UpdateDiagnosticSummary update_diagnostic_summary = 20;
-
-        OpenBuffer open_buffer = 22;
-        OpenBufferResponse open_buffer_response = 23;
-        CloseBuffer close_buffer = 24;
-        UpdateBuffer update_buffer = 25;
-        SaveBuffer save_buffer = 26;
-        BufferSaved buffer_saved = 27;
-
-        GetChannels get_channels = 28;
-        GetChannelsResponse get_channels_response = 29;
-        JoinChannel join_channel = 30;
-        JoinChannelResponse join_channel_response = 31;
-        LeaveChannel leave_channel = 32;
-        SendChannelMessage send_channel_message = 33;
-        SendChannelMessageResponse send_channel_message_response = 34;
-        ChannelMessageSent channel_message_sent = 35;
-        GetChannelMessages get_channel_messages = 36;
-        GetChannelMessagesResponse get_channel_messages_response = 37;
-
-        UpdateContacts update_contacts = 38;
-
-        GetUsers get_users = 39;
-        GetUsersResponse get_users_response = 40;
+        ShareWorktree share_worktree = 19;
+        UpdateWorktree update_worktree = 20;
+        UpdateDiagnosticSummary update_diagnostic_summary = 21;
+        DiskBasedDiagnosticsUpdated disk_based_diagnostics_updated = 22;
+
+        OpenBuffer open_buffer = 23;
+        OpenBufferResponse open_buffer_response = 24;
+        CloseBuffer close_buffer = 25;
+        UpdateBuffer update_buffer = 26;
+        SaveBuffer save_buffer = 27;
+        BufferSaved buffer_saved = 28;
+
+        GetChannels get_channels = 29;
+        GetChannelsResponse get_channels_response = 30;
+        JoinChannel join_channel = 31;
+        JoinChannelResponse join_channel_response = 32;
+        LeaveChannel leave_channel = 33;
+        SendChannelMessage send_channel_message = 34;
+        SendChannelMessageResponse send_channel_message_response = 35;
+        ChannelMessageSent channel_message_sent = 36;
+        GetChannelMessages get_channel_messages = 37;
+        GetChannelMessagesResponse get_channel_messages_response = 38;
+
+        UpdateContacts update_contacts = 39;
+
+        GetUsers get_users = 40;
+        GetUsersResponse get_users_response = 41;
     }
 }
 
@@ -172,6 +173,13 @@ message UpdateDiagnosticSummary {
     string path = 3;
     uint32 error_count = 4;
     uint32 warning_count = 5;
+    uint32 info_count = 6;
+    uint32 hint_count = 7;
+}
+
+message DiskBasedDiagnosticsUpdated {
+    uint64 project_id = 1;
+    uint64 worktree_id = 2;
 }
 
 message GetChannels {}

crates/rpc/src/proto.rs 🔗

@@ -125,6 +125,7 @@ messages!(
     BufferSaved,
     ChannelMessageSent,
     CloseBuffer,
+    DiskBasedDiagnosticsUpdated,
     Error,
     GetChannelMessages,
     GetChannelMessagesResponse,
@@ -155,6 +156,7 @@ messages!(
     UnshareProject,
     UpdateBuffer,
     UpdateContacts,
+    UpdateDiagnosticSummary,
     UpdateWorktree,
 );
 
@@ -178,17 +180,19 @@ request_messages!(
 entity_messages!(
     project_id,
     AddProjectCollaborator,
-    RemoveProjectCollaborator,
+    BufferSaved,
+    CloseBuffer,
+    DiskBasedDiagnosticsUpdated,
     JoinProject,
     LeaveProject,
-    BufferSaved,
     OpenBuffer,
-    CloseBuffer,
+    RemoveProjectCollaborator,
     SaveBuffer,
     ShareWorktree,
     UnregisterWorktree,
     UnshareProject,
     UpdateBuffer,
+    UpdateDiagnosticSummary,
     UpdateWorktree,
 );
 

crates/server/src/rpc.rs 🔗

@@ -71,6 +71,8 @@ impl Server {
             .add_handler(Server::unregister_worktree)
             .add_handler(Server::share_worktree)
             .add_handler(Server::update_worktree)
+            .add_handler(Server::update_diagnostic_summary)
+            .add_handler(Server::disk_based_diagnostics_updated)
             .add_handler(Server::open_buffer)
             .add_handler(Server::close_buffer)
             .add_handler(Server::update_buffer)
@@ -517,6 +519,38 @@ impl Server {
         Ok(())
     }
 
+    async fn update_diagnostic_summary(
+        self: Arc<Server>,
+        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
+    ) -> tide::Result<()> {
+        let receiver_ids = self
+            .state()
+            .project_connection_ids(request.payload.project_id, request.sender_id)
+            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
+        broadcast(request.sender_id, receiver_ids, |connection_id| {
+            self.peer
+                .forward_send(request.sender_id, connection_id, request.payload.clone())
+        })
+        .await?;
+        Ok(())
+    }
+
+    async fn disk_based_diagnostics_updated(
+        self: Arc<Server>,
+        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
+    ) -> tide::Result<()> {
+        let receiver_ids = self
+            .state()
+            .project_connection_ids(request.payload.project_id, request.sender_id)
+            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
+        broadcast(request.sender_id, receiver_ids, |connection_id| {
+            self.peer
+                .forward_send(request.sender_id, connection_id, request.payload.clone())
+        })
+        .await?;
+        Ok(())
+    }
+
     async fn open_buffer(
         self: Arc<Server>,
         request: TypedEnvelope<proto::OpenBuffer>,
@@ -1026,7 +1060,7 @@ mod tests {
             LanguageRegistry, LanguageServerConfig, Point,
         },
         lsp,
-        project::Project,
+        project::{DiagnosticSummary, Project},
     };
 
     #[gpui::test]
@@ -1781,6 +1815,19 @@ mod tests {
             .await
             .unwrap();
 
+        // Join the worktree as client B.
+        let project_b = Project::remote(
+            project_id,
+            client_b.clone(),
+            client_b.user_store.clone(),
+            lang_registry.clone(),
+            fs.clone(),
+            &mut cx_b.to_async(),
+        )
+        .await
+        .unwrap();
+        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
+
         // Simulate a language server reporting errors for a file.
         fake_language_server
             .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
@@ -1806,18 +1853,19 @@ mod tests {
             })
             .await;
 
-        // Join the worktree as client B.
-        let project_b = Project::remote(
-            project_id,
-            client_b.clone(),
-            client_b.user_store.clone(),
-            lang_registry.clone(),
-            fs.clone(),
-            &mut cx_b.to_async(),
-        )
-        .await
-        .unwrap();
-        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
+        worktree_b
+            .condition(&cx_b, |worktree, _| {
+                worktree.diagnostic_summaries().collect::<Vec<_>>()
+                    == &[(
+                        Arc::from(Path::new("a.rs")),
+                        DiagnosticSummary {
+                            error_count: 1,
+                            warning_count: 1,
+                            ..Default::default()
+                        },
+                    )]
+            })
+            .await;
 
         // Open the file with the errors.
         let buffer_b = cx_b