Don't lose operations for buffers that are being opened

Antonio Scandurra created

Change summary

zed/src/worktree.rs | 231 +++++++++++++++++++++++++++-------------------
1 file changed, 135 insertions(+), 96 deletions(-)

Detailed changes

zed/src/worktree.rs 🔗

@@ -728,14 +728,21 @@ impl Worktree {
 
     #[cfg(feature = "test-support")]
     pub fn has_open_buffer(&self, path: impl AsRef<Path>, cx: &AppContext) -> bool {
-        let open_buffers = match self {
-            Worktree::Local(worktree) => &worktree.open_buffers,
-            Worktree::Remote(worktree) => &worktree.open_buffers,
+        let mut open_buffers: Box<dyn Iterator<Item = _>> = match self {
+            Worktree::Local(worktree) => Box::new(worktree.open_buffers.values()),
+            Worktree::Remote(worktree) => {
+                Box::new(worktree.open_buffers.values().filter_map(|buf| {
+                    if let RemoteBuffer::Loaded(buf) = buf {
+                        Some(buf)
+                    } else {
+                        None
+                    }
+                }))
+            }
         };
 
         let path = path.as_ref();
         open_buffers
-            .values()
             .find(|buffer| {
                 if let Some(file) = buffer.upgrade(cx).and_then(|buffer| buffer.read(cx).file()) {
                     file.path.as_ref() == path
@@ -751,33 +758,43 @@ impl Worktree {
         envelope: proto::UpdateBuffer,
         cx: &mut ModelContext<Self>,
     ) -> Result<()> {
-        let open_buffers = match self {
-            Worktree::Local(worktree) => &worktree.open_buffers,
-            Worktree::Remote(worktree) => &worktree.open_buffers,
-        };
-        let buffer = open_buffers
-            .get(&(envelope.buffer_id as usize))
-            .and_then(|buf| buf.upgrade(&cx));
-
-        let buffer = if let Some(buffer) = buffer {
-            buffer
-        } else {
-            return if matches!(self, Worktree::Local(_)) {
-                Err(anyhow!(
-                    "invalid buffer {} in update buffer message",
-                    envelope.buffer_id
-                ))
-            } else {
-                Ok(())
-            };
-        };
-
+        let buffer_id = envelope.buffer_id as usize;
         let ops = envelope
             .operations
             .into_iter()
             .map(|op| op.try_into())
             .collect::<anyhow::Result<Vec<_>>>()?;
-        buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
+
+        match self {
+            Worktree::Local(worktree) => {
+                let buffer = worktree
+                    .open_buffers
+                    .get(&buffer_id)
+                    .and_then(|buf| buf.upgrade(&cx))
+                    .ok_or_else(|| {
+                        anyhow!("invalid buffer {} in update buffer message", buffer_id)
+                    })?;
+                buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
+            }
+            Worktree::Remote(worktree) => match worktree.open_buffers.get_mut(&buffer_id) {
+                Some(RemoteBuffer::Operations(pending_ops)) => pending_ops.extend(ops),
+                Some(RemoteBuffer::Loaded(buffer)) => {
+                    if let Some(buffer) = buffer.upgrade(&cx) {
+                        buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
+                    } else {
+                        worktree
+                            .open_buffers
+                            .insert(buffer_id, RemoteBuffer::Operations(ops));
+                    }
+                }
+                None => {
+                    worktree
+                        .open_buffers
+                        .insert(buffer_id, RemoteBuffer::Operations(ops));
+                }
+            },
+        }
+
         Ok(())
     }
 
@@ -811,8 +828,8 @@ impl Worktree {
         }
     }
 
-    fn poll_snapshot(&mut self, cx: &mut ModelContext<Worktree>) {
-        let update_buffers = match self {
+    fn poll_snapshot(&mut self, cx: &mut ModelContext<Self>) {
+        match self {
             Self::Local(worktree) => {
                 let poll_interval = worktree.poll_interval;
                 worktree.snapshot = worktree.background_snapshot.lock().clone();
@@ -832,93 +849,98 @@ impl Worktree {
                         .detach();
                         worktree.poll_scheduled = true;
                     }
-                    false
                 } else {
-                    true
+                    self.update_open_buffers(cx);
                 }
             }
             Self::Remote(worktree) => {
                 worktree.snapshot = worktree.snapshot_rx.borrow().clone();
-                true
+                self.update_open_buffers(cx);
             }
         };
 
-        if update_buffers {
-            let mut buffers_to_delete = Vec::new();
-            for (buffer_id, buffer) in self.open_buffers() {
-                if let Some(buffer) = buffer.upgrade(&cx) {
-                    buffer.update(cx, |buffer, cx| {
-                        let buffer_is_clean = !buffer.is_dirty();
+        cx.notify();
+    }
 
-                        if let Some(file) = buffer.file_mut() {
-                            let mut file_changed = false;
+    fn update_open_buffers(&mut self, cx: &mut ModelContext<Self>) {
+        let open_buffers: Box<dyn Iterator<Item = _>> = match &self {
+            Self::Local(worktree) => Box::new(worktree.open_buffers.iter()),
+            Self::Remote(worktree) => {
+                Box::new(worktree.open_buffers.iter().filter_map(|(id, buf)| {
+                    if let RemoteBuffer::Loaded(buf) = buf {
+                        Some((id, buf))
+                    } else {
+                        None
+                    }
+                }))
+            }
+        };
 
-                            if let Some(entry) = file
-                                .entry_id
-                                .and_then(|entry_id| self.entry_for_id(entry_id))
-                            {
-                                if entry.path != file.path {
-                                    file.path = entry.path.clone();
-                                    file_changed = true;
-                                }
+        let mut buffers_to_delete = Vec::new();
+        for (buffer_id, buffer) in open_buffers {
+            if let Some(buffer) = buffer.upgrade(&cx) {
+                buffer.update(cx, |buffer, cx| {
+                    let buffer_is_clean = !buffer.is_dirty();
 
-                                if entry.mtime != file.mtime {
-                                    file.mtime = entry.mtime;
-                                    file_changed = true;
-                                    if let Some(worktree) = self.as_local() {
-                                        if buffer_is_clean {
-                                            let abs_path = worktree.absolutize(&file.path);
-                                            refresh_buffer(abs_path, &worktree.fs, cx);
-                                        }
-                                    }
-                                }
-                            } else if let Some(entry) = self.entry_for_path(&file.path) {
-                                file.entry_id = Some(entry.id);
+                    if let Some(file) = buffer.file_mut() {
+                        let mut file_changed = false;
+
+                        if let Some(entry) = file
+                            .entry_id
+                            .and_then(|entry_id| self.entry_for_id(entry_id))
+                        {
+                            if entry.path != file.path {
+                                file.path = entry.path.clone();
+                                file_changed = true;
+                            }
+
+                            if entry.mtime != file.mtime {
                                 file.mtime = entry.mtime;
+                                file_changed = true;
                                 if let Some(worktree) = self.as_local() {
                                     if buffer_is_clean {
                                         let abs_path = worktree.absolutize(&file.path);
                                         refresh_buffer(abs_path, &worktree.fs, cx);
                                     }
                                 }
-                                file_changed = true;
-                            } else if !file.is_deleted() {
+                            }
+                        } else if let Some(entry) = self.entry_for_path(&file.path) {
+                            file.entry_id = Some(entry.id);
+                            file.mtime = entry.mtime;
+                            if let Some(worktree) = self.as_local() {
                                 if buffer_is_clean {
-                                    cx.emit(editor::buffer::Event::Dirtied);
+                                    let abs_path = worktree.absolutize(&file.path);
+                                    refresh_buffer(abs_path, &worktree.fs, cx);
                                 }
-                                file.entry_id = None;
-                                file_changed = true;
                             }
-
-                            if file_changed {
-                                cx.emit(editor::buffer::Event::FileHandleChanged);
+                            file_changed = true;
+                        } else if !file.is_deleted() {
+                            if buffer_is_clean {
+                                cx.emit(editor::buffer::Event::Dirtied);
                             }
+                            file.entry_id = None;
+                            file_changed = true;
                         }
-                    });
-                } else {
-                    buffers_to_delete.push(*buffer_id);
-                }
-            }
 
-            for buffer_id in buffers_to_delete {
-                self.open_buffers_mut().remove(&buffer_id);
+                        if file_changed {
+                            cx.emit(editor::buffer::Event::FileHandleChanged);
+                        }
+                    }
+                });
+            } else {
+                buffers_to_delete.push(*buffer_id);
             }
         }
 
-        cx.notify();
-    }
-
-    fn open_buffers(&self) -> &HashMap<usize, WeakModelHandle<Buffer>> {
-        match self {
-            Self::Local(worktree) => &worktree.open_buffers,
-            Self::Remote(worktree) => &worktree.open_buffers,
-        }
-    }
-
-    fn open_buffers_mut(&mut self) -> &mut HashMap<usize, WeakModelHandle<Buffer>> {
-        match self {
-            Self::Local(worktree) => &mut worktree.open_buffers,
-            Self::Remote(worktree) => &mut worktree.open_buffers,
+        for buffer_id in buffers_to_delete {
+            match self {
+                Self::Local(worktree) => {
+                    worktree.open_buffers.remove(&buffer_id);
+                }
+                Self::Remote(worktree) => {
+                    worktree.open_buffers.remove(&buffer_id);
+                }
+            }
         }
     }
 }
@@ -1334,7 +1356,7 @@ pub struct RemoteWorktree {
     rpc: rpc::Client,
     updates_tx: postage::mpsc::Sender<proto::UpdateWorktree>,
     replica_id: ReplicaId,
-    open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
+    open_buffers: HashMap<usize, RemoteBuffer>,
     peers: HashMap<PeerId, ReplicaId>,
     languages: Arc<LanguageRegistry>,
 }
@@ -1381,15 +1403,20 @@ impl RemoteWorktree {
                     })
                     .await?;
                 let remote_buffer = response.buffer.ok_or_else(|| anyhow!("empty buffer"))?;
-                let buffer_id = remote_buffer.id;
+                let buffer_id = remote_buffer.id as usize;
                 let buffer = cx.add_model(|cx| {
                     Buffer::from_proto(replica_id, remote_buffer, Some(file), language, cx).unwrap()
                 });
-                this.update(&mut cx, |this, _| {
+                this.update(&mut cx, |this, cx| {
                     let this = this.as_remote_mut().unwrap();
-                    this.open_buffers
-                        .insert(buffer_id as usize, buffer.downgrade());
-                });
+                    if let Some(RemoteBuffer::Operations(pending_ops)) = this
+                        .open_buffers
+                        .insert(buffer_id, RemoteBuffer::Loaded(buffer.downgrade()))
+                    {
+                        buffer.update(cx, |buf, cx| buf.apply_ops(pending_ops, cx))?;
+                    }
+                    Result::<_, anyhow::Error>::Ok(())
+                })?;
                 Ok(buffer)
             }
         })
@@ -1431,6 +1458,20 @@ impl RemoteWorktree {
     }
 }
 
+enum RemoteBuffer {
+    Operations(Vec<Operation>),
+    Loaded(WeakModelHandle<Buffer>),
+}
+
+impl RemoteBuffer {
+    fn upgrade(&self, cx: impl AsRef<AppContext>) -> Option<ModelHandle<Buffer>> {
+        match self {
+            Self::Operations(_) => None,
+            Self::Loaded(buffer) => buffer.upgrade(cx),
+        }
+    }
+}
+
 #[derive(Clone)]
 pub struct Snapshot {
     id: usize,
@@ -2864,8 +2905,6 @@ mod remote {
         rpc: &rpc::Client,
         cx: &mut AsyncAppContext,
     ) -> anyhow::Result<()> {
-        eprintln!("got update buffer message {:?}", envelope.payload);
-
         let message = envelope.payload;
         rpc.state
             .read()