From a762f575f4cd5bc53e0a0172d5c4b6d4a57eef59 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Mon, 24 Jan 2022 14:00:38 +0100 Subject: [PATCH] Add remote worktree to project before it is fully deserialized This prevents a race condition where the host will send us messages and responses about a worktree that we have seen but haven't yet finished loading. --- crates/project/src/project.rs | 38 +++++----- crates/project/src/worktree.rs | 124 +++++++++++++++++---------------- 2 files changed, 82 insertions(+), 80 deletions(-) diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 761a9ca8ae0bb603e115d8872ef988b8ddc532d4..b24fa4caa8d97d66167fc5cb2120f2a17d99837e 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -247,8 +247,10 @@ impl Project { let mut worktrees = Vec::new(); for worktree in response.worktrees { - worktrees - .push(Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx).await?); + let (worktree, load_task) = cx + .update(|cx| Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx)); + worktrees.push(worktree); + load_task.detach(); } let user_ids = response @@ -1464,16 +1466,9 @@ impl Project { .payload .worktree .ok_or_else(|| anyhow!("invalid worktree"))?; - cx.spawn(|this, mut cx| { - async move { - let worktree = - Worktree::remote(remote_id, replica_id, worktree, client, &mut cx).await?; - this.update(&mut cx, |this, cx| this.add_worktree(&worktree, cx)); - Ok(()) - } - .log_err() - }) - .detach(); + let (worktree, load_task) = Worktree::remote(remote_id, replica_id, worktree, client, cx); + self.add_worktree(&worktree, cx); + load_task.detach(); Ok(()) } @@ -2551,15 +2546,16 @@ mod tests { // Create a remote copy of this worktree. let initial_snapshot = tree.read_with(&cx, |tree, _| tree.snapshot()); - let remote = Worktree::remote( - 1, - 1, - initial_snapshot.to_proto(&Default::default(), Default::default()), - rpc.clone(), - &mut cx.to_async(), - ) - .await - .unwrap(); + let (remote, load_task) = cx.update(|cx| { + Worktree::remote( + 1, + 1, + initial_snapshot.to_proto(&Default::default(), Default::default()), + rpc.clone(), + cx, + ) + }); + load_task.await; cx.read(|cx| { assert!(!buffer2.read(cx).is_dirty()); diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index fd33386ffcc5793c2572ba7815d8c370ef19c822..3aba73c416777ccfa6c5f5e1aa49c98783b57a11 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -190,13 +190,13 @@ impl Worktree { Ok(tree) } - pub async fn remote( + pub fn remote( project_remote_id: u64, replica_id: ReplicaId, worktree: proto::Worktree, client: Arc, - cx: &mut AsyncAppContext, - ) -> Result> { + cx: &mut MutableAppContext, + ) -> (ModelHandle, Task<()>) { let remote_id = worktree.id; let root_char_bag: CharBag = worktree .root_name @@ -205,32 +205,26 @@ impl Worktree { .collect(); let root_name = worktree.root_name.clone(); let weak = worktree.weak; - let (entries_by_path, entries_by_id, diagnostic_summaries) = cx - .background() - .spawn(async move { - let mut entries_by_path_edits = Vec::new(); - let mut entries_by_id_edits = Vec::new(); - for entry in worktree.entries { - match Entry::try_from((&root_char_bag, entry)) { - Ok(entry) => { - entries_by_id_edits.push(Edit::Insert(PathEntry { - id: entry.id, - path: entry.path.clone(), - is_ignored: entry.is_ignored, - scan_id: 0, - })); - entries_by_path_edits.push(Edit::Insert(entry)); - } - Err(err) => log::warn!("error for remote worktree entry {:?}", err), - } - } - - let mut entries_by_path = SumTree::new(); - let mut entries_by_id = SumTree::new(); - entries_by_path.edit(entries_by_path_edits, &()); - entries_by_id.edit(entries_by_id_edits, &()); + let snapshot = Snapshot { + id: WorktreeId(remote_id as usize), + root_name, + root_char_bag, + entries_by_path: Default::default(), + entries_by_id: Default::default(), + }; - let diagnostic_summaries = TreeMap::from_ordered_entries( + let (updates_tx, mut updates_rx) = postage::mpsc::channel(64); + let (mut snapshot_tx, snapshot_rx) = watch::channel_with(snapshot.clone()); + let worktree_handle = cx.add_model(|_: &mut ModelContext| { + Worktree::Remote(RemoteWorktree { + project_id: project_remote_id, + replica_id, + snapshot: snapshot.clone(), + snapshot_rx: snapshot_rx.clone(), + updates_tx, + client: client.clone(), + queued_operations: Default::default(), + diagnostic_summaries: TreeMap::from_ordered_entries( worktree.diagnostic_summaries.into_iter().map(|summary| { ( PathKey(PathBuf::from(summary.path).into()), @@ -242,24 +236,48 @@ impl Worktree { }, ) }), - ); - - (entries_by_path, entries_by_id, diagnostic_summaries) + ), + weak, }) - .await; + }); - let worktree = cx.update(|cx| { - cx.add_model(|cx: &mut ModelContext| { - let snapshot = Snapshot { - id: WorktreeId(remote_id as usize), - root_name, - root_char_bag, - entries_by_path, - entries_by_id, - }; + let deserialize_task = cx.spawn({ + let worktree_handle = worktree_handle.clone(); + |cx| async move { + let (entries_by_path, entries_by_id) = cx + .background() + .spawn(async move { + let mut entries_by_path_edits = Vec::new(); + let mut entries_by_id_edits = Vec::new(); + for entry in worktree.entries { + match Entry::try_from((&root_char_bag, entry)) { + Ok(entry) => { + entries_by_id_edits.push(Edit::Insert(PathEntry { + id: entry.id, + path: entry.path.clone(), + is_ignored: entry.is_ignored, + scan_id: 0, + })); + entries_by_path_edits.push(Edit::Insert(entry)); + } + Err(err) => log::warn!("error for remote worktree entry {:?}", err), + } + } - let (updates_tx, mut updates_rx) = postage::mpsc::channel(64); - let (mut snapshot_tx, snapshot_rx) = watch::channel_with(snapshot.clone()); + let mut entries_by_path = SumTree::new(); + let mut entries_by_id = SumTree::new(); + entries_by_path.edit(entries_by_path_edits, &()); + entries_by_id.edit(entries_by_id_edits, &()); + + (entries_by_path, entries_by_id) + }) + .await; + + { + let mut snapshot = snapshot_tx.borrow_mut(); + snapshot.entries_by_path = entries_by_path; + snapshot.entries_by_id = entries_by_id; + } cx.background() .spawn(async move { @@ -275,7 +293,8 @@ impl Worktree { { let mut snapshot_rx = snapshot_rx.clone(); - cx.spawn_weak(|this, mut cx| async move { + let this = worktree_handle.downgrade(); + cx.spawn(|mut cx| async move { while let Some(_) = snapshot_rx.recv().await { if let Some(this) = cx.read(|cx| this.upgrade(cx)) { this.update(&mut cx, |this, cx| this.poll_snapshot(cx)); @@ -286,22 +305,9 @@ impl Worktree { }) .detach(); } - - Worktree::Remote(RemoteWorktree { - project_id: project_remote_id, - replica_id, - snapshot, - snapshot_rx, - updates_tx, - client: client.clone(), - queued_operations: Default::default(), - diagnostic_summaries, - weak, - }) - }) + } }); - - Ok(worktree) + (worktree_handle, deserialize_task) } pub fn as_local(&self) -> Option<&LocalWorktree> {