diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index eff003479f8c451e32f0e82c5ccc58b5aec8709a..e9cbfa808a84646fda7cea61f58fb7699a67cad6 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -2134,9 +2134,9 @@ impl Project { this.update(&mut cx, |this, cx| { let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id); if let Some(worktree) = this.worktree_for_id(worktree_id, cx) { - worktree.update(cx, |worktree, cx| { + worktree.update(cx, |worktree, _| { let worktree = worktree.as_remote_mut().unwrap(); - worktree.update_from_remote(envelope, cx) + worktree.update_from_remote(envelope) })?; } Ok(()) @@ -3419,7 +3419,7 @@ mod tests { .await; // Create a remote copy of this worktree. - let initial_snapshot = tree.read_with(&cx, |tree, _| tree.snapshot()); + let initial_snapshot = tree.read_with(&cx, |tree, _| tree.as_local().unwrap().snapshot()); let (remote, load_task) = cx.update(|cx| { Worktree::remote( 1, @@ -3495,10 +3495,13 @@ mod tests { // Update the remote worktree. Check that it becomes consistent with the // local worktree. remote.update(&mut cx, |remote, cx| { - let update_message = - tree.read(cx) - .snapshot() - .build_update(&initial_snapshot, 1, 1, true); + let update_message = tree.read(cx).as_local().unwrap().snapshot().build_update( + &initial_snapshot, + 1, + 1, + 0, + true, + ); remote .as_remote_mut() .unwrap() diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index 79e3a7e528b0b310978d46cb26f64f5bf2be546d..675f6503424b884f69bd85305aae9b8f9d6dbc77 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -7,8 +7,11 @@ use ::ignore::gitignore::{Gitignore, GitignoreBuilder}; use anyhow::{anyhow, Result}; use client::{proto, Client, TypedEnvelope}; use clock::ReplicaId; -use collections::HashMap; -use futures::{Stream, StreamExt}; +use collections::{HashMap, VecDeque}; +use futures::{ + channel::mpsc::{self, UnboundedSender}, + Stream, StreamExt, +}; use fuzzy::CharBag; use gpui::{ executor, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, @@ -39,7 +42,7 @@ use std::{ time::{Duration, SystemTime}, }; use sum_tree::{Bias, Edit, SeekTarget, SumTree, TreeMap}; -use util::ResultExt; +use util::{post_inc, ResultExt}; lazy_static! { static ref GITIGNORE: &'static OsStr = OsStr::new(".gitignore"); @@ -75,11 +78,13 @@ pub struct RemoteWorktree { project_id: u64, snapshot_rx: watch::Receiver, client: Arc, - updates_tx: postage::mpsc::Sender, + updates_tx: UnboundedSender, replica_id: ReplicaId, queued_operations: Vec<(u64, Operation)>, diagnostic_summaries: TreeMap, weak: bool, + next_update_id: u64, + pending_updates: VecDeque, } #[derive(Clone)] @@ -208,7 +213,7 @@ impl Worktree { entries_by_id: Default::default(), }; - let (updates_tx, mut updates_rx) = postage::mpsc::channel(64); + let (updates_tx, mut updates_rx) = mpsc::unbounded(); let (mut snapshot_tx, snapshot_rx) = watch::channel_with(snapshot.clone()); let worktree_handle = cx.add_model(|_: &mut ModelContext| { Worktree::Remote(RemoteWorktree { @@ -233,6 +238,8 @@ impl Worktree { }), ), weak, + next_update_id: worktree.next_update_id, + pending_updates: Default::default(), }) }); @@ -276,7 +283,7 @@ impl Worktree { cx.background() .spawn(async move { - while let Some(update) = updates_rx.recv().await { + while let Some(update) = updates_rx.next().await { let mut snapshot = snapshot_tx.borrow().clone(); if let Err(error) = snapshot.apply_remote_update(update) { log::error!("error applying worktree update: {}", error); @@ -450,7 +457,7 @@ impl LocalWorktree { weak: bool, fs: Arc, cx: &mut AsyncAppContext, - ) -> Result<(ModelHandle, Sender)> { + ) -> Result<(ModelHandle, UnboundedSender)> { let abs_path = path.into(); let path: Arc = Arc::from(Path::new("")); let next_entry_id = AtomicUsize::new(0); @@ -470,7 +477,7 @@ impl LocalWorktree { } } - let (scan_states_tx, scan_states_rx) = smol::channel::unbounded(); + let (scan_states_tx, mut scan_states_rx) = mpsc::unbounded(); let (mut last_scan_state_tx, last_scan_state_rx) = watch::channel_with(ScanState::Scanning); let tree = cx.add_model(move |cx: &mut ModelContext| { let mut snapshot = LocalSnapshot { @@ -515,7 +522,7 @@ impl LocalWorktree { }; cx.spawn_weak(|this, mut cx| async move { - while let Ok(scan_state) = scan_states_rx.recv().await { + while let Some(scan_state) = scan_states_rx.next().await { if let Some(handle) = this.upgrade(&cx) { let to_send = handle.update(&mut cx, |this, cx| { last_scan_state_tx.blocking_send(scan_state).ok(); @@ -765,10 +772,16 @@ impl LocalWorktree { let rpc = rpc.clone(); let snapshot = snapshot.clone(); async move { + let mut update_id = 0; let mut prev_snapshot = snapshot; while let Ok(snapshot) = snapshots_to_send_rx.recv().await { - let message = - snapshot.build_update(&prev_snapshot, project_id, worktree_id, false); + let message = snapshot.build_update( + &prev_snapshot, + project_id, + worktree_id, + post_inc(&mut update_id), + false, + ); match rpc.send(message) { Ok(()) => prev_snapshot = snapshot, Err(err) => log::error!("error sending snapshot diff {}", err), @@ -814,15 +827,31 @@ impl RemoteWorktree { pub fn update_from_remote( &mut self, envelope: TypedEnvelope, - cx: &mut ModelContext, ) -> Result<()> { - let mut tx = self.updates_tx.clone(); - let payload = envelope.payload.clone(); - cx.foreground() - .spawn(async move { - tx.send(payload).await.expect("receiver runs to completion"); - }) - .detach(); + let update = envelope.payload; + if update.id > self.next_update_id { + let ix = match self + .pending_updates + .binary_search_by_key(&update.id, |pending| pending.id) + { + Ok(ix) | Err(ix) => ix, + }; + self.pending_updates.insert(ix, update); + } else { + let tx = self.updates_tx.clone(); + self.next_update_id += 1; + tx.unbounded_send(update) + .expect("consumer runs to completion"); + while let Some(update) = self.pending_updates.front() { + if update.id == self.next_update_id { + self.next_update_id += 1; + tx.unbounded_send(self.pending_updates.pop_front().unwrap()) + .expect("consumer runs to completion"); + } else { + break; + } + } + } Ok(()) } @@ -849,94 +878,6 @@ impl Snapshot { self.id } - pub(crate) fn to_proto( - &self, - diagnostic_summaries: &TreeMap, - weak: bool, - ) -> proto::Worktree { - let root_name = self.root_name.clone(); - proto::Worktree { - id: self.id.0 as u64, - root_name, - entries: self - .entries_by_path - .iter() - .filter(|e| !e.is_ignored) - .map(Into::into) - .collect(), - diagnostic_summaries: diagnostic_summaries - .iter() - .map(|(path, summary)| summary.to_proto(path.0.clone())) - .collect(), - weak, - } - } - - pub(crate) fn build_update( - &self, - other: &Self, - project_id: u64, - worktree_id: u64, - include_ignored: bool, - ) -> proto::UpdateWorktree { - let mut updated_entries = Vec::new(); - let mut removed_entries = Vec::new(); - let mut self_entries = self - .entries_by_id - .cursor::<()>() - .filter(|e| include_ignored || !e.is_ignored) - .peekable(); - let mut other_entries = other - .entries_by_id - .cursor::<()>() - .filter(|e| include_ignored || !e.is_ignored) - .peekable(); - loop { - match (self_entries.peek(), other_entries.peek()) { - (Some(self_entry), Some(other_entry)) => { - match Ord::cmp(&self_entry.id, &other_entry.id) { - Ordering::Less => { - let entry = self.entry_for_id(self_entry.id).unwrap().into(); - updated_entries.push(entry); - self_entries.next(); - } - Ordering::Equal => { - if self_entry.scan_id != other_entry.scan_id { - let entry = self.entry_for_id(self_entry.id).unwrap().into(); - updated_entries.push(entry); - } - - self_entries.next(); - other_entries.next(); - } - Ordering::Greater => { - removed_entries.push(other_entry.id as u64); - other_entries.next(); - } - } - } - (Some(self_entry), None) => { - let entry = self.entry_for_id(self_entry.id).unwrap().into(); - updated_entries.push(entry); - self_entries.next(); - } - (None, Some(other_entry)) => { - removed_entries.push(other_entry.id as u64); - other_entries.next(); - } - (None, None) => break, - } - } - - proto::UpdateWorktree { - project_id, - worktree_id, - root_name: self.root_name().to_string(), - updated_entries, - removed_entries, - } - } - pub(crate) fn apply_remote_update(&mut self, update: proto::UpdateWorktree) -> Result<()> { let mut entries_by_path_edits = Vec::new(); let mut entries_by_id_edits = Vec::new(); @@ -1077,6 +1018,97 @@ impl Snapshot { } impl LocalSnapshot { + pub(crate) fn to_proto( + &self, + diagnostic_summaries: &TreeMap, + weak: bool, + ) -> proto::Worktree { + let root_name = self.root_name.clone(); + proto::Worktree { + id: self.id.0 as u64, + root_name, + entries: self + .entries_by_path + .iter() + .filter(|e| !e.is_ignored) + .map(Into::into) + .collect(), + diagnostic_summaries: diagnostic_summaries + .iter() + .map(|(path, summary)| summary.to_proto(path.0.clone())) + .collect(), + weak, + next_update_id: 0, + } + } + + pub(crate) fn build_update( + &self, + other: &Self, + project_id: u64, + worktree_id: u64, + update_id: u64, + include_ignored: bool, + ) -> proto::UpdateWorktree { + let mut updated_entries = Vec::new(); + let mut removed_entries = Vec::new(); + let mut self_entries = self + .entries_by_id + .cursor::<()>() + .filter(|e| include_ignored || !e.is_ignored) + .peekable(); + let mut other_entries = other + .entries_by_id + .cursor::<()>() + .filter(|e| include_ignored || !e.is_ignored) + .peekable(); + loop { + match (self_entries.peek(), other_entries.peek()) { + (Some(self_entry), Some(other_entry)) => { + match Ord::cmp(&self_entry.id, &other_entry.id) { + Ordering::Less => { + let entry = self.entry_for_id(self_entry.id).unwrap().into(); + updated_entries.push(entry); + self_entries.next(); + } + Ordering::Equal => { + if self_entry.scan_id != other_entry.scan_id { + let entry = self.entry_for_id(self_entry.id).unwrap().into(); + updated_entries.push(entry); + } + + self_entries.next(); + other_entries.next(); + } + Ordering::Greater => { + removed_entries.push(other_entry.id as u64); + other_entries.next(); + } + } + } + (Some(self_entry), None) => { + let entry = self.entry_for_id(self_entry.id).unwrap().into(); + updated_entries.push(entry); + self_entries.next(); + } + (None, Some(other_entry)) => { + removed_entries.push(other_entry.id as u64); + other_entries.next(); + } + (None, None) => break, + } + } + + proto::UpdateWorktree { + id: update_id as u64, + project_id, + worktree_id, + root_name: self.root_name().to_string(), + updated_entries, + removed_entries, + } + } + fn insert_entry(&mut self, mut entry: Entry, fs: &dyn Fs) -> Entry { if !entry.is_dir() && entry.path.file_name() == Some(&GITIGNORE) { let abs_path = self.abs_path.join(&entry.path); @@ -1668,14 +1700,14 @@ impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey { struct BackgroundScanner { fs: Arc, snapshot: Arc>, - notify: Sender, + notify: UnboundedSender, executor: Arc, } impl BackgroundScanner { fn new( snapshot: Arc>, - notify: Sender, + notify: UnboundedSender, fs: Arc, executor: Arc, ) -> Self { @@ -1696,28 +1728,27 @@ impl BackgroundScanner { } async fn run(mut self, events_rx: impl Stream>) { - if self.notify.send(ScanState::Scanning).await.is_err() { + if self.notify.unbounded_send(ScanState::Scanning).is_err() { return; } if let Err(err) = self.scan_dirs().await { if self .notify - .send(ScanState::Err(Arc::new(err))) - .await + .unbounded_send(ScanState::Err(Arc::new(err))) .is_err() { return; } } - if self.notify.send(ScanState::Idle).await.is_err() { + if self.notify.unbounded_send(ScanState::Idle).is_err() { return; } futures::pin_mut!(events_rx); while let Some(events) = events_rx.next().await { - if self.notify.send(ScanState::Scanning).await.is_err() { + if self.notify.unbounded_send(ScanState::Scanning).is_err() { break; } @@ -1725,7 +1756,7 @@ impl BackgroundScanner { break; } - if self.notify.send(ScanState::Idle).await.is_err() { + if self.notify.unbounded_send(ScanState::Idle).is_err() { break; } } @@ -2503,7 +2534,7 @@ mod tests { } log::info!("Generated initial tree"); - let (notify_tx, _notify_rx) = smol::channel::unbounded(); + let (notify_tx, _notify_rx) = mpsc::unbounded(); let fs = Arc::new(RealFs); let next_entry_id = Arc::new(AtomicUsize::new(0)); let mut initial_snapshot = LocalSnapshot { @@ -2563,7 +2594,7 @@ mod tests { smol::block_on(scanner.process_events(events)); scanner.snapshot().check_invariants(); - let (notify_tx, _notify_rx) = smol::channel::unbounded(); + let (notify_tx, _notify_rx) = mpsc::unbounded(); let mut new_scanner = BackgroundScanner::new( Arc::new(Mutex::new(initial_snapshot)), notify_tx, @@ -2576,6 +2607,7 @@ mod tests { new_scanner.snapshot().to_vec(true) ); + let mut update_id = 0; for mut prev_snapshot in snapshots { let include_ignored = rng.gen::(); if !include_ignored { @@ -2596,9 +2628,13 @@ mod tests { prev_snapshot.entries_by_id.edit(entries_by_id_edits, &()); } - let update = scanner - .snapshot() - .build_update(&prev_snapshot, 0, 0, include_ignored); + let update = scanner.snapshot().build_update( + &prev_snapshot, + 0, + 0, + post_inc(&mut update_id), + include_ignored, + ); prev_snapshot.apply_remote_update(update).unwrap(); assert_eq!( prev_snapshot.to_vec(true), diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index cc28e507e97ab517f632bd4b303c082bf0722708..5803b24c273722f5f4adcced70e260673152dbdc 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -128,11 +128,12 @@ message ShareWorktree { } message UpdateWorktree { - uint64 project_id = 1; - uint64 worktree_id = 2; - string root_name = 3; - repeated Entry updated_entries = 4; - repeated uint64 removed_entries = 5; + uint64 id = 1; + uint64 project_id = 2; + uint64 worktree_id = 3; + string root_name = 4; + repeated Entry updated_entries = 5; + repeated uint64 removed_entries = 6; } message AddProjectCollaborator { @@ -386,6 +387,7 @@ message Worktree { repeated Entry entries = 3; repeated DiagnosticSummary diagnostic_summaries = 4; bool weak = 5; + uint64 next_update_id = 6; } message File { diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index cca6d6a4fa5ae115fa8268ac900ca8e01f51c801..7ba2420f01e532e23ac2a84f1df93e1d12212cdc 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -339,6 +339,7 @@ impl Server { .cloned() .collect(), weak: worktree.weak, + next_update_id: share.next_update_id as u64, }) }) .collect(); @@ -477,6 +478,7 @@ impl Server { request.sender_id, entries, diagnostic_summaries, + worktree.next_update_id, )?; broadcast( @@ -1115,7 +1117,7 @@ mod tests { LanguageConfig, LanguageRegistry, LanguageServerConfig, Point, }, lsp, - project::{worktree::WorktreeHandle, DiagnosticSummary, Project, ProjectPath}, + project::{DiagnosticSummary, Project, ProjectPath}, workspace::{Workspace, WorkspaceParams}, }; @@ -1486,11 +1488,6 @@ mod tests { buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty())); buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await; - // Ensure worktree observes a/file1's change event *before* the rename occurs, otherwise - // when interpreting the change event it will mistakenly think that the file has been - // deleted (because its path has changed) and will subsequently fail to detect the rename. - worktree_a.flush_fs_events(&cx_a).await; - // Make changes on host's file system, see those changes on guest worktrees. fs.rename( "/a/file1".as_ref(), @@ -1499,6 +1496,7 @@ mod tests { ) .await .unwrap(); + fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default()) .await .unwrap(); diff --git a/crates/server/src/rpc/store.rs b/crates/server/src/rpc/store.rs index 5cb0a0e1db028631c468b1ab3519114483a12f14..1cfd7024191ed6e3cb60e3b8224fef8ee18bb83f 100644 --- a/crates/server/src/rpc/store.rs +++ b/crates/server/src/rpc/store.rs @@ -43,6 +43,7 @@ pub struct ProjectShare { pub struct WorktreeShare { pub entries: HashMap, pub diagnostic_summaries: BTreeMap, + pub next_update_id: u64, } #[derive(Default)] @@ -403,6 +404,7 @@ impl Store { connection_id: ConnectionId, entries: HashMap, diagnostic_summaries: BTreeMap, + next_update_id: u64, ) -> tide::Result { let project = self .projects @@ -416,6 +418,7 @@ impl Store { worktree.share = Some(WorktreeShare { entries, diagnostic_summaries, + next_update_id, }); Ok(SharedWorktree { authorized_user_ids: project.authorized_user_ids(), diff --git a/crates/util/src/lib.rs b/crates/util/src/lib.rs index b0c66b005bd0e0deb39962ac37eb5292d63d21bf..919fecf8f9c0097cfceafb4b3a6bfe98e3f91afa 100644 --- a/crates/util/src/lib.rs +++ b/crates/util/src/lib.rs @@ -4,13 +4,14 @@ pub mod test; use futures::Future; use std::{ cmp::Ordering, + ops::AddAssign, pin::Pin, task::{Context, Poll}, }; -pub fn post_inc(value: &mut usize) -> usize { +pub fn post_inc + AddAssign + Copy>(value: &mut T) -> T { let prev = *value; - *value += 1; + *value += T::from(1); prev }