Propagate file system changes to remote worktrees

Antonio Scandurra created

Change summary

zed-rpc/proto/zed.proto |  25 ++-
zed-rpc/src/proto.rs    |   1 
zed/src/worktree.rs     | 318 +++++++++++++++++++++++++++++++++++-------
3 files changed, 283 insertions(+), 61 deletions(-)

Detailed changes

zed-rpc/proto/zed.proto 🔗

@@ -12,15 +12,16 @@ message Envelope {
         ShareWorktreeResponse share_worktree_response = 7;
         OpenWorktree open_worktree = 8;
         OpenWorktreeResponse open_worktree_response = 9;
-        CloseWorktree close_worktree = 10;
-        OpenBuffer open_buffer = 11;
-        OpenBufferResponse open_buffer_response = 12;
-        CloseBuffer close_buffer = 13;
-        UpdateBuffer update_buffer = 14;
-        SaveBuffer save_buffer = 15;
-        BufferSaved buffer_saved = 16;
-        AddPeer add_peer = 17;
-        RemovePeer remove_peer = 18;
+        UpdateWorktree update_worktree = 10;
+        CloseWorktree close_worktree = 11;
+        OpenBuffer open_buffer = 12;
+        OpenBufferResponse open_buffer_response = 13;
+        CloseBuffer close_buffer = 14;
+        UpdateBuffer update_buffer = 15;
+        SaveBuffer save_buffer = 16;
+        BufferSaved buffer_saved = 17;
+        AddPeer add_peer = 18;
+        RemovePeer remove_peer = 19;
     }
 }
 
@@ -53,6 +54,12 @@ message OpenWorktreeResponse {
     repeated Peer peers = 3;
 }
 
+message UpdateWorktree {
+    uint64 worktree_id = 1;
+    repeated Entry updated_entries = 2;
+    repeated uint64 removed_entries = 3;
+}
+
 message CloseWorktree {
     uint64 worktree_id = 1;
 }

zed-rpc/src/proto.rs 🔗

@@ -71,6 +71,7 @@ macro_rules! request_message {
 request_message!(Auth, AuthResponse);
 request_message!(ShareWorktree, ShareWorktreeResponse);
 request_message!(OpenWorktree, OpenWorktreeResponse);
+message!(UpdateWorktree);
 message!(CloseWorktree);
 request_message!(OpenBuffer, OpenBufferResponse);
 message!(CloseBuffer);

zed/src/worktree.rs 🔗

@@ -30,9 +30,9 @@ use smol::{
     io::{AsyncReadExt, AsyncWriteExt},
 };
 use std::{
-    cmp,
+    cmp::{self, Ordering},
     collections::HashMap,
-    convert::TryInto,
+    convert::{TryFrom, TryInto},
     ffi::{OsStr, OsString},
     fmt, fs,
     future::Future,
@@ -55,6 +55,7 @@ lazy_static! {
 pub fn init(cx: &mut MutableAppContext, rpc: rpc::Client) {
     rpc.on_message(remote::add_peer, cx);
     rpc.on_message(remote::remove_peer, cx);
+    rpc.on_message(remote::update_worktree, cx);
     rpc.on_message(remote::open_buffer, cx);
     rpc.on_message(remote::close_buffer, cx);
     rpc.on_message(remote::update_buffer, cx);
@@ -135,35 +136,19 @@ impl Worktree {
         let (entries, paths_by_id) = cx
             .background()
             .spawn(async move {
-                let mut entries = SumTree::new();
-                let mut paths_by_id = rpds::HashTrieMapSync::default();
+                let mut paths_by_id = rpds::RedBlackTreeMapSync::default();
+                let mut edits = Vec::new();
                 for entry in worktree_message.entries {
-                    if let Some(mtime) = entry.mtime {
-                        let kind = if entry.is_dir {
-                            EntryKind::Dir
-                        } else {
-                            let mut char_bag = root_char_bag.clone();
-                            char_bag.extend(entry.path.chars().map(|c| c.to_ascii_lowercase()));
-                            EntryKind::File(char_bag)
-                        };
-                        let path: Arc<Path> = Arc::from(Path::new(&entry.path));
-                        entries.push(
-                            Entry {
-                                id: entry.id as usize,
-                                kind,
-                                path: path.clone(),
-                                inode: entry.inode,
-                                mtime: mtime.into(),
-                                is_symlink: entry.is_symlink,
-                                is_ignored: entry.is_ignored,
-                            },
-                            &(),
-                        );
-                        paths_by_id.insert_mut(entry.id as usize, path);
-                    } else {
-                        log::warn!("missing mtime in remote worktree entry {:?}", entry.path);
+                    match Entry::try_from((&root_char_bag, entry)) {
+                        Ok(entry) => {
+                            paths_by_id.insert_mut(entry.id as usize, (entry.path.clone(), 0));
+                            edits.push(Edit::Insert(entry));
+                        }
+                        Err(err) => log::warn!("error for remote worktree entry {:?}", err),
                     }
                 }
+                let mut entries = SumTree::new();
+                entries.edit(edits, &());
                 (entries, paths_by_id)
             })
             .await;
@@ -374,6 +359,7 @@ impl Deref for Worktree {
 pub struct LocalWorktree {
     snapshot: Snapshot,
     background_snapshot: Arc<Mutex<Snapshot>>,
+    snapshots_to_send_tx: Option<Sender<Snapshot>>,
     scan_state: (watch::Sender<ScanState>, watch::Receiver<ScanState>),
     _event_stream_handle: fsevent::Handle,
     poll_scheduled: bool,
@@ -413,6 +399,7 @@ impl LocalWorktree {
         let tree = Self {
             snapshot,
             background_snapshot: background_snapshot.clone(),
+            snapshots_to_send_tx: None,
             scan_state: watch::channel_with(ScanState::Scanning),
             _event_stream_handle: event_stream_handle,
             poll_scheduled: false,
@@ -588,6 +575,13 @@ impl LocalWorktree {
     fn observe_scan_state(&mut self, scan_state: ScanState, cx: &mut ModelContext<Worktree>) {
         self.scan_state.0.blocking_send(scan_state).ok();
         self.poll_snapshot(cx);
+        if !self.is_scanning() {
+            if let Some(snapshots_to_send_tx) = self.snapshots_to_send_tx.clone() {
+                if let Err(err) = smol::block_on(snapshots_to_send_tx.send(self.snapshot())) {
+                    log::error!("error submitting snapshot to send {}", err);
+                }
+            }
+        }
     }
 
     fn poll_snapshot(&mut self, cx: &mut ModelContext<Worktree>) {
@@ -772,25 +766,12 @@ impl LocalWorktree {
         let snapshot = self.snapshot();
         let handle = cx.handle();
         cx.spawn(|this, mut cx| async move {
-            let entries = cx
-                .background()
-                .spawn(async move {
-                    snapshot
-                        .entries
-                        .cursor::<(), ()>()
-                        .map(|entry| proto::Entry {
-                            id: entry.id as u64,
-                            is_dir: entry.is_dir(),
-                            path: entry.path.to_string_lossy().to_string(),
-                            inode: entry.inode,
-                            mtime: Some(entry.mtime.into()),
-                            is_symlink: entry.is_symlink,
-                            is_ignored: entry.is_ignored,
-                        })
-                        .collect()
-                })
-                .await;
-
+            let entries = {
+                let entries = snapshot.entries.clone();
+                cx.background()
+                    .spawn(async move { entries.cursor::<(), ()>().map(Into::into).collect() })
+                    .await
+            };
             let share_response = rpc
                 .request(proto::ShareWorktree {
                     worktree: Some(proto::Worktree { root_name, entries }),
@@ -804,9 +785,33 @@ impl LocalWorktree {
                 .insert(share_response.worktree_id, handle.downgrade());
 
             log::info!("sharing worktree {:?}", share_response);
+            let (snapshots_to_send_tx, snapshots_to_send_rx) =
+                smol::channel::unbounded::<Snapshot>();
+
+            {
+                let rpc = rpc.clone();
+                let worktree_id = share_response.worktree_id;
+                std::thread::spawn(move || {
+                    let mut prev_snapshot = snapshot;
+                    while let Ok(snapshot) = smol::block_on(snapshots_to_send_rx.recv()) {
+                        let (updated_entries, removed_entries) = snapshot.diff(&prev_snapshot);
+                        let message = proto::UpdateWorktree {
+                            worktree_id,
+                            updated_entries: updated_entries.iter().map(Into::into).collect(),
+                            removed_entries: removed_entries.iter().map(|id| *id as u64).collect(),
+                        };
+                        match smol::block_on(rpc.send(message)) {
+                            Ok(()) => prev_snapshot = snapshot,
+                            Err(err) => log::error!("error sending snapshot diff {}", err),
+                        }
+                    }
+                });
+            }
 
             this.update(&mut cx, |worktree, _| {
-                worktree.as_local_mut().unwrap().rpc = Some((rpc, share_response.worktree_id));
+                let worktree = worktree.as_local_mut().unwrap();
+                worktree.rpc = Some((rpc, share_response.worktree_id));
+                worktree.snapshots_to_send_tx = Some(snapshots_to_send_tx);
             });
             Ok((share_response.worktree_id, share_response.access_token))
         })
@@ -949,6 +954,98 @@ impl RemoteWorktree {
         cx.notify();
         Ok(())
     }
+
+    pub fn update(
+        &mut self,
+        envelope: TypedEnvelope<proto::UpdateWorktree>,
+        cx: &mut ModelContext<Worktree>,
+    ) -> Result<()> {
+        self.update_snapshot(envelope, cx)?;
+        self.update_open_buffers(cx);
+        Ok(())
+    }
+
+    fn update_snapshot(
+        &mut self,
+        envelope: TypedEnvelope<proto::UpdateWorktree>,
+        cx: &mut ModelContext<Worktree>,
+    ) -> Result<()> {
+        self.snapshot.scan_id += 1;
+        let scan_id = self.snapshot.scan_id;
+
+        let mut edits = Vec::new();
+        for entry_id in envelope.payload.removed_entries {
+            let entry_id = entry_id as usize;
+            let entry = self
+                .snapshot
+                .entry_for_id(entry_id)
+                .ok_or_else(|| anyhow!("unknown entry"))?;
+            edits.push(Edit::Remove(PathKey(entry.path.clone())));
+            self.snapshot.paths_by_id.remove_mut(&entry_id);
+        }
+
+        for entry in envelope.payload.updated_entries {
+            let entry = Entry::try_from((&self.snapshot.root_char_bag, entry))?;
+            self.snapshot
+                .paths_by_id
+                .insert_mut(entry.id, (entry.path.clone(), scan_id));
+            edits.push(Edit::Insert(entry));
+        }
+        self.snapshot.entries.edit(edits, &());
+
+        cx.notify();
+        Ok(())
+    }
+
+    fn update_open_buffers(&mut self, cx: &mut ModelContext<Worktree>) {
+        let mut buffers_to_delete = Vec::new();
+        for (buffer_id, buffer) in &self.open_buffers {
+            if let Some(buffer) = buffer.upgrade(&cx) {
+                buffer.update(cx, |buffer, cx| {
+                    let buffer_is_clean = !buffer.is_dirty();
+
+                    if let Some(file) = buffer.file_mut() {
+                        let mut file_changed = false;
+
+                        if let Some(entry) = file
+                            .entry_id
+                            .and_then(|entry_id| self.snapshot.entry_for_id(entry_id))
+                        {
+                            if entry.path != file.path {
+                                file.path = entry.path.clone();
+                                file_changed = true;
+                            }
+
+                            if entry.mtime != file.mtime {
+                                file.mtime = entry.mtime;
+                                file_changed = true;
+                            }
+                        } else if let Some(entry) = self.snapshot.entry_for_path(&file.path) {
+                            file.entry_id = Some(entry.id);
+                            file.mtime = entry.mtime;
+                            file_changed = true;
+                        } else if !file.is_deleted() {
+                            if buffer_is_clean {
+                                cx.emit(editor::buffer::Event::Dirtied);
+                            }
+                            file.entry_id = None;
+                            file_changed = true;
+                        }
+
+                        if file_changed {
+                            cx.emit(editor::buffer::Event::FileHandleChanged);
+                        }
+                    }
+                });
+            } else {
+                buffers_to_delete.push(*buffer_id);
+            }
+        }
+
+        for buffer_id in buffers_to_delete {
+            self.open_buffers.remove(&buffer_id);
+        }
+    }
 }
 
 #[derive(Clone)]
@@ -960,12 +1057,58 @@ pub struct Snapshot {
     root_char_bag: CharBag,
     ignores: HashMap<Arc<Path>, (Arc<Gitignore>, usize)>,
     entries: SumTree<Entry>,
-    paths_by_id: rpds::HashTrieMapSync<usize, Arc<Path>>,
+    paths_by_id: rpds::RedBlackTreeMapSync<usize, (Arc<Path>, usize)>,
     removed_entry_ids: HashMap<u64, usize>,
     next_entry_id: Arc<AtomicUsize>,
 }
 
 impl Snapshot {
+    pub fn diff(&self, other: &Self) -> (Vec<Entry>, Vec<usize>) {
+        let mut updated_entries = Vec::new();
+        let mut removed_entries = Vec::new();
+        let mut self_entries = self.paths_by_id.iter().peekable();
+        let mut other_entries = other.paths_by_id.iter().peekable();
+        loop {
+            match (self_entries.peek(), other_entries.peek()) {
+                (
+                    Some((self_entry_id, (_, self_scan_id))),
+                    Some((other_entry_id, (_, other_scan_id))),
+                ) => match self_entry_id.cmp(other_entry_id) {
+                    Ordering::Less => {
+                        let entry = self.entry_for_id(**self_entry_id).unwrap().clone();
+                        updated_entries.push(entry);
+                        self_entries.next();
+                    }
+                    Ordering::Equal => {
+                        if self_scan_id != other_scan_id {
+                            let entry = self.entry_for_id(**self_entry_id).unwrap().clone();
+                            updated_entries.push(entry);
+                        }
+
+                        self_entries.next();
+                        other_entries.next();
+                    }
+                    Ordering::Greater => {
+                        removed_entries.push(**other_entry_id);
+                        other_entries.next();
+                    }
+                },
+                (Some((self_entry_id, _)), None) => {
+                    let entry = self.entry_for_id(**self_entry_id).unwrap().clone();
+                    updated_entries.push(entry);
+                    self_entries.next();
+                }
+                (None, Some((other_entry_id, _))) => {
+                    removed_entries.push(**other_entry_id);
+                    other_entries.next();
+                }
+                (None, None) => break,
+            }
+        }
+
+        (updated_entries, removed_entries)
+    }
+
     pub fn file_count(&self) -> usize {
         self.entries.summary().file_count
     }
@@ -1014,7 +1157,7 @@ impl Snapshot {
     }
 
     fn entry_for_id(&self, id: usize) -> Option<&Entry> {
-        let path = self.paths_by_id.get(&id)?;
+        let (path, _) = self.paths_by_id.get(&id)?;
         self.entry_for_path(path)
     }
 
@@ -1036,7 +1179,8 @@ impl Snapshot {
 
         self.reuse_entry_id(&mut entry);
         self.entries.insert_or_replace(entry.clone(), &());
-        self.paths_by_id.insert_mut(entry.id, entry.path.clone());
+        self.paths_by_id
+            .insert_mut(entry.id, (entry.path.clone(), self.scan_id));
         entry
     }
 
@@ -1065,7 +1209,8 @@ impl Snapshot {
 
         for mut entry in entries {
             self.reuse_entry_id(&mut entry);
-            self.paths_by_id.insert_mut(entry.id, entry.path.clone());
+            self.paths_by_id
+                .insert_mut(entry.id, (entry.path.clone(), self.scan_id));
             edits.push(Edit::Insert(entry));
         }
         self.entries.edit(edits, &());
@@ -2082,6 +2227,51 @@ impl<'a> Iterator for ChildEntriesIter<'a> {
     }
 }
 
+impl<'a> From<&'a Entry> for proto::Entry {
+    fn from(entry: &'a Entry) -> Self {
+        Self {
+            id: entry.id as u64,
+            is_dir: entry.is_dir(),
+            path: entry.path.to_string_lossy().to_string(),
+            inode: entry.inode,
+            mtime: Some(entry.mtime.into()),
+            is_symlink: entry.is_symlink,
+            is_ignored: entry.is_ignored,
+        }
+    }
+}
+
+impl<'a> TryFrom<(&'a CharBag, proto::Entry)> for Entry {
+    type Error = anyhow::Error;
+
+    fn try_from((root_char_bag, entry): (&'a CharBag, proto::Entry)) -> Result<Self> {
+        if let Some(mtime) = entry.mtime {
+            let kind = if entry.is_dir {
+                EntryKind::Dir
+            } else {
+                let mut char_bag = root_char_bag.clone();
+                char_bag.extend(entry.path.chars().map(|c| c.to_ascii_lowercase()));
+                EntryKind::File(char_bag)
+            };
+            let path: Arc<Path> = Arc::from(Path::new(&entry.path));
+            Ok(Entry {
+                id: entry.id as usize,
+                kind,
+                path: path.clone(),
+                inode: entry.inode,
+                mtime: mtime.into(),
+                is_symlink: entry.is_symlink,
+                is_ignored: entry.is_ignored,
+            })
+        } else {
+            Err(anyhow!(
+                "missing mtime in remote worktree entry {:?}",
+                entry.path
+            ))
+        }
+    }
+}
+
 mod remote {
     use super::*;
 
@@ -2109,6 +2299,30 @@ mod remote {
             .update(cx, |worktree, cx| worktree.remove_peer(envelope, cx))
     }
 
+    pub async fn update_worktree(
+        envelope: TypedEnvelope<proto::UpdateWorktree>,
+        rpc: &rpc::Client,
+        cx: &mut AsyncAppContext,
+    ) -> anyhow::Result<()> {
+        rpc.state
+            .read()
+            .await
+            .shared_worktree(envelope.payload.worktree_id, cx)?
+            .update(cx, |worktree, cx| {
+                if let Some(worktree) = worktree.as_remote_mut() {
+                    worktree.update(envelope, cx)?;
+                } else {
+                    log::error!(
+                        "invalid update message for worktree {}",
+                        envelope.payload.worktree_id
+                    );
+                }
+                Result::<_, anyhow::Error>::Ok(())
+            })?;
+
+        Ok(())
+    }
+
     pub async fn open_buffer(
         envelope: TypedEnvelope<proto::OpenBuffer>,
         rpc: &rpc::Client,