Simplify project syncing (#16976)

Conrad Irwin and Max created

As part of allowing LSPs to run remotely, we need to move LSP stuff
out of project. To do that we'd like to simplify the concurrency story
on project syncing.

Co-Authored-By: Max <max@zed.dev>

Release Notes:

- N/A

Co-authored-by: Max <max@zed.dev>

Change summary

crates/project/src/project.rs | 174 ++++++++++++++----------------------
1 file changed, 67 insertions(+), 107 deletions(-)

Detailed changes

crates/project/src/project.rs 🔗

@@ -286,22 +286,11 @@ enum BufferOrderedMessage {
     Resync,
 }
 
-#[derive(Debug)]
-enum LocalProjectUpdate {
-    WorktreesChanged,
-    CreateBufferForPeer {
-        peer_id: proto::PeerId,
-        buffer_id: BufferId,
-    },
-}
-
 #[derive(Debug)]
 enum ProjectClientState {
     Local,
     Shared {
         remote_id: u64,
-        updates_tx: mpsc::UnboundedSender<LocalProjectUpdate>,
-        _send_updates: Task<Result<()>>,
     },
     Remote {
         sharing_has_stopped: bool,
@@ -1448,12 +1437,48 @@ impl Project {
     }
 
     fn metadata_changed(&mut self, cx: &mut ModelContext<Self>) {
-        if let ProjectClientState::Shared { updates_tx, .. } = &mut self.client_state {
-            updates_tx
-                .unbounded_send(LocalProjectUpdate::WorktreesChanged)
-                .ok();
-        }
         cx.notify();
+        let ProjectClientState::Shared { remote_id } = self.client_state else {
+            return;
+        };
+        let worktrees = self.worktrees(cx).collect::<Vec<_>>();
+        let project_id = remote_id;
+
+        let update_project = self.client.request(proto::UpdateProject {
+            project_id,
+            worktrees: self.worktree_metadata_protos(cx),
+        });
+        cx.spawn(|this, mut cx| async move {
+            update_project.await?;
+
+            this.update(&mut cx, |this, cx| {
+                let client = this.client.clone();
+                for worktree in worktrees {
+                    worktree.update(cx, |worktree, cx| {
+                        if let Some(summaries) = this.diagnostic_summaries.get(&worktree.id()) {
+                            for (path, summaries) in summaries {
+                                for (&server_id, summary) in summaries {
+                                    this.client.send(proto::UpdateDiagnosticSummary {
+                                        project_id,
+                                        worktree_id: worktree.id().to_proto(),
+                                        summary: Some(summary.to_proto(server_id, path)),
+                                    })?;
+                                }
+                            }
+                        }
+
+                        worktree.observe_updates(project_id, cx, {
+                            let client = client.clone();
+                            move |update| client.request(update).map(|result| result.is_ok())
+                        });
+
+                        anyhow::Ok(())
+                    })?;
+                }
+                anyhow::Ok(())
+            })
+        })
+        .detach_and_log_err(cx);
     }
 
     pub fn task_inventory(&self) -> &Model<Inventory> {
@@ -1688,95 +1713,8 @@ impl Project {
             }
         }
 
-        let (updates_tx, mut updates_rx) = mpsc::unbounded();
-        let client = self.client.clone();
         self.client_state = ProjectClientState::Shared {
             remote_id: project_id,
-            updates_tx,
-            _send_updates: cx.spawn(move |this, mut cx| async move {
-                while let Some(update) = updates_rx.next().await {
-                    match update {
-                        LocalProjectUpdate::WorktreesChanged => {
-                            let worktrees = this.update(&mut cx, |this, cx| {
-                                this.worktrees(cx).collect::<Vec<_>>()
-                            })?;
-
-                            let update_project = this
-                                .update(&mut cx, |this, cx| {
-                                    this.client.request(proto::UpdateProject {
-                                        project_id,
-                                        worktrees: this.worktree_metadata_protos(cx),
-                                    })
-                                })?
-                                .await;
-                            if update_project.log_err().is_none() {
-                                continue;
-                            }
-
-                            this.update(&mut cx, |this, cx| {
-                                for worktree in worktrees {
-                                    worktree.update(cx, |worktree, cx| {
-                                        if let Some(summaries) =
-                                            this.diagnostic_summaries.get(&worktree.id())
-                                        {
-                                            for (path, summaries) in summaries {
-                                                for (&server_id, summary) in summaries {
-                                                    this.client.send(
-                                                        proto::UpdateDiagnosticSummary {
-                                                            project_id,
-                                                            worktree_id: worktree.id().to_proto(),
-                                                            summary: Some(
-                                                                summary.to_proto(server_id, path),
-                                                            ),
-                                                        },
-                                                    )?;
-                                                }
-                                            }
-                                        }
-
-                                        worktree.observe_updates(project_id, cx, {
-                                            let client = client.clone();
-                                            move |update| {
-                                                client.request(update).map(|result| result.is_ok())
-                                            }
-                                        });
-
-                                        anyhow::Ok(())
-                                    })?;
-                                }
-                                anyhow::Ok(())
-                            })??;
-                        }
-                        LocalProjectUpdate::CreateBufferForPeer { peer_id, buffer_id } => {
-                            let Some(buffer_store) = this.update(&mut cx, |this, _| {
-                                if this
-                                    .shared_buffers
-                                    .entry(peer_id)
-                                    .or_default()
-                                    .insert(buffer_id)
-                                {
-                                    Some(this.buffer_store.clone())
-                                } else {
-                                    None
-                                }
-                            })?
-                            else {
-                                continue;
-                            };
-                            BufferStore::create_buffer_for_peer(
-                                buffer_store,
-                                peer_id,
-                                buffer_id,
-                                project_id,
-                                client.clone().into(),
-                                &mut cx,
-                            )
-                            .await?;
-                        }
-                    }
-                }
-                Ok(())
-            }),
         };
 
         self.metadata_changed(cx);
@@ -9857,11 +9795,33 @@ impl Project {
         cx: &mut AppContext,
     ) -> BufferId {
         let buffer_id = buffer.read(cx).remote_id();
-        if let ProjectClientState::Shared { updates_tx, .. } = &self.client_state {
-            updates_tx
-                .unbounded_send(LocalProjectUpdate::CreateBufferForPeer { peer_id, buffer_id })
-                .ok();
+        if !self
+            .shared_buffers
+            .entry(peer_id)
+            .or_default()
+            .insert(buffer_id)
+        {
+            return buffer_id;
         }
+        let ProjectClientState::Shared { remote_id } = self.client_state else {
+            return buffer_id;
+        };
+        let buffer_store = self.buffer_store.clone();
+        let client = self.client().clone();
+
+        cx.spawn(|mut cx| async move {
+            BufferStore::create_buffer_for_peer(
+                buffer_store,
+                peer_id,
+                buffer_id,
+                remote_id,
+                client.clone().into(),
+                &mut cx,
+            )
+            .await?;
+            anyhow::Ok(())
+        })
+        .detach_and_log_err(cx);
         buffer_id
     }