diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 2367532d28a314bafab9bb284447493025f0401c..d17473e1398013b459cd94c8874ea375ec2c6d2b 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -448,6 +448,7 @@ impl Server { .cloned() .collect(), visible: worktree.visible, + scan_id: shared_worktree.scan_id, }) }) .collect(); @@ -578,6 +579,7 @@ impl Server { request.payload.worktree_id, &request.payload.removed_entries, &request.payload.updated_entries, + request.payload.scan_id, )?; broadcast(request.sender_id, connection_ids, |connection_id| { @@ -5804,6 +5806,7 @@ mod tests { guest_client.username, id ); + assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id()); } guest_client diff --git a/crates/collab/src/rpc/store.rs b/crates/collab/src/rpc/store.rs index 3be072c5e2e67b84b7cf5f70c7367f2e4b3be4d6..4737dd2c804ded463841948413d404d09d0858c0 100644 --- a/crates/collab/src/rpc/store.rs +++ b/crates/collab/src/rpc/store.rs @@ -46,6 +46,7 @@ pub struct ProjectShare { pub struct WorktreeShare { pub entries: HashMap, pub diagnostic_summaries: BTreeMap, + pub scan_id: u64, } #[derive(Default)] @@ -561,6 +562,7 @@ impl Store { worktree_id: u64, removed_entries: &[u64], updated_entries: &[proto::Entry], + scan_id: u64, ) -> Result> { let project = self.write_project(project_id, connection_id)?; let worktree = project @@ -574,6 +576,7 @@ impl Store { for entry in updated_entries { worktree.entries.insert(entry.id, entry.clone()); } + worktree.scan_id = scan_id; let connection_ids = project.connection_ids(); Ok(connection_ids) } diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 7af1199ec107b9c10a1f12af592e929dea964ac8..1fcd89fcde90bee62b0a3831b4e2df3d491ed956 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -723,7 +723,11 @@ impl Project { .ok_or_else(|| anyhow!("missing entry in response"))?; worktree .update(&mut cx, |worktree, cx| { - worktree.as_remote().unwrap().insert_entry(entry, cx) + worktree.as_remote().unwrap().insert_entry( + entry, + response.worktree_scan_id as usize, + cx, + ) }) .await })) @@ -762,7 +766,11 @@ impl Project { .ok_or_else(|| anyhow!("missing entry in response"))?; worktree .update(&mut cx, |worktree, cx| { - worktree.as_remote().unwrap().insert_entry(entry, cx) + worktree.as_remote().unwrap().insert_entry( + entry, + response.worktree_scan_id as usize, + cx, + ) }) .await })) @@ -783,7 +791,7 @@ impl Project { let client = self.client.clone(); let project_id = self.remote_id().unwrap(); Some(cx.spawn_weak(|_, mut cx| async move { - client + let response = client .request(proto::DeleteProjectEntry { project_id, entry_id: entry_id.to_proto(), @@ -791,7 +799,11 @@ impl Project { .await?; worktree .update(&mut cx, move |worktree, cx| { - worktree.as_remote().unwrap().delete_entry(entry_id, cx) + worktree.as_remote().unwrap().delete_entry( + entry_id, + response.worktree_scan_id as usize, + cx, + ) }) .await })) @@ -3805,6 +3817,7 @@ impl Project { entries: Default::default(), diagnostic_summaries: Default::default(), visible: envelope.payload.visible, + scan_id: 0, }; let (worktree, load_task) = Worktree::remote(remote_id, replica_id, worktree, client, cx); @@ -3851,21 +3864,22 @@ impl Project { _: Arc, mut cx: AsyncAppContext, ) -> Result { - let entry = this - .update(&mut cx, |this, cx| { - let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id); - let worktree = this - .worktree_for_id(worktree_id, cx) - .ok_or_else(|| anyhow!("worktree not found"))?; - worktree.update(cx, |worktree, cx| { - let worktree = worktree.as_local_mut().unwrap(); - let path = PathBuf::from(OsString::from_vec(envelope.payload.path)); - anyhow::Ok(worktree.create_entry(path, envelope.payload.is_directory, cx)) - }) - })? + let worktree = this.update(&mut cx, |this, cx| { + let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id); + this.worktree_for_id(worktree_id, cx) + .ok_or_else(|| anyhow!("worktree not found")) + })?; + let worktree_scan_id = worktree.read_with(&cx, |worktree, _| worktree.scan_id()); + let entry = worktree + .update(&mut cx, |worktree, cx| { + let worktree = worktree.as_local_mut().unwrap(); + let path = PathBuf::from(OsString::from_vec(envelope.payload.path)); + worktree.create_entry(path, envelope.payload.is_directory, cx) + }) .await?; Ok(proto::ProjectEntryResponse { entry: Some((&entry).into()), + worktree_scan_id: worktree_scan_id as u64, }) } @@ -3875,16 +3889,25 @@ impl Project { _: Arc, mut cx: AsyncAppContext, ) -> Result { - let entry = this - .update(&mut cx, |this, cx| { - let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id); + let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id); + let worktree = this.read_with(&cx, |this, cx| { + this.worktree_for_entry(entry_id, cx) + .ok_or_else(|| anyhow!("worktree not found")) + })?; + let worktree_scan_id = worktree.read_with(&cx, |worktree, _| worktree.scan_id()); + let entry = worktree + .update(&mut cx, |worktree, cx| { let new_path = PathBuf::from(OsString::from_vec(envelope.payload.new_path)); - this.rename_entry(entry_id, new_path, cx) + worktree + .as_local_mut() + .unwrap() + .rename_entry(entry_id, new_path, cx) .ok_or_else(|| anyhow!("invalid entry")) })? .await?; Ok(proto::ProjectEntryResponse { entry: Some((&entry).into()), + worktree_scan_id: worktree_scan_id as u64, }) } @@ -3893,14 +3916,26 @@ impl Project { envelope: TypedEnvelope, _: Arc, mut cx: AsyncAppContext, - ) -> Result { - this.update(&mut cx, |this, cx| { - let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id); - this.delete_entry(entry_id, cx) - .ok_or_else(|| anyhow!("invalid entry")) - })? - .await?; - Ok(proto::Ack {}) + ) -> Result { + let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id); + let worktree = this.read_with(&cx, |this, cx| { + this.worktree_for_entry(entry_id, cx) + .ok_or_else(|| anyhow!("worktree not found")) + })?; + let worktree_scan_id = worktree.read_with(&cx, |worktree, _| worktree.scan_id()); + worktree + .update(&mut cx, |worktree, cx| { + worktree + .as_local_mut() + .unwrap() + .delete_entry(entry_id, cx) + .ok_or_else(|| anyhow!("invalid entry")) + })? + .await?; + Ok(proto::ProjectEntryResponse { + entry: None, + worktree_scan_id: worktree_scan_id as u64, + }) } async fn handle_update_diagnostic_summary( diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index 2a1808457cc61b35691a44c98973c99db00361f3..bab41bbe278e21330b1f695a09feb9375efc105f 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -29,7 +29,6 @@ use language::{ use lazy_static::lazy_static; use parking_lot::Mutex; use postage::{ - barrier, prelude::{Sink as _, Stream as _}, watch, }; @@ -84,17 +83,13 @@ pub struct RemoteWorktree { pub(crate) background_snapshot: Arc>, project_id: u64, client: Arc, - updates_tx: UnboundedSender, + updates_tx: UnboundedSender, + last_scan_id_rx: watch::Receiver, replica_id: ReplicaId, diagnostic_summaries: TreeMap, visible: bool, } -enum BackgroundUpdate { - Update(proto::UpdateWorktree), - Barrier(barrier::Sender), -} - #[derive(Clone)] pub struct Snapshot { id: WorktreeId, @@ -102,12 +97,12 @@ pub struct Snapshot { root_char_bag: CharBag, entries_by_path: SumTree, entries_by_id: SumTree, + scan_id: usize, } #[derive(Clone)] pub struct LocalSnapshot { abs_path: Arc, - scan_id: usize, ignores: HashMap, (Arc, usize)>, removed_entry_ids: HashMap, next_entry_id: Arc, @@ -221,11 +216,13 @@ impl Worktree { root_char_bag, entries_by_path: Default::default(), entries_by_id: Default::default(), + scan_id: worktree.scan_id as usize, }; let (updates_tx, mut updates_rx) = mpsc::unbounded(); let background_snapshot = Arc::new(Mutex::new(snapshot.clone())); let (mut snapshot_updated_tx, mut snapshot_updated_rx) = watch::channel(); + let (mut last_scan_id_tx, last_scan_id_rx) = watch::channel_with(worktree.scan_id as usize); let worktree_handle = cx.add_model(|_: &mut ModelContext| { Worktree::Remote(RemoteWorktree { project_id: project_remote_id, @@ -233,6 +230,7 @@ impl Worktree { snapshot: snapshot.clone(), background_snapshot: background_snapshot.clone(), updates_tx, + last_scan_id_rx, client: client.clone(), diagnostic_summaries: TreeMap::from_ordered_entries( worktree.diagnostic_summaries.into_iter().map(|summary| { @@ -291,14 +289,12 @@ impl Worktree { cx.background() .spawn(async move { while let Some(update) = updates_rx.next().await { - if let BackgroundUpdate::Update(update) = update { - if let Err(error) = - background_snapshot.lock().apply_remote_update(update) - { - log::error!("error applying worktree update: {}", error); - } - snapshot_updated_tx.send(()).await.ok(); + if let Err(error) = + background_snapshot.lock().apply_remote_update(update) + { + log::error!("error applying worktree update: {}", error); } + snapshot_updated_tx.send(()).await.ok(); } }) .detach(); @@ -308,7 +304,11 @@ impl Worktree { async move { while let Some(_) = snapshot_updated_rx.recv().await { if let Some(this) = this.upgrade(&cx) { - this.update(&mut cx, |this, cx| this.poll_snapshot(cx)); + this.update(&mut cx, |this, cx| { + this.poll_snapshot(cx); + let this = this.as_remote_mut().unwrap(); + *last_scan_id_tx.borrow_mut() = this.snapshot.scan_id; + }); } else { break; } @@ -368,6 +368,13 @@ impl Worktree { } } + pub fn scan_id(&self) -> usize { + match self { + Worktree::Local(worktree) => worktree.snapshot.scan_id, + Worktree::Remote(worktree) => worktree.snapshot.scan_id, + } + } + pub fn is_visible(&self) -> bool { match self { Worktree::Local(worktree) => worktree.visible, @@ -465,7 +472,6 @@ impl LocalWorktree { let tree = cx.add_model(move |cx: &mut ModelContext| { let mut snapshot = LocalSnapshot { abs_path, - scan_id: 0, ignores: Default::default(), removed_entry_ids: Default::default(), next_entry_id, @@ -475,6 +481,7 @@ impl LocalWorktree { root_char_bag, entries_by_path: Default::default(), entries_by_id: Default::default(), + scan_id: 0, }, }; if let Some(metadata) = metadata { @@ -505,24 +512,13 @@ impl LocalWorktree { cx.spawn_weak(|this, mut cx| async move { while let Some(scan_state) = scan_states_rx.next().await { - if let Some(handle) = this.upgrade(&cx) { - let to_send = handle.update(&mut cx, |this, cx| { - last_scan_state_tx.blocking_send(scan_state).ok(); + if let Some(this) = this.upgrade(&cx) { + last_scan_state_tx.blocking_send(scan_state).ok(); + this.update(&mut cx, |this, cx| { this.poll_snapshot(cx); - let tree = this.as_local_mut().unwrap(); - if !tree.is_scanning() { - if let Some(share) = tree.share.as_ref() { - return Some((tree.snapshot(), share.snapshots_tx.clone())); - } - } - None - }); - - if let Some((snapshot, snapshots_to_send_tx)) = to_send { - if let Err(err) = snapshots_to_send_tx.send(snapshot).await { - log::error!("error submitting snapshot to send {}", err); - } - } + this.as_local().unwrap().broadcast_snapshot() + }) + .await; } else { break; } @@ -745,7 +741,11 @@ impl LocalWorktree { let mut snapshot = this.background_snapshot.lock(); snapshot.delete_entry(entry_id); }); - this.update(&mut cx, |this, cx| this.poll_snapshot(cx)); + this.update(&mut cx, |this, cx| { + this.poll_snapshot(cx); + this.as_local().unwrap().broadcast_snapshot() + }) + .await; Ok(()) })) } @@ -780,7 +780,11 @@ impl LocalWorktree { ) }) .await?; - this.update(&mut cx, |this, cx| this.poll_snapshot(cx)); + this.update(&mut cx, |this, cx| { + this.poll_snapshot(cx); + this.as_local().unwrap().broadcast_snapshot() + }) + .await; Ok(entry) })) } @@ -814,7 +818,11 @@ impl LocalWorktree { .refresh_entry(path, abs_path, None) }) .await?; - this.update(&mut cx, |this, cx| this.poll_snapshot(cx)); + this.update(&mut cx, |this, cx| { + this.poll_snapshot(cx); + this.as_local().unwrap().broadcast_snapshot() + }) + .await; Ok(entry) }) } @@ -923,6 +931,7 @@ impl LocalWorktree { .map(Into::into) .collect(), removed_entries: Default::default(), + scan_id: snapshot.scan_id as u64, }) .await { @@ -991,6 +1000,23 @@ impl LocalWorktree { pub fn is_shared(&self) -> bool { self.share.is_some() } + + fn broadcast_snapshot(&self) -> impl Future { + let mut to_send = None; + if !self.is_scanning() { + if let Some(share) = self.share.as_ref() { + to_send = Some((self.snapshot(), share.snapshots_tx.clone())); + } + } + + async move { + if let Some((snapshot, snapshots_to_send_tx)) = to_send { + if let Err(err) = snapshots_to_send_tx.send(snapshot).await { + log::error!("error submitting snapshot to send {}", err); + } + } + } + } } impl RemoteWorktree { @@ -1003,18 +1029,19 @@ impl RemoteWorktree { envelope: TypedEnvelope, ) -> Result<()> { self.updates_tx - .unbounded_send(BackgroundUpdate::Update(envelope.payload)) + .unbounded_send(envelope.payload) .expect("consumer runs to completion"); Ok(()) } - pub fn finish_pending_remote_updates(&self) -> impl Future { - let (tx, mut rx) = barrier::channel(); - self.updates_tx - .unbounded_send(BackgroundUpdate::Barrier(tx)) - .expect("consumer runs to completion"); + fn wait_for_snapshot(&self, scan_id: usize) -> impl Future { + let mut rx = self.last_scan_id_rx.clone(); async move { - rx.recv().await; + while let Some(applied_scan_id) = rx.next().await { + if applied_scan_id >= scan_id { + return; + } + } } } @@ -1038,16 +1065,12 @@ impl RemoteWorktree { pub fn insert_entry( &self, entry: proto::Entry, + scan_id: usize, cx: &mut ModelContext, ) -> Task> { + let wait_for_snapshot = self.wait_for_snapshot(scan_id); cx.spawn(|this, mut cx| async move { - this.update(&mut cx, |worktree, _| { - worktree - .as_remote_mut() - .unwrap() - .finish_pending_remote_updates() - }) - .await; + wait_for_snapshot.await; this.update(&mut cx, |worktree, _| { let worktree = worktree.as_remote_mut().unwrap(); let mut snapshot = worktree.background_snapshot.lock(); @@ -1061,16 +1084,12 @@ impl RemoteWorktree { pub(crate) fn delete_entry( &self, id: ProjectEntryId, + scan_id: usize, cx: &mut ModelContext, ) -> Task> { + let wait_for_snapshot = self.wait_for_snapshot(scan_id); cx.spawn(|this, mut cx| async move { - this.update(&mut cx, |worktree, _| { - worktree - .as_remote_mut() - .unwrap() - .finish_pending_remote_updates() - }) - .await; + wait_for_snapshot.await; this.update(&mut cx, |worktree, _| { let worktree = worktree.as_remote_mut().unwrap(); let mut snapshot = worktree.background_snapshot.lock(); @@ -1145,6 +1164,7 @@ impl Snapshot { self.entries_by_path.edit(entries_by_path_edits, &()); self.entries_by_id.edit(entries_by_id_edits, &()); + self.scan_id = update.scan_id as usize; Ok(()) } @@ -1233,6 +1253,10 @@ impl Snapshot { &self.root_name } + pub fn scan_id(&self) -> usize { + self.scan_id + } + pub fn entry_for_path(&self, path: impl AsRef) -> Option<&Entry> { let path = path.as_ref(); self.traverse_from_path(true, true, path) @@ -1282,6 +1306,7 @@ impl LocalSnapshot { .map(|(path, summary)| summary.to_proto(&path.0)) .collect(), visible, + scan_id: self.scan_id as u64, } } @@ -1347,6 +1372,7 @@ impl LocalSnapshot { root_name: self.root_name().to_string(), updated_entries, removed_entries, + scan_id: self.scan_id as u64, } } @@ -1390,11 +1416,18 @@ impl LocalSnapshot { entries: impl IntoIterator, ignore: Option>, ) { - let mut parent_entry = self - .entries_by_path - .get(&PathKey(parent_path.clone()), &()) - .unwrap() - .clone(); + let mut parent_entry = if let Some(parent_entry) = + self.entries_by_path.get(&PathKey(parent_path.clone()), &()) + { + parent_entry.clone() + } else { + log::warn!( + "populating a directory {:?} that has been removed", + parent_path + ); + return; + }; + if let Some(ignore) = ignore { self.ignores.insert(parent_path, (ignore, self.scan_id)); } @@ -1454,7 +1487,7 @@ impl LocalSnapshot { if path.file_name() == Some(&GITIGNORE) { if let Some((_, scan_id)) = self.ignores.get_mut(path.parent().unwrap()) { - *scan_id = self.scan_id; + *scan_id = self.snapshot.scan_id; } } } @@ -2773,7 +2806,6 @@ mod tests { let next_entry_id = Arc::new(AtomicUsize::new(0)); let mut initial_snapshot = LocalSnapshot { abs_path: root_dir.path().into(), - scan_id: 0, removed_entry_ids: Default::default(), ignores: Default::default(), next_entry_id: next_entry_id.clone(), @@ -2783,6 +2815,7 @@ mod tests { entries_by_id: Default::default(), root_name: Default::default(), root_char_bag: Default::default(), + scan_id: 0, }, }; initial_snapshot.insert_entry( diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index 8a30278920ef5296181029e7b53fab4e1bc41a8d..fa0b587df486957d65b87c97caaf5b9a3eb5ed8f 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -162,6 +162,7 @@ message UpdateWorktree { string root_name = 3; repeated Entry updated_entries = 4; repeated uint64 removed_entries = 5; + uint64 scan_id = 6; } message CreateProjectEntry { @@ -184,6 +185,7 @@ message DeleteProjectEntry { message ProjectEntryResponse { Entry entry = 1; + uint64 worktree_scan_id = 2; } message AddProjectCollaborator { @@ -658,6 +660,7 @@ message Worktree { repeated Entry entries = 3; repeated DiagnosticSummary diagnostic_summaries = 4; bool visible = 5; + uint64 scan_id = 6; } message File { diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index 428eb13a42a9920ee309e90932cd72350033038c..c505869c554744e1c89911b0a9c5f556ac3dd8b0 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -225,7 +225,7 @@ request_messages!( ApplyCompletionAdditionalEditsResponse ), (CreateProjectEntry, ProjectEntryResponse), - (DeleteProjectEntry, Ack), + (DeleteProjectEntry, ProjectEntryResponse), (Follow, FollowResponse), (FormatBuffers, FormatBuffersResponse), (GetChannelMessages, GetChannelMessagesResponse),