diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 806498a22402fb6c8b254b6c673600871250b39a..ec274330490795078f38f59db3dd0d922037e5af 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -1081,7 +1081,7 @@ impl Project { .ok_or_else(|| anyhow!("missing entry in response"))?; worktree .update(&mut cx, |worktree, cx| { - worktree.as_remote().unwrap().insert_entry( + worktree.as_remote_mut().unwrap().insert_entry( entry, response.worktree_scan_id as usize, cx, @@ -1124,7 +1124,7 @@ impl Project { .ok_or_else(|| anyhow!("missing entry in response"))?; worktree .update(&mut cx, |worktree, cx| { - worktree.as_remote().unwrap().insert_entry( + worktree.as_remote_mut().unwrap().insert_entry( entry, response.worktree_scan_id as usize, cx, @@ -1167,7 +1167,7 @@ impl Project { .ok_or_else(|| anyhow!("missing entry in response"))?; worktree .update(&mut cx, |worktree, cx| { - worktree.as_remote().unwrap().insert_entry( + worktree.as_remote_mut().unwrap().insert_entry( entry, response.worktree_scan_id as usize, cx, @@ -1200,7 +1200,7 @@ impl Project { .await?; worktree .update(&mut cx, move |worktree, cx| { - worktree.as_remote().unwrap().delete_entry( + worktree.as_remote_mut().unwrap().delete_entry( entry_id, response.worktree_scan_id as usize, cx, diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index 217a512c71133ba9f401d8cbacbc74deeae9bff2..5eb3c3dbd668f36eb1dea1e16c40fa9980ab7bcf 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -9,7 +9,7 @@ use ::ignore::gitignore::{Gitignore, GitignoreBuilder}; use anyhow::{anyhow, Context, Result}; use client::{proto, Client}; use clock::ReplicaId; -use collections::HashMap; +use collections::{HashMap, VecDeque}; use futures::{ channel::{ mpsc::{self, UnboundedSender}, @@ -82,7 +82,7 @@ pub struct RemoteWorktree { project_id: u64, client: Arc, updates_tx: Option>, - snapshot_updated_rx: watch::Receiver<()>, + snapshot_subscriptions: VecDeque<(usize, oneshot::Sender<()>)>, replica_id: ReplicaId, diagnostic_summaries: TreeMap, visible: bool, @@ -204,7 +204,7 @@ impl Worktree { snapshot: snapshot.clone(), background_snapshot: background_snapshot.clone(), updates_tx: Some(updates_tx), - snapshot_updated_rx: snapshot_updated_rx.clone(), + snapshot_subscriptions: Default::default(), client: client.clone(), diagnostic_summaries: Default::default(), visible, @@ -227,7 +227,18 @@ impl Worktree { async move { while let Some(_) = snapshot_updated_rx.recv().await { if let Some(this) = this.upgrade(&cx) { - this.update(&mut cx, |this, cx| this.poll_snapshot(cx)); + this.update(&mut cx, |this, cx| { + this.poll_snapshot(cx); + let this = this.as_remote_mut().unwrap(); + while let Some((scan_id, _)) = this.snapshot_subscriptions.front() { + if this.observed_snapshot(*scan_id) { + let (_, tx) = this.snapshot_subscriptions.pop_front().unwrap(); + let _ = tx.send(()); + } else { + break; + } + } + }); } else { break; } @@ -969,25 +980,26 @@ impl RemoteWorktree { } } - fn wait_for_snapshot( - &self, - scan_id: usize, - cx: &mut ModelContext, - ) -> Task> { - let mut rx = self.snapshot_updated_rx.clone(); - cx.spawn_weak(|worktree, cx| async move { - while rx.recv().await.is_some() { - let snapshot = worktree - .upgrade(&cx)? - .read_with(&cx, |worktree, _| worktree.snapshot()); - if snapshot.scan_id > scan_id - || (snapshot.scan_id == scan_id && snapshot.is_complete) - { - break; - } + fn observed_snapshot(&self, scan_id: usize) -> bool { + self.scan_id > scan_id || (self.scan_id == scan_id && self.is_complete) + } + + fn wait_for_snapshot(&mut self, scan_id: usize) -> impl Future { + let (tx, rx) = oneshot::channel(); + if self.observed_snapshot(scan_id) { + let _ = tx.send(()); + } else { + match self + .snapshot_subscriptions + .binary_search_by_key(&scan_id, |probe| probe.0) + { + Ok(ix) | Err(ix) => self.snapshot_subscriptions.insert(ix, (scan_id, tx)), } - None - }) + } + + async move { + let _ = rx.await; + } } pub fn update_diagnostic_summary( @@ -1009,12 +1021,12 @@ impl RemoteWorktree { } pub fn insert_entry( - &self, + &mut self, entry: proto::Entry, scan_id: usize, cx: &mut ModelContext, ) -> Task> { - let wait_for_snapshot = self.wait_for_snapshot(scan_id, cx); + let wait_for_snapshot = self.wait_for_snapshot(scan_id); cx.spawn(|this, mut cx| async move { wait_for_snapshot.await; this.update(&mut cx, |worktree, _| { @@ -1028,12 +1040,12 @@ impl RemoteWorktree { } pub(crate) fn delete_entry( - &self, + &mut self, id: ProjectEntryId, scan_id: usize, cx: &mut ModelContext, ) -> Task> { - let wait_for_snapshot = self.wait_for_snapshot(scan_id, cx); + let wait_for_snapshot = self.wait_for_snapshot(scan_id); cx.spawn(|this, mut cx| async move { wait_for_snapshot.await; this.update(&mut cx, |worktree, _| {