From 6212f2fe302d6478f50450094c590842697df096 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Thu, 5 May 2022 13:47:53 +0200 Subject: [PATCH] Wait for remote worktree to catch up with host before mutating entries This ensures that entries don't randomly re-appear on remote worktrees due to observing an update too late. In fact, it ensures that the remote worktree has the same starting state of the host before preemptively applying the fs operation locally. --- crates/collab/src/rpc.rs | 3 + crates/collab/src/rpc/store.rs | 3 + crates/project/src/project.rs | 91 ++++++++++++------ crates/project/src/worktree.rs | 163 ++++++++++++++++++++------------- crates/rpc/proto/zed.proto | 3 + crates/rpc/src/proto.rs | 2 +- 6 files changed, 171 insertions(+), 94 deletions(-) diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 2367532d28a314bafab9bb284447493025f0401c..d17473e1398013b459cd94c8874ea375ec2c6d2b 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -448,6 +448,7 @@ impl Server { .cloned() .collect(), visible: worktree.visible, + scan_id: shared_worktree.scan_id, }) }) .collect(); @@ -578,6 +579,7 @@ impl Server { request.payload.worktree_id, &request.payload.removed_entries, &request.payload.updated_entries, + request.payload.scan_id, )?; broadcast(request.sender_id, connection_ids, |connection_id| { @@ -5804,6 +5806,7 @@ mod tests { guest_client.username, id ); + assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id()); } guest_client diff --git a/crates/collab/src/rpc/store.rs b/crates/collab/src/rpc/store.rs index 3be072c5e2e67b84b7cf5f70c7367f2e4b3be4d6..4737dd2c804ded463841948413d404d09d0858c0 100644 --- a/crates/collab/src/rpc/store.rs +++ b/crates/collab/src/rpc/store.rs @@ -46,6 +46,7 @@ pub struct ProjectShare { pub struct WorktreeShare { pub entries: HashMap, pub diagnostic_summaries: BTreeMap, + pub scan_id: u64, } #[derive(Default)] @@ -561,6 +562,7 @@ impl Store { worktree_id: u64, removed_entries: &[u64], updated_entries: &[proto::Entry], + scan_id: u64, ) -> Result> { let project = self.write_project(project_id, connection_id)?; let worktree = project @@ -574,6 +576,7 @@ impl Store { for entry in updated_entries { worktree.entries.insert(entry.id, entry.clone()); } + worktree.scan_id = scan_id; let connection_ids = project.connection_ids(); Ok(connection_ids) } diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 7af1199ec107b9c10a1f12af592e929dea964ac8..1fcd89fcde90bee62b0a3831b4e2df3d491ed956 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -723,7 +723,11 @@ impl Project { .ok_or_else(|| anyhow!("missing entry in response"))?; worktree .update(&mut cx, |worktree, cx| { - worktree.as_remote().unwrap().insert_entry(entry, cx) + worktree.as_remote().unwrap().insert_entry( + entry, + response.worktree_scan_id as usize, + cx, + ) }) .await })) @@ -762,7 +766,11 @@ impl Project { .ok_or_else(|| anyhow!("missing entry in response"))?; worktree .update(&mut cx, |worktree, cx| { - worktree.as_remote().unwrap().insert_entry(entry, cx) + worktree.as_remote().unwrap().insert_entry( + entry, + response.worktree_scan_id as usize, + cx, + ) }) .await })) @@ -783,7 +791,7 @@ impl Project { let client = self.client.clone(); let project_id = self.remote_id().unwrap(); Some(cx.spawn_weak(|_, mut cx| async move { - client + let response = client .request(proto::DeleteProjectEntry { project_id, entry_id: entry_id.to_proto(), @@ -791,7 +799,11 @@ impl Project { .await?; worktree .update(&mut cx, move |worktree, cx| { - worktree.as_remote().unwrap().delete_entry(entry_id, cx) + worktree.as_remote().unwrap().delete_entry( + entry_id, + response.worktree_scan_id as usize, + cx, + ) }) .await })) @@ -3805,6 +3817,7 @@ impl Project { entries: Default::default(), diagnostic_summaries: Default::default(), visible: envelope.payload.visible, + scan_id: 0, }; let (worktree, load_task) = Worktree::remote(remote_id, replica_id, worktree, client, cx); @@ -3851,21 +3864,22 @@ impl Project { _: Arc, mut cx: AsyncAppContext, ) -> Result { - let entry = this - .update(&mut cx, |this, cx| { - let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id); - let worktree = this - .worktree_for_id(worktree_id, cx) - .ok_or_else(|| anyhow!("worktree not found"))?; - worktree.update(cx, |worktree, cx| { - let worktree = worktree.as_local_mut().unwrap(); - let path = PathBuf::from(OsString::from_vec(envelope.payload.path)); - anyhow::Ok(worktree.create_entry(path, envelope.payload.is_directory, cx)) - }) - })? + let worktree = this.update(&mut cx, |this, cx| { + let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id); + this.worktree_for_id(worktree_id, cx) + .ok_or_else(|| anyhow!("worktree not found")) + })?; + let worktree_scan_id = worktree.read_with(&cx, |worktree, _| worktree.scan_id()); + let entry = worktree + .update(&mut cx, |worktree, cx| { + let worktree = worktree.as_local_mut().unwrap(); + let path = PathBuf::from(OsString::from_vec(envelope.payload.path)); + worktree.create_entry(path, envelope.payload.is_directory, cx) + }) .await?; Ok(proto::ProjectEntryResponse { entry: Some((&entry).into()), + worktree_scan_id: worktree_scan_id as u64, }) } @@ -3875,16 +3889,25 @@ impl Project { _: Arc, mut cx: AsyncAppContext, ) -> Result { - let entry = this - .update(&mut cx, |this, cx| { - let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id); + let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id); + let worktree = this.read_with(&cx, |this, cx| { + this.worktree_for_entry(entry_id, cx) + .ok_or_else(|| anyhow!("worktree not found")) + })?; + let worktree_scan_id = worktree.read_with(&cx, |worktree, _| worktree.scan_id()); + let entry = worktree + .update(&mut cx, |worktree, cx| { let new_path = PathBuf::from(OsString::from_vec(envelope.payload.new_path)); - this.rename_entry(entry_id, new_path, cx) + worktree + .as_local_mut() + .unwrap() + .rename_entry(entry_id, new_path, cx) .ok_or_else(|| anyhow!("invalid entry")) })? .await?; Ok(proto::ProjectEntryResponse { entry: Some((&entry).into()), + worktree_scan_id: worktree_scan_id as u64, }) } @@ -3893,14 +3916,26 @@ impl Project { envelope: TypedEnvelope, _: Arc, mut cx: AsyncAppContext, - ) -> Result { - this.update(&mut cx, |this, cx| { - let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id); - this.delete_entry(entry_id, cx) - .ok_or_else(|| anyhow!("invalid entry")) - })? - .await?; - Ok(proto::Ack {}) + ) -> Result { + let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id); + let worktree = this.read_with(&cx, |this, cx| { + this.worktree_for_entry(entry_id, cx) + .ok_or_else(|| anyhow!("worktree not found")) + })?; + let worktree_scan_id = worktree.read_with(&cx, |worktree, _| worktree.scan_id()); + worktree + .update(&mut cx, |worktree, cx| { + worktree + .as_local_mut() + .unwrap() + .delete_entry(entry_id, cx) + .ok_or_else(|| anyhow!("invalid entry")) + })? + .await?; + Ok(proto::ProjectEntryResponse { + entry: None, + worktree_scan_id: worktree_scan_id as u64, + }) } async fn handle_update_diagnostic_summary( diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index 2a1808457cc61b35691a44c98973c99db00361f3..bab41bbe278e21330b1f695a09feb9375efc105f 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -29,7 +29,6 @@ use language::{ use lazy_static::lazy_static; use parking_lot::Mutex; use postage::{ - barrier, prelude::{Sink as _, Stream as _}, watch, }; @@ -84,17 +83,13 @@ pub struct RemoteWorktree { pub(crate) background_snapshot: Arc>, project_id: u64, client: Arc, - updates_tx: UnboundedSender, + updates_tx: UnboundedSender, + last_scan_id_rx: watch::Receiver, replica_id: ReplicaId, diagnostic_summaries: TreeMap, visible: bool, } -enum BackgroundUpdate { - Update(proto::UpdateWorktree), - Barrier(barrier::Sender), -} - #[derive(Clone)] pub struct Snapshot { id: WorktreeId, @@ -102,12 +97,12 @@ pub struct Snapshot { root_char_bag: CharBag, entries_by_path: SumTree, entries_by_id: SumTree, + scan_id: usize, } #[derive(Clone)] pub struct LocalSnapshot { abs_path: Arc, - scan_id: usize, ignores: HashMap, (Arc, usize)>, removed_entry_ids: HashMap, next_entry_id: Arc, @@ -221,11 +216,13 @@ impl Worktree { root_char_bag, entries_by_path: Default::default(), entries_by_id: Default::default(), + scan_id: worktree.scan_id as usize, }; let (updates_tx, mut updates_rx) = mpsc::unbounded(); let background_snapshot = Arc::new(Mutex::new(snapshot.clone())); let (mut snapshot_updated_tx, mut snapshot_updated_rx) = watch::channel(); + let (mut last_scan_id_tx, last_scan_id_rx) = watch::channel_with(worktree.scan_id as usize); let worktree_handle = cx.add_model(|_: &mut ModelContext| { Worktree::Remote(RemoteWorktree { project_id: project_remote_id, @@ -233,6 +230,7 @@ impl Worktree { snapshot: snapshot.clone(), background_snapshot: background_snapshot.clone(), updates_tx, + last_scan_id_rx, client: client.clone(), diagnostic_summaries: TreeMap::from_ordered_entries( worktree.diagnostic_summaries.into_iter().map(|summary| { @@ -291,14 +289,12 @@ impl Worktree { cx.background() .spawn(async move { while let Some(update) = updates_rx.next().await { - if let BackgroundUpdate::Update(update) = update { - if let Err(error) = - background_snapshot.lock().apply_remote_update(update) - { - log::error!("error applying worktree update: {}", error); - } - snapshot_updated_tx.send(()).await.ok(); + if let Err(error) = + background_snapshot.lock().apply_remote_update(update) + { + log::error!("error applying worktree update: {}", error); } + snapshot_updated_tx.send(()).await.ok(); } }) .detach(); @@ -308,7 +304,11 @@ impl Worktree { async move { while let Some(_) = snapshot_updated_rx.recv().await { if let Some(this) = this.upgrade(&cx) { - this.update(&mut cx, |this, cx| this.poll_snapshot(cx)); + this.update(&mut cx, |this, cx| { + this.poll_snapshot(cx); + let this = this.as_remote_mut().unwrap(); + *last_scan_id_tx.borrow_mut() = this.snapshot.scan_id; + }); } else { break; } @@ -368,6 +368,13 @@ impl Worktree { } } + pub fn scan_id(&self) -> usize { + match self { + Worktree::Local(worktree) => worktree.snapshot.scan_id, + Worktree::Remote(worktree) => worktree.snapshot.scan_id, + } + } + pub fn is_visible(&self) -> bool { match self { Worktree::Local(worktree) => worktree.visible, @@ -465,7 +472,6 @@ impl LocalWorktree { let tree = cx.add_model(move |cx: &mut ModelContext| { let mut snapshot = LocalSnapshot { abs_path, - scan_id: 0, ignores: Default::default(), removed_entry_ids: Default::default(), next_entry_id, @@ -475,6 +481,7 @@ impl LocalWorktree { root_char_bag, entries_by_path: Default::default(), entries_by_id: Default::default(), + scan_id: 0, }, }; if let Some(metadata) = metadata { @@ -505,24 +512,13 @@ impl LocalWorktree { cx.spawn_weak(|this, mut cx| async move { while let Some(scan_state) = scan_states_rx.next().await { - if let Some(handle) = this.upgrade(&cx) { - let to_send = handle.update(&mut cx, |this, cx| { - last_scan_state_tx.blocking_send(scan_state).ok(); + if let Some(this) = this.upgrade(&cx) { + last_scan_state_tx.blocking_send(scan_state).ok(); + this.update(&mut cx, |this, cx| { this.poll_snapshot(cx); - let tree = this.as_local_mut().unwrap(); - if !tree.is_scanning() { - if let Some(share) = tree.share.as_ref() { - return Some((tree.snapshot(), share.snapshots_tx.clone())); - } - } - None - }); - - if let Some((snapshot, snapshots_to_send_tx)) = to_send { - if let Err(err) = snapshots_to_send_tx.send(snapshot).await { - log::error!("error submitting snapshot to send {}", err); - } - } + this.as_local().unwrap().broadcast_snapshot() + }) + .await; } else { break; } @@ -745,7 +741,11 @@ impl LocalWorktree { let mut snapshot = this.background_snapshot.lock(); snapshot.delete_entry(entry_id); }); - this.update(&mut cx, |this, cx| this.poll_snapshot(cx)); + this.update(&mut cx, |this, cx| { + this.poll_snapshot(cx); + this.as_local().unwrap().broadcast_snapshot() + }) + .await; Ok(()) })) } @@ -780,7 +780,11 @@ impl LocalWorktree { ) }) .await?; - this.update(&mut cx, |this, cx| this.poll_snapshot(cx)); + this.update(&mut cx, |this, cx| { + this.poll_snapshot(cx); + this.as_local().unwrap().broadcast_snapshot() + }) + .await; Ok(entry) })) } @@ -814,7 +818,11 @@ impl LocalWorktree { .refresh_entry(path, abs_path, None) }) .await?; - this.update(&mut cx, |this, cx| this.poll_snapshot(cx)); + this.update(&mut cx, |this, cx| { + this.poll_snapshot(cx); + this.as_local().unwrap().broadcast_snapshot() + }) + .await; Ok(entry) }) } @@ -923,6 +931,7 @@ impl LocalWorktree { .map(Into::into) .collect(), removed_entries: Default::default(), + scan_id: snapshot.scan_id as u64, }) .await { @@ -991,6 +1000,23 @@ impl LocalWorktree { pub fn is_shared(&self) -> bool { self.share.is_some() } + + fn broadcast_snapshot(&self) -> impl Future { + let mut to_send = None; + if !self.is_scanning() { + if let Some(share) = self.share.as_ref() { + to_send = Some((self.snapshot(), share.snapshots_tx.clone())); + } + } + + async move { + if let Some((snapshot, snapshots_to_send_tx)) = to_send { + if let Err(err) = snapshots_to_send_tx.send(snapshot).await { + log::error!("error submitting snapshot to send {}", err); + } + } + } + } } impl RemoteWorktree { @@ -1003,18 +1029,19 @@ impl RemoteWorktree { envelope: TypedEnvelope, ) -> Result<()> { self.updates_tx - .unbounded_send(BackgroundUpdate::Update(envelope.payload)) + .unbounded_send(envelope.payload) .expect("consumer runs to completion"); Ok(()) } - pub fn finish_pending_remote_updates(&self) -> impl Future { - let (tx, mut rx) = barrier::channel(); - self.updates_tx - .unbounded_send(BackgroundUpdate::Barrier(tx)) - .expect("consumer runs to completion"); + fn wait_for_snapshot(&self, scan_id: usize) -> impl Future { + let mut rx = self.last_scan_id_rx.clone(); async move { - rx.recv().await; + while let Some(applied_scan_id) = rx.next().await { + if applied_scan_id >= scan_id { + return; + } + } } } @@ -1038,16 +1065,12 @@ impl RemoteWorktree { pub fn insert_entry( &self, entry: proto::Entry, + scan_id: usize, cx: &mut ModelContext, ) -> Task> { + let wait_for_snapshot = self.wait_for_snapshot(scan_id); cx.spawn(|this, mut cx| async move { - this.update(&mut cx, |worktree, _| { - worktree - .as_remote_mut() - .unwrap() - .finish_pending_remote_updates() - }) - .await; + wait_for_snapshot.await; this.update(&mut cx, |worktree, _| { let worktree = worktree.as_remote_mut().unwrap(); let mut snapshot = worktree.background_snapshot.lock(); @@ -1061,16 +1084,12 @@ impl RemoteWorktree { pub(crate) fn delete_entry( &self, id: ProjectEntryId, + scan_id: usize, cx: &mut ModelContext, ) -> Task> { + let wait_for_snapshot = self.wait_for_snapshot(scan_id); cx.spawn(|this, mut cx| async move { - this.update(&mut cx, |worktree, _| { - worktree - .as_remote_mut() - .unwrap() - .finish_pending_remote_updates() - }) - .await; + wait_for_snapshot.await; this.update(&mut cx, |worktree, _| { let worktree = worktree.as_remote_mut().unwrap(); let mut snapshot = worktree.background_snapshot.lock(); @@ -1145,6 +1164,7 @@ impl Snapshot { self.entries_by_path.edit(entries_by_path_edits, &()); self.entries_by_id.edit(entries_by_id_edits, &()); + self.scan_id = update.scan_id as usize; Ok(()) } @@ -1233,6 +1253,10 @@ impl Snapshot { &self.root_name } + pub fn scan_id(&self) -> usize { + self.scan_id + } + pub fn entry_for_path(&self, path: impl AsRef) -> Option<&Entry> { let path = path.as_ref(); self.traverse_from_path(true, true, path) @@ -1282,6 +1306,7 @@ impl LocalSnapshot { .map(|(path, summary)| summary.to_proto(&path.0)) .collect(), visible, + scan_id: self.scan_id as u64, } } @@ -1347,6 +1372,7 @@ impl LocalSnapshot { root_name: self.root_name().to_string(), updated_entries, removed_entries, + scan_id: self.scan_id as u64, } } @@ -1390,11 +1416,18 @@ impl LocalSnapshot { entries: impl IntoIterator, ignore: Option>, ) { - let mut parent_entry = self - .entries_by_path - .get(&PathKey(parent_path.clone()), &()) - .unwrap() - .clone(); + let mut parent_entry = if let Some(parent_entry) = + self.entries_by_path.get(&PathKey(parent_path.clone()), &()) + { + parent_entry.clone() + } else { + log::warn!( + "populating a directory {:?} that has been removed", + parent_path + ); + return; + }; + if let Some(ignore) = ignore { self.ignores.insert(parent_path, (ignore, self.scan_id)); } @@ -1454,7 +1487,7 @@ impl LocalSnapshot { if path.file_name() == Some(&GITIGNORE) { if let Some((_, scan_id)) = self.ignores.get_mut(path.parent().unwrap()) { - *scan_id = self.scan_id; + *scan_id = self.snapshot.scan_id; } } } @@ -2773,7 +2806,6 @@ mod tests { let next_entry_id = Arc::new(AtomicUsize::new(0)); let mut initial_snapshot = LocalSnapshot { abs_path: root_dir.path().into(), - scan_id: 0, removed_entry_ids: Default::default(), ignores: Default::default(), next_entry_id: next_entry_id.clone(), @@ -2783,6 +2815,7 @@ mod tests { entries_by_id: Default::default(), root_name: Default::default(), root_char_bag: Default::default(), + scan_id: 0, }, }; initial_snapshot.insert_entry( diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index 8a30278920ef5296181029e7b53fab4e1bc41a8d..fa0b587df486957d65b87c97caaf5b9a3eb5ed8f 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -162,6 +162,7 @@ message UpdateWorktree { string root_name = 3; repeated Entry updated_entries = 4; repeated uint64 removed_entries = 5; + uint64 scan_id = 6; } message CreateProjectEntry { @@ -184,6 +185,7 @@ message DeleteProjectEntry { message ProjectEntryResponse { Entry entry = 1; + uint64 worktree_scan_id = 2; } message AddProjectCollaborator { @@ -658,6 +660,7 @@ message Worktree { repeated Entry entries = 3; repeated DiagnosticSummary diagnostic_summaries = 4; bool visible = 5; + uint64 scan_id = 6; } message File { diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index 428eb13a42a9920ee309e90932cd72350033038c..c505869c554744e1c89911b0a9c5f556ac3dd8b0 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -225,7 +225,7 @@ request_messages!( ApplyCompletionAdditionalEditsResponse ), (CreateProjectEntry, ProjectEntryResponse), - (DeleteProjectEntry, Ack), + (DeleteProjectEntry, ProjectEntryResponse), (Follow, FollowResponse), (FormatBuffers, FormatBuffersResponse), (GetChannelMessages, GetChannelMessagesResponse),