From 73db910a9566d0040acc342cb730cbfeb7e65d86 Mon Sep 17 00:00:00 2001 From: Nathan Sobo Date: Mon, 5 Jul 2021 14:20:53 -0600 Subject: [PATCH] Process remote worktree updates in the background Co-Authored-By: Max Brunsfeld --- zed/src/worktree.rs | 66 ++++++++++++++++++++++++++++++++------------- 1 file changed, 47 insertions(+), 19 deletions(-) diff --git a/zed/src/worktree.rs b/zed/src/worktree.rs index 96b34134dd1fca5d1b92df1f1818a0f1288b8d5a..b737b1b280b5b7d1701b031d384cb8e07d12e95e 100644 --- a/zed/src/worktree.rs +++ b/zed/src/worktree.rs @@ -154,7 +154,7 @@ impl Worktree { .await; let worktree = cx.update(|cx| { - cx.add_model(|cx| { + cx.add_model(|cx: &mut ModelContext| { let snapshot = Snapshot { id: cx.model_id(), scan_id: 0, @@ -168,10 +168,39 @@ impl Worktree { next_entry_id: Default::default(), }; + let (updates_tx, mut updates_rx) = postage::mpsc::channel(64); + let (mut snapshot_tx, mut snapshot_rx) = + postage::watch::channel_with(snapshot.clone()); + + cx.background() + .spawn(async move { + while let Some(update) = updates_rx.recv().await { + let mut snapshot = snapshot_tx.borrow().clone(); + if let Err(error) = snapshot.apply_update(update) { + log::error!("error applying worktree update: {}", error); + } + *snapshot_tx.borrow_mut() = snapshot; + } + }) + .detach(); + + cx.spawn(|this, mut cx| async move { + while let Some(snapshot) = snapshot_rx.recv().await { + this.update(&mut cx, |this, cx| { + let this = this.as_remote_mut().unwrap(); + this.snapshot = snapshot; + cx.notify(); + this.update_open_buffers(cx); + }) + } + }) + .detach(); + Worktree::Remote(RemoteWorktree { remote_id: id, replica_id, snapshot, + updates_tx, rpc: rpc.clone(), open_buffers: Default::default(), peers: peers @@ -218,7 +247,7 @@ impl Worktree { pub fn snapshot(&self) -> Snapshot { match self { Worktree::Local(worktree) => worktree.snapshot(), - Worktree::Remote(worktree) => worktree.snapshot.clone(), + Worktree::Remote(worktree) => worktree.snapshot(), } } @@ -861,6 +890,7 @@ pub struct RemoteWorktree { remote_id: u64, snapshot: Snapshot, rpc: rpc::Client, + updates_tx: postage::mpsc::Sender, replica_id: ReplicaId, open_buffers: HashMap>, peers: HashMap, @@ -923,6 +953,10 @@ impl RemoteWorktree { }) } + fn snapshot(&self) -> Snapshot { + self.snapshot.clone() + } + pub fn add_peer( &mut self, envelope: TypedEnvelope, @@ -954,17 +988,6 @@ impl RemoteWorktree { Ok(()) } - pub fn update( - &mut self, - envelope: TypedEnvelope, - cx: &mut ModelContext, - ) -> Result<()> { - self.snapshot.apply_update(envelope.payload)?; - self.update_open_buffers(cx); - cx.notify(); - Ok(()) - } - fn update_open_buffers(&mut self, cx: &mut ModelContext) { let mut buffers_to_delete = Vec::new(); for (buffer_id, buffer) in &self.open_buffers { @@ -2305,17 +2328,22 @@ mod remote { .read() .await .shared_worktree(envelope.payload.worktree_id, cx)? - .update(cx, |worktree, cx| { + .update(cx, |worktree, _| { if let Some(worktree) = worktree.as_remote_mut() { - worktree.update(envelope, cx)?; + let mut tx = worktree.updates_tx.clone(); + Ok(async move { + tx.send(envelope.payload) + .await + .expect("receiver runs to completion"); + }) } else { - log::error!( + Err(anyhow!( "invalid update message for local worktree {}", envelope.payload.worktree_id - ); + )) } - Result::<_, anyhow::Error>::Ok(()) - })?; + })? + .await; Ok(()) }