Re-request incomplete remote buffers when syncing buffers

Nathan Sobo , Max Brunsfeld , and Mikayla Maki created

Any buffers we requested but that haven't been fully sent will cause
outstainding open requests to hang. If we re-request them, any
waiting open requests will resume when the requested buffers finish
being created.

Co-authored-by: Max Brunsfeld <max@zed.dev>
Co-authored-by: Mikayla Maki <mikayla@zed.dev>

Change summary

crates/project/src/lsp_command.rs |  6 +-
crates/project/src/project.rs     | 91 +++++++++++++++++++++++---------
2 files changed, 68 insertions(+), 29 deletions(-)

Detailed changes

crates/project/src/lsp_command.rs 🔗

@@ -524,7 +524,7 @@ async fn location_links_from_proto(
             Some(origin) => {
                 let buffer = project
                     .update(&mut cx, |this, cx| {
-                        this.wait_for_buffer(origin.buffer_id, cx)
+                        this.wait_for_remote_buffer(origin.buffer_id, cx)
                     })
                     .await?;
                 let start = origin
@@ -549,7 +549,7 @@ async fn location_links_from_proto(
         let target = link.target.ok_or_else(|| anyhow!("missing target"))?;
         let buffer = project
             .update(&mut cx, |this, cx| {
-                this.wait_for_buffer(target.buffer_id, cx)
+                this.wait_for_remote_buffer(target.buffer_id, cx)
             })
             .await?;
         let start = target
@@ -814,7 +814,7 @@ impl LspCommand for GetReferences {
         for location in message.locations {
             let target_buffer = project
                 .update(&mut cx, |this, cx| {
-                    this.wait_for_buffer(location.buffer_id, cx)
+                    this.wait_for_remote_buffer(location.buffer_id, cx)
                 })
                 .await?;
             let start = location

crates/project/src/project.rs 🔗

@@ -107,7 +107,7 @@ pub struct Project {
     opened_buffer: (watch::Sender<()>, watch::Receiver<()>),
     shared_buffers: HashMap<proto::PeerId, HashSet<u64>>,
     #[allow(clippy::type_complexity)]
-    loading_buffers: HashMap<
+    loading_buffers_by_path: HashMap<
         ProjectPath,
         postage::watch::Receiver<Option<Result<ModelHandle<Buffer>, Arc<anyhow::Error>>>>,
     >,
@@ -115,7 +115,9 @@ pub struct Project {
     loading_local_worktrees:
         HashMap<Arc<Path>, Shared<Task<Result<ModelHandle<Worktree>, Arc<anyhow::Error>>>>>,
     opened_buffers: HashMap<u64, OpenBuffer>,
-    incomplete_buffers: HashMap<u64, ModelHandle<Buffer>>,
+    /// A mapping from a buffer ID to None means that we've started waiting for an ID but haven't finished loading it.
+    /// Used for re-issuing buffer requests when peers temporarily disconnect
+    incomplete_remote_buffers: HashMap<u64, Option<ModelHandle<Buffer>>>,
     buffer_snapshots: HashMap<u64, Vec<(i32, TextBufferSnapshot)>>,
     buffers_being_formatted: HashSet<usize>,
     nonce: u128,
@@ -411,8 +413,8 @@ impl Project {
             collaborators: Default::default(),
             opened_buffers: Default::default(),
             shared_buffers: Default::default(),
-            incomplete_buffers: Default::default(),
-            loading_buffers: Default::default(),
+            incomplete_remote_buffers: Default::default(),
+            loading_buffers_by_path: Default::default(),
             loading_local_worktrees: Default::default(),
             buffer_snapshots: Default::default(),
             client_state: None,
@@ -467,10 +469,10 @@ impl Project {
 
             let mut this = Self {
                 worktrees: Vec::new(),
-                loading_buffers: Default::default(),
+                loading_buffers_by_path: Default::default(),
                 opened_buffer: watch::channel(),
                 shared_buffers: Default::default(),
-                incomplete_buffers: Default::default(),
+                incomplete_remote_buffers: Default::default(),
                 loading_local_worktrees: Default::default(),
                 active_entry: None,
                 collaborators: Default::default(),
@@ -1284,7 +1286,7 @@ impl Project {
             return Task::ready(Ok(existing_buffer));
         }
 
-        let mut loading_watch = match self.loading_buffers.entry(project_path.clone()) {
+        let mut loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
             // If the given path is already being loaded, then wait for that existing
             // task to complete and return the same buffer.
             hash_map::Entry::Occupied(e) => e.get().clone(),
@@ -1304,7 +1306,7 @@ impl Project {
                     let load_result = load_buffer.await;
                     *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| {
                         // Record the fact that the buffer is no longer loading.
-                        this.loading_buffers.remove(&project_path);
+                        this.loading_buffers_by_path.remove(&project_path);
                         let buffer = load_result.map_err(Arc::new)?;
                         Ok(buffer)
                     }));
@@ -1364,7 +1366,7 @@ impl Project {
                 })
                 .await?;
             this.update(&mut cx, |this, cx| {
-                this.wait_for_buffer(response.buffer_id, cx)
+                this.wait_for_remote_buffer(response.buffer_id, cx)
             })
             .await
         })
@@ -1425,8 +1427,10 @@ impl Project {
                 .request(proto::OpenBufferById { project_id, id });
             cx.spawn(|this, mut cx| async move {
                 let buffer_id = request.await?.buffer_id;
-                this.update(&mut cx, |this, cx| this.wait_for_buffer(buffer_id, cx))
-                    .await
+                this.update(&mut cx, |this, cx| {
+                    this.wait_for_remote_buffer(buffer_id, cx)
+                })
+                .await
             })
         } else {
             Task::ready(Err(anyhow!("cannot open buffer while disconnected")))
@@ -3268,7 +3272,7 @@ impl Project {
             cx.spawn(|this, mut cx| async move {
                 let response = request.await?;
                 this.update(&mut cx, |this, cx| {
-                    this.wait_for_buffer(response.buffer_id, cx)
+                    this.wait_for_remote_buffer(response.buffer_id, cx)
                 })
                 .await
             })
@@ -4124,7 +4128,7 @@ impl Project {
                 for location in response.locations {
                     let target_buffer = this
                         .update(&mut cx, |this, cx| {
-                            this.wait_for_buffer(location.buffer_id, cx)
+                            this.wait_for_remote_buffer(location.buffer_id, cx)
                         })
                         .await?;
                     let start = location
@@ -5005,19 +5009,21 @@ impl Project {
                     let buffer = cx.add_model(|_| {
                         Buffer::from_proto(this.replica_id(), state, buffer_file).unwrap()
                     });
-                    this.incomplete_buffers.insert(buffer_id, buffer);
+                    this.incomplete_remote_buffers
+                        .insert(buffer_id, Some(buffer));
                 }
                 proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
                     let buffer = this
-                        .incomplete_buffers
+                        .incomplete_remote_buffers
                         .get(&chunk.buffer_id)
+                        .cloned()
+                        .flatten()
                         .ok_or_else(|| {
                             anyhow!(
                                 "received chunk for buffer {} without initial state",
                                 chunk.buffer_id
                             )
-                        })?
-                        .clone();
+                        })?;
                     let operations = chunk
                         .operations
                         .into_iter()
@@ -5026,7 +5032,7 @@ impl Project {
                     buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))?;
 
                     if chunk.is_last {
-                        this.incomplete_buffers.remove(&chunk.buffer_id);
+                        this.incomplete_remote_buffers.remove(&chunk.buffer_id);
                         this.register_buffer(&buffer, cx)?;
                     }
                 }
@@ -5049,7 +5055,12 @@ impl Project {
                 .opened_buffers
                 .get_mut(&buffer_id)
                 .and_then(|b| b.upgrade(cx))
-                .or_else(|| this.incomplete_buffers.get(&buffer_id).cloned())
+                .or_else(|| {
+                    this.incomplete_remote_buffers
+                        .get(&buffer_id)
+                        .cloned()
+                        .flatten()
+                })
             {
                 buffer.update(cx, |buffer, cx| buffer.set_diff_base(diff_base, cx));
             }
@@ -5070,7 +5081,12 @@ impl Project {
                 .opened_buffers
                 .get_mut(&buffer_id)
                 .and_then(|b| b.upgrade(cx))
-                .or_else(|| this.incomplete_buffers.get(&buffer_id).cloned())
+                .or_else(|| {
+                    this.incomplete_remote_buffers
+                        .get(&buffer_id)
+                        .cloned()
+                        .flatten()
+                })
             {
                 let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
                 let worktree = this
@@ -5610,7 +5626,9 @@ impl Project {
             for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
             {
                 let buffer = this
-                    .update(&mut cx, |this, cx| this.wait_for_buffer(buffer_id, cx))
+                    .update(&mut cx, |this, cx| {
+                        this.wait_for_remote_buffer(buffer_id, cx)
+                    })
                     .await?;
                 let transaction = language::proto::deserialize_transaction(transaction)?;
                 project_transaction.0.insert(buffer, transaction);
@@ -5686,12 +5704,13 @@ impl Project {
         buffer_id
     }
 
-    fn wait_for_buffer(
-        &self,
+    fn wait_for_remote_buffer(
+        &mut self,
         id: u64,
         cx: &mut ModelContext<Self>,
     ) -> Task<Result<ModelHandle<Buffer>>> {
         let mut opened_buffer_rx = self.opened_buffer.1.clone();
+
         cx.spawn(|this, mut cx| async move {
             let buffer = loop {
                 let buffer = this.read_with(&cx, |this, cx| {
@@ -5705,6 +5724,9 @@ impl Project {
                     return Err(anyhow!("disconnected before buffer {} could be opened", id));
                 }
 
+                this.update(&mut cx, |this, _| {
+                    this.incomplete_remote_buffers.entry(id).or_default();
+                });
                 opened_buffer_rx
                     .next()
                     .await
@@ -5739,8 +5761,9 @@ impl Project {
 
         let client = self.client.clone();
         cx.spawn(|this, cx| async move {
-            let buffers = this.read_with(&cx, |this, cx| {
-                this.opened_buffers
+            let (buffers, incomplete_buffer_ids) = this.read_with(&cx, |this, cx| {
+                let buffers = this
+                    .opened_buffers
                     .iter()
                     .filter_map(|(id, buffer)| {
                         let buffer = buffer.upgrade(cx)?;
@@ -5749,7 +5772,14 @@ impl Project {
                             version: language::proto::serialize_version(&buffer.read(cx).version),
                         })
                     })
-                    .collect()
+                    .collect();
+                let incomplete_buffer_ids = this
+                    .incomplete_remote_buffers
+                    .keys()
+                    .copied()
+                    .collect::<Vec<_>>();
+
+                (buffers, incomplete_buffer_ids)
             });
             let response = client
                 .request(proto::SynchronizeBuffers {
@@ -5783,6 +5813,15 @@ impl Project {
                     }
                 })
             });
+
+            // Any incomplete buffers have open requests waiting. Request that the host sends
+            // creates these buffers for us again to unblock any waiting futures.
+            for id in incomplete_buffer_ids {
+                cx.background()
+                    .spawn(client.request(proto::OpenBufferById { project_id, id }))
+                    .detach();
+            }
+
             futures::future::join_all(send_updates_for_buffers)
                 .await
                 .into_iter()