From 53adaa77c196efb11bd242dad6684644bebe9d15 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Mon, 5 Jul 2021 17:20:26 +0200 Subject: [PATCH] Propagate file system changes to remote worktrees --- zed-rpc/proto/zed.proto | 25 ++-- zed-rpc/src/proto.rs | 1 + zed/src/worktree.rs | 318 +++++++++++++++++++++++++++++++++------- 3 files changed, 283 insertions(+), 61 deletions(-) diff --git a/zed-rpc/proto/zed.proto b/zed-rpc/proto/zed.proto index 4a505d53d56c36cd01ddf77cc5dc682ea8d8c77b..74e4cef2f4ea0d54d263cdaf4f2fbb0fe6eccd95 100644 --- a/zed-rpc/proto/zed.proto +++ b/zed-rpc/proto/zed.proto @@ -12,15 +12,16 @@ message Envelope { ShareWorktreeResponse share_worktree_response = 7; OpenWorktree open_worktree = 8; OpenWorktreeResponse open_worktree_response = 9; - CloseWorktree close_worktree = 10; - OpenBuffer open_buffer = 11; - OpenBufferResponse open_buffer_response = 12; - CloseBuffer close_buffer = 13; - UpdateBuffer update_buffer = 14; - SaveBuffer save_buffer = 15; - BufferSaved buffer_saved = 16; - AddPeer add_peer = 17; - RemovePeer remove_peer = 18; + UpdateWorktree update_worktree = 10; + CloseWorktree close_worktree = 11; + OpenBuffer open_buffer = 12; + OpenBufferResponse open_buffer_response = 13; + CloseBuffer close_buffer = 14; + UpdateBuffer update_buffer = 15; + SaveBuffer save_buffer = 16; + BufferSaved buffer_saved = 17; + AddPeer add_peer = 18; + RemovePeer remove_peer = 19; } } @@ -53,6 +54,12 @@ message OpenWorktreeResponse { repeated Peer peers = 3; } +message UpdateWorktree { + uint64 worktree_id = 1; + repeated Entry updated_entries = 2; + repeated uint64 removed_entries = 3; +} + message CloseWorktree { uint64 worktree_id = 1; } diff --git a/zed-rpc/src/proto.rs b/zed-rpc/src/proto.rs index 6eb34b26d8e260f6cf869c74f79a99d4d6ce4272..1626e5aad53ccadb182771aa0a2e877072bcb315 100644 --- a/zed-rpc/src/proto.rs +++ b/zed-rpc/src/proto.rs @@ -71,6 +71,7 @@ macro_rules! request_message { request_message!(Auth, AuthResponse); request_message!(ShareWorktree, ShareWorktreeResponse); request_message!(OpenWorktree, OpenWorktreeResponse); +message!(UpdateWorktree); message!(CloseWorktree); request_message!(OpenBuffer, OpenBufferResponse); message!(CloseBuffer); diff --git a/zed/src/worktree.rs b/zed/src/worktree.rs index 9a16975598d2f9a6059d4b4c9e935d66d78f553d..65defff88b28439df18dac81a27522bd5db5583a 100644 --- a/zed/src/worktree.rs +++ b/zed/src/worktree.rs @@ -30,9 +30,9 @@ use smol::{ io::{AsyncReadExt, AsyncWriteExt}, }; use std::{ - cmp, + cmp::{self, Ordering}, collections::HashMap, - convert::TryInto, + convert::{TryFrom, TryInto}, ffi::{OsStr, OsString}, fmt, fs, future::Future, @@ -55,6 +55,7 @@ lazy_static! { pub fn init(cx: &mut MutableAppContext, rpc: rpc::Client) { rpc.on_message(remote::add_peer, cx); rpc.on_message(remote::remove_peer, cx); + rpc.on_message(remote::update_worktree, cx); rpc.on_message(remote::open_buffer, cx); rpc.on_message(remote::close_buffer, cx); rpc.on_message(remote::update_buffer, cx); @@ -135,35 +136,19 @@ impl Worktree { let (entries, paths_by_id) = cx .background() .spawn(async move { - let mut entries = SumTree::new(); - let mut paths_by_id = rpds::HashTrieMapSync::default(); + let mut paths_by_id = rpds::RedBlackTreeMapSync::default(); + let mut edits = Vec::new(); for entry in worktree_message.entries { - if let Some(mtime) = entry.mtime { - let kind = if entry.is_dir { - EntryKind::Dir - } else { - let mut char_bag = root_char_bag.clone(); - char_bag.extend(entry.path.chars().map(|c| c.to_ascii_lowercase())); - EntryKind::File(char_bag) - }; - let path: Arc = Arc::from(Path::new(&entry.path)); - entries.push( - Entry { - id: entry.id as usize, - kind, - path: path.clone(), - inode: entry.inode, - mtime: mtime.into(), - is_symlink: entry.is_symlink, - is_ignored: entry.is_ignored, - }, - &(), - ); - paths_by_id.insert_mut(entry.id as usize, path); - } else { - log::warn!("missing mtime in remote worktree entry {:?}", entry.path); + match Entry::try_from((&root_char_bag, entry)) { + Ok(entry) => { + paths_by_id.insert_mut(entry.id as usize, (entry.path.clone(), 0)); + edits.push(Edit::Insert(entry)); + } + Err(err) => log::warn!("error for remote worktree entry {:?}", err), } } + let mut entries = SumTree::new(); + entries.edit(edits, &()); (entries, paths_by_id) }) .await; @@ -374,6 +359,7 @@ impl Deref for Worktree { pub struct LocalWorktree { snapshot: Snapshot, background_snapshot: Arc>, + snapshots_to_send_tx: Option>, scan_state: (watch::Sender, watch::Receiver), _event_stream_handle: fsevent::Handle, poll_scheduled: bool, @@ -413,6 +399,7 @@ impl LocalWorktree { let tree = Self { snapshot, background_snapshot: background_snapshot.clone(), + snapshots_to_send_tx: None, scan_state: watch::channel_with(ScanState::Scanning), _event_stream_handle: event_stream_handle, poll_scheduled: false, @@ -588,6 +575,13 @@ impl LocalWorktree { fn observe_scan_state(&mut self, scan_state: ScanState, cx: &mut ModelContext) { self.scan_state.0.blocking_send(scan_state).ok(); self.poll_snapshot(cx); + if !self.is_scanning() { + if let Some(snapshots_to_send_tx) = self.snapshots_to_send_tx.clone() { + if let Err(err) = smol::block_on(snapshots_to_send_tx.send(self.snapshot())) { + log::error!("error submitting snapshot to send {}", err); + } + } + } } fn poll_snapshot(&mut self, cx: &mut ModelContext) { @@ -772,25 +766,12 @@ impl LocalWorktree { let snapshot = self.snapshot(); let handle = cx.handle(); cx.spawn(|this, mut cx| async move { - let entries = cx - .background() - .spawn(async move { - snapshot - .entries - .cursor::<(), ()>() - .map(|entry| proto::Entry { - id: entry.id as u64, - is_dir: entry.is_dir(), - path: entry.path.to_string_lossy().to_string(), - inode: entry.inode, - mtime: Some(entry.mtime.into()), - is_symlink: entry.is_symlink, - is_ignored: entry.is_ignored, - }) - .collect() - }) - .await; - + let entries = { + let entries = snapshot.entries.clone(); + cx.background() + .spawn(async move { entries.cursor::<(), ()>().map(Into::into).collect() }) + .await + }; let share_response = rpc .request(proto::ShareWorktree { worktree: Some(proto::Worktree { root_name, entries }), @@ -804,9 +785,33 @@ impl LocalWorktree { .insert(share_response.worktree_id, handle.downgrade()); log::info!("sharing worktree {:?}", share_response); + let (snapshots_to_send_tx, snapshots_to_send_rx) = + smol::channel::unbounded::(); + + { + let rpc = rpc.clone(); + let worktree_id = share_response.worktree_id; + std::thread::spawn(move || { + let mut prev_snapshot = snapshot; + while let Ok(snapshot) = smol::block_on(snapshots_to_send_rx.recv()) { + let (updated_entries, removed_entries) = snapshot.diff(&prev_snapshot); + let message = proto::UpdateWorktree { + worktree_id, + updated_entries: updated_entries.iter().map(Into::into).collect(), + removed_entries: removed_entries.iter().map(|id| *id as u64).collect(), + }; + match smol::block_on(rpc.send(message)) { + Ok(()) => prev_snapshot = snapshot, + Err(err) => log::error!("error sending snapshot diff {}", err), + } + } + }); + } this.update(&mut cx, |worktree, _| { - worktree.as_local_mut().unwrap().rpc = Some((rpc, share_response.worktree_id)); + let worktree = worktree.as_local_mut().unwrap(); + worktree.rpc = Some((rpc, share_response.worktree_id)); + worktree.snapshots_to_send_tx = Some(snapshots_to_send_tx); }); Ok((share_response.worktree_id, share_response.access_token)) }) @@ -949,6 +954,98 @@ impl RemoteWorktree { cx.notify(); Ok(()) } + + pub fn update( + &mut self, + envelope: TypedEnvelope, + cx: &mut ModelContext, + ) -> Result<()> { + self.update_snapshot(envelope, cx)?; + self.update_open_buffers(cx); + Ok(()) + } + + fn update_snapshot( + &mut self, + envelope: TypedEnvelope, + cx: &mut ModelContext, + ) -> Result<()> { + self.snapshot.scan_id += 1; + let scan_id = self.snapshot.scan_id; + + let mut edits = Vec::new(); + for entry_id in envelope.payload.removed_entries { + let entry_id = entry_id as usize; + let entry = self + .snapshot + .entry_for_id(entry_id) + .ok_or_else(|| anyhow!("unknown entry"))?; + edits.push(Edit::Remove(PathKey(entry.path.clone()))); + self.snapshot.paths_by_id.remove_mut(&entry_id); + } + + for entry in envelope.payload.updated_entries { + let entry = Entry::try_from((&self.snapshot.root_char_bag, entry))?; + self.snapshot + .paths_by_id + .insert_mut(entry.id, (entry.path.clone(), scan_id)); + edits.push(Edit::Insert(entry)); + } + self.snapshot.entries.edit(edits, &()); + + cx.notify(); + Ok(()) + } + + fn update_open_buffers(&mut self, cx: &mut ModelContext) { + let mut buffers_to_delete = Vec::new(); + for (buffer_id, buffer) in &self.open_buffers { + if let Some(buffer) = buffer.upgrade(&cx) { + buffer.update(cx, |buffer, cx| { + let buffer_is_clean = !buffer.is_dirty(); + + if let Some(file) = buffer.file_mut() { + let mut file_changed = false; + + if let Some(entry) = file + .entry_id + .and_then(|entry_id| self.snapshot.entry_for_id(entry_id)) + { + if entry.path != file.path { + file.path = entry.path.clone(); + file_changed = true; + } + + if entry.mtime != file.mtime { + file.mtime = entry.mtime; + file_changed = true; + } + } else if let Some(entry) = self.snapshot.entry_for_path(&file.path) { + file.entry_id = Some(entry.id); + file.mtime = entry.mtime; + file_changed = true; + } else if !file.is_deleted() { + if buffer_is_clean { + cx.emit(editor::buffer::Event::Dirtied); + } + file.entry_id = None; + file_changed = true; + } + + if file_changed { + cx.emit(editor::buffer::Event::FileHandleChanged); + } + } + }); + } else { + buffers_to_delete.push(*buffer_id); + } + } + + for buffer_id in buffers_to_delete { + self.open_buffers.remove(&buffer_id); + } + } } #[derive(Clone)] @@ -960,12 +1057,58 @@ pub struct Snapshot { root_char_bag: CharBag, ignores: HashMap, (Arc, usize)>, entries: SumTree, - paths_by_id: rpds::HashTrieMapSync>, + paths_by_id: rpds::RedBlackTreeMapSync, usize)>, removed_entry_ids: HashMap, next_entry_id: Arc, } impl Snapshot { + pub fn diff(&self, other: &Self) -> (Vec, Vec) { + let mut updated_entries = Vec::new(); + let mut removed_entries = Vec::new(); + let mut self_entries = self.paths_by_id.iter().peekable(); + let mut other_entries = other.paths_by_id.iter().peekable(); + loop { + match (self_entries.peek(), other_entries.peek()) { + ( + Some((self_entry_id, (_, self_scan_id))), + Some((other_entry_id, (_, other_scan_id))), + ) => match self_entry_id.cmp(other_entry_id) { + Ordering::Less => { + let entry = self.entry_for_id(**self_entry_id).unwrap().clone(); + updated_entries.push(entry); + self_entries.next(); + } + Ordering::Equal => { + if self_scan_id != other_scan_id { + let entry = self.entry_for_id(**self_entry_id).unwrap().clone(); + updated_entries.push(entry); + } + + self_entries.next(); + other_entries.next(); + } + Ordering::Greater => { + removed_entries.push(**other_entry_id); + other_entries.next(); + } + }, + (Some((self_entry_id, _)), None) => { + let entry = self.entry_for_id(**self_entry_id).unwrap().clone(); + updated_entries.push(entry); + self_entries.next(); + } + (None, Some((other_entry_id, _))) => { + removed_entries.push(**other_entry_id); + other_entries.next(); + } + (None, None) => break, + } + } + + (updated_entries, removed_entries) + } + pub fn file_count(&self) -> usize { self.entries.summary().file_count } @@ -1014,7 +1157,7 @@ impl Snapshot { } fn entry_for_id(&self, id: usize) -> Option<&Entry> { - let path = self.paths_by_id.get(&id)?; + let (path, _) = self.paths_by_id.get(&id)?; self.entry_for_path(path) } @@ -1036,7 +1179,8 @@ impl Snapshot { self.reuse_entry_id(&mut entry); self.entries.insert_or_replace(entry.clone(), &()); - self.paths_by_id.insert_mut(entry.id, entry.path.clone()); + self.paths_by_id + .insert_mut(entry.id, (entry.path.clone(), self.scan_id)); entry } @@ -1065,7 +1209,8 @@ impl Snapshot { for mut entry in entries { self.reuse_entry_id(&mut entry); - self.paths_by_id.insert_mut(entry.id, entry.path.clone()); + self.paths_by_id + .insert_mut(entry.id, (entry.path.clone(), self.scan_id)); edits.push(Edit::Insert(entry)); } self.entries.edit(edits, &()); @@ -2082,6 +2227,51 @@ impl<'a> Iterator for ChildEntriesIter<'a> { } } +impl<'a> From<&'a Entry> for proto::Entry { + fn from(entry: &'a Entry) -> Self { + Self { + id: entry.id as u64, + is_dir: entry.is_dir(), + path: entry.path.to_string_lossy().to_string(), + inode: entry.inode, + mtime: Some(entry.mtime.into()), + is_symlink: entry.is_symlink, + is_ignored: entry.is_ignored, + } + } +} + +impl<'a> TryFrom<(&'a CharBag, proto::Entry)> for Entry { + type Error = anyhow::Error; + + fn try_from((root_char_bag, entry): (&'a CharBag, proto::Entry)) -> Result { + if let Some(mtime) = entry.mtime { + let kind = if entry.is_dir { + EntryKind::Dir + } else { + let mut char_bag = root_char_bag.clone(); + char_bag.extend(entry.path.chars().map(|c| c.to_ascii_lowercase())); + EntryKind::File(char_bag) + }; + let path: Arc = Arc::from(Path::new(&entry.path)); + Ok(Entry { + id: entry.id as usize, + kind, + path: path.clone(), + inode: entry.inode, + mtime: mtime.into(), + is_symlink: entry.is_symlink, + is_ignored: entry.is_ignored, + }) + } else { + Err(anyhow!( + "missing mtime in remote worktree entry {:?}", + entry.path + )) + } + } +} + mod remote { use super::*; @@ -2109,6 +2299,30 @@ mod remote { .update(cx, |worktree, cx| worktree.remove_peer(envelope, cx)) } + pub async fn update_worktree( + envelope: TypedEnvelope, + rpc: &rpc::Client, + cx: &mut AsyncAppContext, + ) -> anyhow::Result<()> { + rpc.state + .read() + .await + .shared_worktree(envelope.payload.worktree_id, cx)? + .update(cx, |worktree, cx| { + if let Some(worktree) = worktree.as_remote_mut() { + worktree.update(envelope, cx)?; + } else { + log::error!( + "invalid update message for worktree {}", + envelope.payload.worktree_id + ); + } + Result::<_, anyhow::Error>::Ok(()) + })?; + + Ok(()) + } + pub async fn open_buffer( envelope: TypedEnvelope, rpc: &rpc::Client,