From e79815622c0dcff6414a268e88b1d9e8c9246ab7 Mon Sep 17 00:00:00 2001
From: Max Brunsfeld
Date: Mon, 10 Apr 2023 12:40:09 -0700
Subject: [PATCH] Preserve ordering between UpdateProject and
 CreateBufferForPeer messages

Previously, because UpdateProject messages were sent in a separately-
spawned task, they could be sent after CreateBufferForPeer messages
that were intended to be sent after them.

Co-authored-by: Antonio Scandurra
---
 crates/collab/src/tests/integration_tests.rs |  24 +--
 crates/project/src/project.rs                | 212 +++++++++----------
 crates/workspace/src/workspace.rs            |   8 +-
 3 files changed, 116 insertions(+), 128 deletions(-)

diff --git a/crates/collab/src/tests/integration_tests.rs b/crates/collab/src/tests/integration_tests.rs
index 82b542cb6b2d70a65a2151010fab85b056050336..dda80358742d7070b7c40864bb3fefa9dd2610b6 100644
--- a/crates/collab/src/tests/integration_tests.rs
+++ b/crates/collab/src/tests/integration_tests.rs
@@ -1633,9 +1633,7 @@ async fn test_project_reconnect(
         })
         .await
         .unwrap();
-    worktree_a2
-        .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
-        .await;
+    deterministic.run_until_parked();
     let worktree2_id = worktree_a2.read_with(cx_a, |tree, _| {
         assert!(tree.as_local().unwrap().is_shared());
         tree.id()
@@ -1696,11 +1694,9 @@ async fn test_project_reconnect(
         .unwrap();
 
     // While client A is disconnected, add and remove worktrees from client A's project.
-    project_a1
-        .update(cx_a, |project, cx| {
-            project.remove_worktree(worktree2_id, cx)
-        })
-        .await;
+    project_a1.update(cx_a, |project, cx| {
+        project.remove_worktree(worktree2_id, cx)
+    });
     let (worktree_a3, _) = project_a1
         .update(cx_a, |p, cx| {
             p.find_or_create_local_worktree("/root-1/dir3", true, cx)
@@ -1824,18 +1820,14 @@ async fn test_project_reconnect(
         })
         .await
         .unwrap();
-    worktree_a4
-        .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
-        .await;
+    deterministic.run_until_parked();
     let worktree4_id = worktree_a4.read_with(cx_a, |tree, _| {
         assert!(tree.as_local().unwrap().is_shared());
         tree.id()
     });
-    project_a1
-        .update(cx_a, |project, cx| {
-            project.remove_worktree(worktree3_id, cx)
-        })
-        .await;
+    project_a1.update(cx_a, |project, cx| {
+        project.remove_worktree(worktree3_id, cx)
+    });
     deterministic.run_until_parked();
 
     // While client B is disconnected, mutate a buffer on both the host and the guest.
diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs
index 6a8bbb98d29f7740e53a8405c4cf74f67884735e..655425a2a873029219e8cc778314a5a8aef3368a 100644
--- a/crates/project/src/project.rs
+++ b/crates/project/src/project.rs
@@ -13,10 +13,7 @@ use client::{proto, Client, TypedEnvelope, UserStore};
 use clock::ReplicaId;
 use collections::{hash_map, BTreeMap, HashMap, HashSet};
 use futures::{
-    channel::{
-        mpsc::{self, UnboundedReceiver},
-        oneshot,
-    },
+    channel::mpsc::{self, UnboundedReceiver},
     future::{try_join_all, Shared},
     AsyncWriteExt, Future, FutureExt, StreamExt, TryFutureExt,
 };
@@ -142,6 +139,14 @@ enum BufferMessage {
     Resync,
 }
 
+enum LocalProjectUpdate {
+    WorktreesChanged,
+    CreateBufferForPeer {
+        peer_id: proto::PeerId,
+        buffer_id: u64,
+    },
+}
+
 enum OpenBuffer {
     Strong(ModelHandle<Buffer>),
     Weak(WeakModelHandle<Buffer>),
@@ -156,8 +161,8 @@ enum WorktreeHandle {
 enum ProjectClientState {
     Local {
         remote_id: u64,
-        metadata_changed: mpsc::UnboundedSender<oneshot::Sender<()>>,
-        _maintain_metadata: Task<()>,
+        updates_tx: mpsc::UnboundedSender<LocalProjectUpdate>,
+        _send_updates: Task<()>,
     },
     Remote {
         sharing_has_stopped: bool,
@@ -725,22 +730,13 @@ impl Project {
         }
     }
 
-    fn metadata_changed(&mut self, cx: &mut ModelContext<Self>) -> impl Future<Output = ()> {
-        let (tx, rx) = oneshot::channel();
-        if let Some(ProjectClientState::Local {
-            metadata_changed, ..
-        }) = &mut self.client_state
-        {
-            let _ = metadata_changed.unbounded_send(tx);
+    fn metadata_changed(&mut self, cx: &mut ModelContext<Self>) {
+        if let Some(ProjectClientState::Local { updates_tx, .. }) = &mut self.client_state {
+            updates_tx
+                .unbounded_send(LocalProjectUpdate::WorktreesChanged)
+                .ok();
         }
         cx.notify();
-
-        async move {
-            // If the project is shared, this will resolve when the `_maintain_metadata` task has
-            // a chance to update the metadata. Otherwise, it will resolve right away because `tx`
-            // will get dropped.
-            let _ = rx.await;
-        }
     }
 
     pub fn collaborators(&self) -> &HashMap<proto::PeerId, Collaborator> {
@@ -1026,40 +1022,90 @@ impl Project {
                 .log_err();
         }
 
-        let (metadata_changed_tx, mut metadata_changed_rx) = mpsc::unbounded();
+        let (updates_tx, mut updates_rx) = mpsc::unbounded();
+        let client = self.client.clone();
         self.client_state = Some(ProjectClientState::Local {
             remote_id: project_id,
-            metadata_changed: metadata_changed_tx,
-            _maintain_metadata: cx.spawn_weak(move |this, mut cx| async move {
-                let mut txs = Vec::new();
-                while let Some(tx) = metadata_changed_rx.next().await {
-                    txs.push(tx);
-                    while let Ok(Some(next_tx)) = metadata_changed_rx.try_next() {
-                        txs.push(next_tx);
-                    }
-
+            updates_tx,
+            _send_updates: cx.spawn_weak(move |this, mut cx| async move {
+                while let Some(update) = updates_rx.next().await {
                     let Some(this) = this.upgrade(&cx) else { break };
-                    let worktrees =
-                        this.read_with(&cx, |this, cx| this.worktrees(cx).collect::<Vec<_>>());
-                    let update_project = this
-                        .read_with(&cx, |this, cx| {
-                            this.client.request(proto::UpdateProject {
-                                project_id,
-                                worktrees: this.worktree_metadata_protos(cx),
-                            })
-                        })
-                        .await;
-                    if update_project.is_ok() {
-                        for worktree in worktrees {
-                            worktree.update(&mut cx, |worktree, cx| {
-                                let worktree = worktree.as_local_mut().unwrap();
-                                worktree.share(project_id, cx).detach_and_log_err(cx)
-                            });
+
+                    match update {
+                        LocalProjectUpdate::WorktreesChanged => {
+                            let worktrees = this
+                                .read_with(&cx, |this, cx| this.worktrees(cx).collect::<Vec<_>>());
+                            let update_project = this
+                                .read_with(&cx, |this, cx| {
+                                    this.client.request(proto::UpdateProject {
+                                        project_id,
+                                        worktrees: this.worktree_metadata_protos(cx),
+                                    })
+                                })
+                                .await;
+                            if update_project.is_ok() {
+                                for worktree in worktrees {
+                                    worktree.update(&mut cx, |worktree, cx| {
+                                        let worktree = worktree.as_local_mut().unwrap();
+                                        worktree.share(project_id, cx).detach_and_log_err(cx)
+                                    });
+                                }
+                            }
                         }
-                    }
+                        LocalProjectUpdate::CreateBufferForPeer { peer_id, buffer_id } => {
+                            let buffer = this.update(&mut cx, |this, _| {
+                                let buffer = this.opened_buffers.get(&buffer_id).unwrap();
+                                let shared_buffers =
+                                    this.shared_buffers.entry(peer_id).or_default();
+                                if shared_buffers.insert(buffer_id) {
+                                    if let OpenBuffer::Strong(buffer) = buffer {
+                                        Some(buffer.clone())
+                                    } else {
+                                        None
+                                    }
+                                } else {
+                                    None
+                                }
+                            });
+
+                            let Some(buffer) = buffer else { continue };
+                            let operations =
+                                buffer.read_with(&cx, |b, cx| b.serialize_ops(None, cx));
+                            let operations = operations.await;
+                            let state = buffer.read_with(&cx, |buffer, _| buffer.to_proto());
 
-                    for tx in txs.drain(..) {
-                        let _ = tx.send(());
+                            let initial_state = proto::CreateBufferForPeer {
+                                project_id,
+                                peer_id: Some(peer_id),
+                                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
+                            };
+                            if client.send(initial_state).log_err().is_some() {
+                                let client = client.clone();
+                                cx.background()
+                                    .spawn(async move {
+                                        let mut chunks = split_operations(operations).peekable();
+                                        while let Some(chunk) = chunks.next() {
+                                            let is_last = chunks.peek().is_none();
+                                            client.send(proto::CreateBufferForPeer {
+                                                project_id,
+                                                peer_id: Some(peer_id),
+                                                variant: Some(
+                                                    proto::create_buffer_for_peer::Variant::Chunk(
+                                                        proto::BufferChunk {
+                                                            buffer_id,
+                                                            operations: chunk,
+                                                            is_last,
+                                                        },
+                                                    ),
+                                                ),
+                                            })?;
+                                        }
+                                        anyhow::Ok(())
+                                    })
+                                    .await
+                                    .log_err();
+                            }
+                        }
                     }
                 }
             }),
@@ -4493,15 +4539,13 @@ impl Project {
                     &mut cx,
                 )
                 .await;
+
                 project.update(&mut cx, |project, _| {
                     project.loading_local_worktrees.remove(&path);
                 });
-                let worktree = worktree?;
-
-                project
-                    .update(&mut cx, |project, cx| project.add_worktree(&worktree, cx))
-                    .await;
+                let worktree = worktree?;
+                project.update(&mut cx, |project, cx| project.add_worktree(&worktree, cx));
                 Ok(worktree)
             }
             .map_err(Arc::new)
@@ -4517,11 +4561,7 @@ impl Project {
         })
     }
 
-    pub fn remove_worktree(
-        &mut self,
-        id_to_remove: WorktreeId,
-        cx: &mut ModelContext<Self>,
-    ) -> impl Future<Output = ()> {
+    pub fn remove_worktree(&mut self, id_to_remove: WorktreeId, cx: &mut ModelContext<Self>) {
         self.worktrees.retain(|worktree| {
             if let Some(worktree) = worktree.upgrade(cx) {
                 let id = worktree.read(cx).id();
@@ -4535,14 +4575,10 @@ impl Project {
                 false
             }
         });
-        self.metadata_changed(cx)
+        self.metadata_changed(cx);
     }
 
-    fn add_worktree(
-        &mut self,
-        worktree: &ModelHandle<Worktree>,
-        cx: &mut ModelContext<Self>,
-    ) -> impl Future<Output = ()> {
+    fn add_worktree(&mut self, worktree: &ModelHandle<Worktree>, cx: &mut ModelContext<Self>) {
         cx.observe(worktree, |_, _, cx| cx.notify()).detach();
         if worktree.read(cx).is_local() {
             cx.subscribe(worktree, |this, worktree, event, cx| match event {
@@ -4575,7 +4611,7 @@ impl Project {
             .detach();
 
         cx.emit(Event::WorktreeAdded);
-        self.metadata_changed(cx)
+        self.metadata_changed(cx);
     }
 
     fn update_local_worktree_buffers(
@@ -5963,47 +5999,11 @@ impl Project {
         cx: &mut AppContext,
     ) -> u64 {
         let buffer_id = buffer.read(cx).remote_id();
-        if let Some(project_id) = self.remote_id() {
-            let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
-            if shared_buffers.insert(buffer_id) {
-                let buffer = buffer.clone();
-                let operations = buffer.read(cx).serialize_ops(None, cx);
-                let client = self.client.clone();
-                cx.spawn(move |cx| async move {
-                    let operations = operations.await;
-                    let state = buffer.read_with(&cx, |buffer, _| buffer.to_proto());
-
-                    client.send(proto::CreateBufferForPeer {
-                        project_id,
-                        peer_id: Some(peer_id),
-                        variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
-                    })?;
-
-                    cx.background()
-                        .spawn(async move {
-                            let mut chunks = split_operations(operations).peekable();
-                            while let Some(chunk) = chunks.next() {
-                                let is_last = chunks.peek().is_none();
-                                client.send(proto::CreateBufferForPeer {
-                                    project_id,
-                                    peer_id: Some(peer_id),
-                                    variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
-                                        proto::BufferChunk {
-                                            buffer_id,
-                                            operations: chunk,
-                                            is_last,
-                                        },
-                                    )),
-                                })?;
-                            }
-                            anyhow::Ok(())
-                        })
-                        .await
-                })
-                .detach()
-            }
+        if let Some(ProjectClientState::Local { updates_tx, .. }) = &self.client_state {
+            updates_tx
+                .unbounded_send(LocalProjectUpdate::CreateBufferForPeer { peer_id, buffer_id })
+                .ok();
         }
-
         buffer_id
     }
 
diff --git a/crates/workspace/src/workspace.rs b/crates/workspace/src/workspace.rs
index 334578d67dbf37ffbcdfd4e4f67b898df8c0e608..739153fc78882c33e4d09dce958d264363634a97 100644
--- a/crates/workspace/src/workspace.rs
+++ b/crates/workspace/src/workspace.rs
@@ -1305,10 +1305,8 @@ impl Workspace {
         RemoveWorktreeFromProject(worktree_id): &RemoveWorktreeFromProject,
         cx: &mut ViewContext<Self>,
     ) {
-        let future = self
-            .project
+        self.project
             .update(cx, |project, cx| project.remove_worktree(*worktree_id, cx));
-        cx.foreground().spawn(future).detach();
     }
 
     fn project_path_for_path(
@@ -3266,9 +3264,7 @@ mod tests {
         );
 
         // Remove a project folder
-        project
-            .update(cx, |project, cx| project.remove_worktree(worktree_id, cx))
-            .await;
+        project.update(cx, |project, cx| project.remove_worktree(worktree_id, cx));
         assert_eq!(
             cx.current_window_title(window_id).as_deref(),
             Some("one.txt — root2")
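
Note on the approach: the ordering guarantee described in the commit message comes from funneling every
host-side project update through a single unbounded channel that one task drains in FIFO order, instead
of spawning a separate task per UpdateProject request. The code below is a minimal, self-contained sketch
of that pattern, not Zed code: the Update enum and the println! call are hypothetical stand-ins for
LocalProjectUpdate and the RPC client, and it assumes only the futures crate (0.3).

    // Sketch: producers enqueue typed updates onto one unbounded channel;
    // a single consumer task drains it, so messages go out in exactly the
    // order they were enqueued.
    use futures::{channel::mpsc, StreamExt};

    // Hypothetical stand-in for LocalProjectUpdate.
    #[derive(Debug)]
    enum Update {
        WorktreesChanged,
        CreateBufferForPeer { buffer_id: u64 },
    }

    fn main() {
        let (updates_tx, mut updates_rx) = mpsc::unbounded();

        // Enqueueing is synchronous, so the queue order matches the order
        // in which the caller produced the updates.
        updates_tx.unbounded_send(Update::WorktreesChanged).unwrap();
        updates_tx
            .unbounded_send(Update::CreateBufferForPeer { buffer_id: 1 })
            .unwrap();
        drop(updates_tx); // close the channel so the consumer loop ends

        // Single consumer: with only one task draining the channel, an
        // UpdateProject-style message can never be reordered behind the
        // CreateBufferForPeer-style messages queued after it.
        futures::executor::block_on(async move {
            while let Some(update) = updates_rx.next().await {
                // Stand-in for client.request / client.send.
                println!("sending {:?}", update);
            }
        });
    }

In the patch itself, metadata_changed and create_buffer_for_peer only enqueue LocalProjectUpdate values,
and the _send_updates task owned by ProjectClientState::Local is the sole place that turns them into
UpdateProject and CreateBufferForPeer RPC messages.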