Preserve ordering between UpdateProject and CreateBufferForPeer messages

Max Brunsfeld and Antonio Scandurra created

Previously, because UpdateProject messages were sent in a separately-
spawned task, they could be sent after CreateBufferForPeer messages that
were intended to be sent after them.

Co-authored-by: Antonio Scandurra <antonio@zed.dev>

Change summary

crates/collab/src/tests/integration_tests.rs |  24 -
crates/project/src/project.rs                | 212 +++++++++++-----------
crates/workspace/src/workspace.rs            |   8 
3 files changed, 116 insertions(+), 128 deletions(-)

Detailed changes

crates/collab/src/tests/integration_tests.rs 🔗

@@ -1633,9 +1633,7 @@ async fn test_project_reconnect(
         })
         .await
         .unwrap();
-    worktree_a2
-        .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
-        .await;
+    deterministic.run_until_parked();
     let worktree2_id = worktree_a2.read_with(cx_a, |tree, _| {
         assert!(tree.as_local().unwrap().is_shared());
         tree.id()
@@ -1696,11 +1694,9 @@ async fn test_project_reconnect(
         .unwrap();
 
     // While client A is disconnected, add and remove worktrees from client A's project.
-    project_a1
-        .update(cx_a, |project, cx| {
-            project.remove_worktree(worktree2_id, cx)
-        })
-        .await;
+    project_a1.update(cx_a, |project, cx| {
+        project.remove_worktree(worktree2_id, cx)
+    });
     let (worktree_a3, _) = project_a1
         .update(cx_a, |p, cx| {
             p.find_or_create_local_worktree("/root-1/dir3", true, cx)
@@ -1824,18 +1820,14 @@ async fn test_project_reconnect(
         })
         .await
         .unwrap();
-    worktree_a4
-        .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
-        .await;
+    deterministic.run_until_parked();
     let worktree4_id = worktree_a4.read_with(cx_a, |tree, _| {
         assert!(tree.as_local().unwrap().is_shared());
         tree.id()
     });
-    project_a1
-        .update(cx_a, |project, cx| {
-            project.remove_worktree(worktree3_id, cx)
-        })
-        .await;
+    project_a1.update(cx_a, |project, cx| {
+        project.remove_worktree(worktree3_id, cx)
+    });
     deterministic.run_until_parked();
 
     // While client B is disconnected, mutate a buffer on both the host and the guest.

crates/project/src/project.rs 🔗

@@ -13,10 +13,7 @@ use client::{proto, Client, TypedEnvelope, UserStore};
 use clock::ReplicaId;
 use collections::{hash_map, BTreeMap, HashMap, HashSet};
 use futures::{
-    channel::{
-        mpsc::{self, UnboundedReceiver},
-        oneshot,
-    },
+    channel::mpsc::{self, UnboundedReceiver},
     future::{try_join_all, Shared},
     AsyncWriteExt, Future, FutureExt, StreamExt, TryFutureExt,
 };
@@ -142,6 +139,14 @@ enum BufferMessage {
     Resync,
 }
 
+enum LocalProjectUpdate {
+    WorktreesChanged,
+    CreateBufferForPeer {
+        peer_id: proto::PeerId,
+        buffer_id: u64,
+    },
+}
+
 enum OpenBuffer {
     Strong(ModelHandle<Buffer>),
     Weak(WeakModelHandle<Buffer>),
@@ -156,8 +161,8 @@ enum WorktreeHandle {
 enum ProjectClientState {
     Local {
         remote_id: u64,
-        metadata_changed: mpsc::UnboundedSender<oneshot::Sender<()>>,
-        _maintain_metadata: Task<()>,
+        updates_tx: mpsc::UnboundedSender<LocalProjectUpdate>,
+        _send_updates: Task<()>,
     },
     Remote {
         sharing_has_stopped: bool,
@@ -725,22 +730,13 @@ impl Project {
         }
     }
 
-    fn metadata_changed(&mut self, cx: &mut ModelContext<Self>) -> impl Future<Output = ()> {
-        let (tx, rx) = oneshot::channel();
-        if let Some(ProjectClientState::Local {
-            metadata_changed, ..
-        }) = &mut self.client_state
-        {
-            let _ = metadata_changed.unbounded_send(tx);
+    fn metadata_changed(&mut self, cx: &mut ModelContext<Self>) {
+        if let Some(ProjectClientState::Local { updates_tx, .. }) = &mut self.client_state {
+            updates_tx
+                .unbounded_send(LocalProjectUpdate::WorktreesChanged)
+                .ok();
         }
         cx.notify();
-
-        async move {
-            // If the project is shared, this will resolve when the `_maintain_metadata` task has
-            // a chance to update the metadata. Otherwise, it will resolve right away because `tx`
-            // will get dropped.
-            let _ = rx.await;
-        }
     }
 
     pub fn collaborators(&self) -> &HashMap<proto::PeerId, Collaborator> {
@@ -1026,40 +1022,90 @@ impl Project {
                 .log_err();
         }
 
-        let (metadata_changed_tx, mut metadata_changed_rx) = mpsc::unbounded();
+        let (updates_tx, mut updates_rx) = mpsc::unbounded();
+        let client = self.client.clone();
         self.client_state = Some(ProjectClientState::Local {
             remote_id: project_id,
-            metadata_changed: metadata_changed_tx,
-            _maintain_metadata: cx.spawn_weak(move |this, mut cx| async move {
-                let mut txs = Vec::new();
-                while let Some(tx) = metadata_changed_rx.next().await {
-                    txs.push(tx);
-                    while let Ok(Some(next_tx)) = metadata_changed_rx.try_next() {
-                        txs.push(next_tx);
-                    }
-
+            updates_tx,
+            _send_updates: cx.spawn_weak(move |this, mut cx| async move {
+                while let Some(update) = updates_rx.next().await {
                     let Some(this) = this.upgrade(&cx) else { break };
-                    let worktrees =
-                        this.read_with(&cx, |this, cx| this.worktrees(cx).collect::<Vec<_>>());
-                    let update_project = this
-                        .read_with(&cx, |this, cx| {
-                            this.client.request(proto::UpdateProject {
-                                project_id,
-                                worktrees: this.worktree_metadata_protos(cx),
-                            })
-                        })
-                        .await;
-                    if update_project.is_ok() {
-                        for worktree in worktrees {
-                            worktree.update(&mut cx, |worktree, cx| {
-                                let worktree = worktree.as_local_mut().unwrap();
-                                worktree.share(project_id, cx).detach_and_log_err(cx)
-                            });
+
+                    match update {
+                        LocalProjectUpdate::WorktreesChanged => {
+                            let worktrees = this
+                                .read_with(&cx, |this, cx| this.worktrees(cx).collect::<Vec<_>>());
+                            let update_project = this
+                                .read_with(&cx, |this, cx| {
+                                    this.client.request(proto::UpdateProject {
+                                        project_id,
+                                        worktrees: this.worktree_metadata_protos(cx),
+                                    })
+                                })
+                                .await;
+                            if update_project.is_ok() {
+                                for worktree in worktrees {
+                                    worktree.update(&mut cx, |worktree, cx| {
+                                        let worktree = worktree.as_local_mut().unwrap();
+                                        worktree.share(project_id, cx).detach_and_log_err(cx)
+                                    });
+                                }
+                            }
                         }
-                    }
+                        LocalProjectUpdate::CreateBufferForPeer { peer_id, buffer_id } => {
+                            let buffer = this.update(&mut cx, |this, _| {
+                                let buffer = this.opened_buffers.get(&buffer_id).unwrap();
+                                let shared_buffers =
+                                    this.shared_buffers.entry(peer_id).or_default();
+                                if shared_buffers.insert(buffer_id) {
+                                    if let OpenBuffer::Strong(buffer) = buffer {
+                                        Some(buffer.clone())
+                                    } else {
+                                        None
+                                    }
+                                } else {
+                                    None
+                                }
+                            });
+
+                            let Some(buffer) = buffer else { continue };
+                            let operations =
+                                buffer.read_with(&cx, |b, cx| b.serialize_ops(None, cx));
+                            let operations = operations.await;
+                            let state = buffer.read_with(&cx, |buffer, _| buffer.to_proto());
 
-                    for tx in txs.drain(..) {
-                        let _ = tx.send(());
+                            let initial_state = proto::CreateBufferForPeer {
+                                project_id,
+                                peer_id: Some(peer_id),
+                                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
+                            };
+                            if client.send(initial_state).log_err().is_some() {
+                                let client = client.clone();
+                                cx.background()
+                                    .spawn(async move {
+                                        let mut chunks = split_operations(operations).peekable();
+                                        while let Some(chunk) = chunks.next() {
+                                            let is_last = chunks.peek().is_none();
+                                            client.send(proto::CreateBufferForPeer {
+                                                project_id,
+                                                peer_id: Some(peer_id),
+                                                variant: Some(
+                                                    proto::create_buffer_for_peer::Variant::Chunk(
+                                                        proto::BufferChunk {
+                                                            buffer_id,
+                                                            operations: chunk,
+                                                            is_last,
+                                                        },
+                                                    ),
+                                                ),
+                                            })?;
+                                        }
+                                        anyhow::Ok(())
+                                    })
+                                    .await
+                                    .log_err();
+                            }
+                        }
                     }
                 }
             }),
@@ -4493,15 +4539,13 @@ impl Project {
                             &mut cx,
                         )
                         .await;
+
                         project.update(&mut cx, |project, _| {
                             project.loading_local_worktrees.remove(&path);
                         });
-                        let worktree = worktree?;
-
-                        project
-                            .update(&mut cx, |project, cx| project.add_worktree(&worktree, cx))
-                            .await;
 
+                        let worktree = worktree?;
+                        project.update(&mut cx, |project, cx| project.add_worktree(&worktree, cx));
                         Ok(worktree)
                     }
                     .map_err(Arc::new)
@@ -4517,11 +4561,7 @@ impl Project {
         })
     }
 
-    pub fn remove_worktree(
-        &mut self,
-        id_to_remove: WorktreeId,
-        cx: &mut ModelContext<Self>,
-    ) -> impl Future<Output = ()> {
+    pub fn remove_worktree(&mut self, id_to_remove: WorktreeId, cx: &mut ModelContext<Self>) {
         self.worktrees.retain(|worktree| {
             if let Some(worktree) = worktree.upgrade(cx) {
                 let id = worktree.read(cx).id();
@@ -4535,14 +4575,10 @@ impl Project {
                 false
             }
         });
-        self.metadata_changed(cx)
+        self.metadata_changed(cx);
     }
 
-    fn add_worktree(
-        &mut self,
-        worktree: &ModelHandle<Worktree>,
-        cx: &mut ModelContext<Self>,
-    ) -> impl Future<Output = ()> {
+    fn add_worktree(&mut self, worktree: &ModelHandle<Worktree>, cx: &mut ModelContext<Self>) {
         cx.observe(worktree, |_, _, cx| cx.notify()).detach();
         if worktree.read(cx).is_local() {
             cx.subscribe(worktree, |this, worktree, event, cx| match event {
@@ -4575,7 +4611,7 @@ impl Project {
         .detach();
 
         cx.emit(Event::WorktreeAdded);
-        self.metadata_changed(cx)
+        self.metadata_changed(cx);
     }
 
     fn update_local_worktree_buffers(
@@ -5963,47 +5999,11 @@ impl Project {
         cx: &mut AppContext,
     ) -> u64 {
         let buffer_id = buffer.read(cx).remote_id();
-        if let Some(project_id) = self.remote_id() {
-            let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
-            if shared_buffers.insert(buffer_id) {
-                let buffer = buffer.clone();
-                let operations = buffer.read(cx).serialize_ops(None, cx);
-                let client = self.client.clone();
-                cx.spawn(move |cx| async move {
-                    let operations = operations.await;
-                    let state = buffer.read_with(&cx, |buffer, _| buffer.to_proto());
-
-                    client.send(proto::CreateBufferForPeer {
-                        project_id,
-                        peer_id: Some(peer_id),
-                        variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
-                    })?;
-
-                    cx.background()
-                        .spawn(async move {
-                            let mut chunks = split_operations(operations).peekable();
-                            while let Some(chunk) = chunks.next() {
-                                let is_last = chunks.peek().is_none();
-                                client.send(proto::CreateBufferForPeer {
-                                    project_id,
-                                    peer_id: Some(peer_id),
-                                    variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
-                                        proto::BufferChunk {
-                                            buffer_id,
-                                            operations: chunk,
-                                            is_last,
-                                        },
-                                    )),
-                                })?;
-                            }
-                            anyhow::Ok(())
-                        })
-                        .await
-                })
-                .detach()
-            }
+        if let Some(ProjectClientState::Local { updates_tx, .. }) = &self.client_state {
+            updates_tx
+                .unbounded_send(LocalProjectUpdate::CreateBufferForPeer { peer_id, buffer_id })
+                .ok();
         }
-
         buffer_id
     }
 

crates/workspace/src/workspace.rs 🔗

@@ -1305,10 +1305,8 @@ impl Workspace {
         RemoveWorktreeFromProject(worktree_id): &RemoveWorktreeFromProject,
         cx: &mut ViewContext<Self>,
     ) {
-        let future = self
-            .project
+        self.project
             .update(cx, |project, cx| project.remove_worktree(*worktree_id, cx));
-        cx.foreground().spawn(future).detach();
     }
 
     fn project_path_for_path(
@@ -3266,9 +3264,7 @@ mod tests {
         );
 
         // Remove a project folder
-        project
-            .update(cx, |project, cx| project.remove_worktree(worktree_id, cx))
-            .await;
+        project.update(cx, |project, cx| project.remove_worktree(worktree_id, cx));
         assert_eq!(
             cx.current_window_title(window_id).as_deref(),
             Some("one.txt — root2")