Avoid storing operations when no buffers are being loaded

Max Brunsfeld and Nathan Sobo created

Co-Authored-By: Nathan Sobo <nathan@zed.dev>

Change summary

crates/client/src/client.rs   |  4 ++
crates/project/src/project.rs | 54 ++++++++++++++++++++++++++++--------
crates/server/src/rpc.rs      | 28 +++++++++++++++----
3 files changed, 67 insertions(+), 19 deletions(-)

Detailed changes

crates/client/src/client.rs 🔗

@@ -220,6 +220,10 @@ impl Client {
         })
     }
 
+    pub fn id(&self) -> usize {
+        self.id
+    }
+
     #[cfg(any(test, feature = "test-support"))]
     pub fn override_authenticate<F>(&mut self, authenticate: F) -> &mut Self
     where

crates/project/src/project.rs 🔗

@@ -57,7 +57,7 @@ pub struct Project {
 
 enum OpenBuffer {
     Loaded(WeakModelHandle<Buffer>),
-    Operations(Vec<Operation>),
+    Loading(Vec<Operation>),
 }
 
 enum WorktreeHandle {
@@ -346,7 +346,22 @@ impl Project {
 
     #[cfg(any(test, feature = "test-support"))]
     pub fn shared_buffer(&self, peer_id: PeerId, remote_id: u64) -> Option<ModelHandle<Buffer>> {
-        Some(self.shared_buffers.get(&peer_id)?.get(&remote_id)?.clone())
+        let result = self
+            .shared_buffers
+            .get(&peer_id)
+            .and_then(|buffers| buffers.get(&remote_id))
+            .cloned();
+        if result.is_none() {
+            dbg!(&self.shared_buffers);
+        }
+        result
+    }
+
+    #[cfg(any(test, feature = "test-support"))]
+    pub fn has_buffered_operations(&self) -> bool {
+        self.open_buffers
+            .values()
+            .any(|buffer| matches!(buffer, OpenBuffer::Loading(_)))
     }
 
     pub fn fs(&self) -> &Arc<dyn Fs> {
@@ -512,6 +527,10 @@ impl Project {
         }
     }
 
+    pub fn is_remote(&self) -> bool {
+        !self.is_local()
+    }
+
     pub fn open_buffer(
         &mut self,
         path: impl Into<ProjectPath>,
@@ -551,6 +570,11 @@ impl Project {
                     *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| {
                         // Record the fact that the buffer is no longer loading.
                         this.loading_buffers.remove(&project_path);
+                        if this.loading_buffers.is_empty() {
+                            this.open_buffers
+                                .retain(|_, buffer| matches!(buffer, OpenBuffer::Loaded(_)))
+                        }
+
                         let buffer = load_result.map_err(Arc::new)?;
                         Ok(buffer)
                     }));
@@ -734,7 +758,7 @@ impl Project {
             OpenBuffer::Loaded(buffer.downgrade()),
         ) {
             None => {}
-            Some(OpenBuffer::Operations(operations)) => {
+            Some(OpenBuffer::Loading(operations)) => {
                 buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))?
             }
             Some(OpenBuffer::Loaded(_)) => Err(anyhow!("registered the same buffer twice"))?,
@@ -2200,17 +2224,21 @@ impl Project {
                 .into_iter()
                 .map(|op| language::proto::deserialize_operation(op))
                 .collect::<Result<Vec<_>, _>>()?;
-            let buffer = this
-                .open_buffers
-                .entry(buffer_id)
-                .or_insert_with(|| OpenBuffer::Operations(Vec::new()));
-            match buffer {
-                OpenBuffer::Loaded(buffer) => {
-                    if let Some(buffer) = buffer.upgrade(cx) {
-                        buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
+            let is_remote = this.is_remote();
+            match this.open_buffers.entry(buffer_id) {
+                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
+                    OpenBuffer::Loaded(buffer) => {
+                        if let Some(buffer) = buffer.upgrade(cx) {
+                            buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
+                        }
+                    }
+                    OpenBuffer::Loading(operations) => operations.extend_from_slice(&ops),
+                },
+                hash_map::Entry::Vacant(e) => {
+                    if is_remote && this.loading_buffers.len() > 0 {
+                        e.insert(OpenBuffer::Loading(ops));
                     }
                 }
-                OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
             }
             Ok(())
         })
@@ -2778,7 +2806,7 @@ impl OpenBuffer {
     pub fn upgrade(&self, cx: &impl UpgradeModelHandle) -> Option<ModelHandle<Buffer>> {
         match self {
             OpenBuffer::Loaded(handle) => handle.upgrade(cx),
-            OpenBuffer::Operations(_) => None,
+            OpenBuffer::Loading(_) => None,
         }
     }
 }

crates/server/src/rpc.rs 🔗

@@ -3711,7 +3711,8 @@ mod tests {
                 .collect::<BTreeMap<_, _>>()
         });
 
-        for (guest_ix, (guest_client, guest_cx)) in clients.iter().enumerate() {
+        for (guest_client, guest_cx) in clients.iter().skip(1) {
+            let guest_id = guest_client.client.id();
             let worktree_snapshots =
                 guest_client
                     .project
@@ -3731,7 +3732,7 @@ mod tests {
                 worktree_snapshots.keys().collect::<Vec<_>>(),
                 host_worktree_snapshots.keys().collect::<Vec<_>>(),
                 "guest {} has different worktrees than the host",
-                guest_ix
+                guest_id
             );
             for (id, host_snapshot) in &host_worktree_snapshots {
                 let guest_snapshot = &worktree_snapshots[id];
@@ -3739,30 +3740,45 @@ mod tests {
                     guest_snapshot.root_name(),
                     host_snapshot.root_name(),
                     "guest {} has different root name than the host for worktree {}",
-                    guest_ix,
+                    guest_id,
                     id
                 );
                 assert_eq!(
                     guest_snapshot.entries(false).collect::<Vec<_>>(),
                     host_snapshot.entries(false).collect::<Vec<_>>(),
                     "guest {} has different snapshot than the host for worktree {}",
-                    guest_ix,
+                    guest_id,
                     id
                 );
             }
 
+            guest_client
+                .project
+                .as_ref()
+                .unwrap()
+                .read_with(guest_cx, |project, _| {
+                    assert!(
+                        !project.has_buffered_operations(),
+                        "guest {} has buffered operations ",
+                        guest_id,
+                    );
+                });
+
             for guest_buffer in &guest_client.buffers {
                 let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id());
                 let host_buffer = host_project.read_with(&host_cx, |project, _| {
                     project
                         .shared_buffer(guest_client.peer_id, buffer_id)
-                        .unwrap()
+                        .expect(&format!(
+                            "host doest not have buffer for guest:{}, peer:{}, id:{}",
+                            guest_id, guest_client.peer_id, buffer_id
+                        ))
                 });
                 assert_eq!(
                     guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()),
                     host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
                     "guest {} buffer {} differs from the host's buffer",
-                    guest_ix,
+                    guest_id,
                     buffer_id,
                 );
             }