Fix error handling in buffer open (#9667)

Conrad Irwin , Max , and Marshall created

Co-Authored-By: Max <max@zed.dev>
Co-Authored-By: Marshall <marshall@zed.dev>

Release Notes:

- N/A

Co-authored-by: Max <max@zed.dev>
Co-authored-by: Marshall <marshall@zed.dev>

Change summary

crates/project/src/project.rs | 179 +++++++++++++++++-------------------
1 file changed, 87 insertions(+), 92 deletions(-)

Detailed changes

crates/project/src/project.rs 🔗

@@ -21,7 +21,10 @@ use copilot::Copilot;
 use debounced_delay::DebouncedDelay;
 use fs::repository::GitRepository;
 use futures::{
-    channel::mpsc::{self, UnboundedReceiver},
+    channel::{
+        mpsc::{self, UnboundedReceiver},
+        oneshot,
+    },
     future::{try_join_all, Shared},
     select,
     stream::FuturesUnordered,
@@ -93,7 +96,7 @@ use text::{Anchor, BufferId};
 use util::{
     debug_panic, defer,
     http::HttpClient,
-    merge_json_value_into,
+    maybe, merge_json_value_into,
     paths::{
         LOCAL_SETTINGS_RELATIVE_PATH, LOCAL_TASKS_RELATIVE_PATH, LOCAL_VSCODE_TASKS_RELATIVE_PATH,
     },
@@ -131,6 +134,13 @@ pub trait Item {
     fn project_path(&self, cx: &AppContext) -> Option<ProjectPath>;
 }
 
+#[derive(Clone)]
+pub enum OpenedBufferEvent {
+    Disconnected,
+    Ok(BufferId),
+    Err(BufferId, Arc<anyhow::Error>),
+}
+
 pub struct Project {
     worktrees: Vec<WorktreeHandle>,
     active_entry: Option<ProjectEntryId>,
@@ -158,7 +168,8 @@ pub struct Project {
     client_subscriptions: Vec<client::Subscription>,
     _subscriptions: Vec<gpui::Subscription>,
     next_buffer_id: BufferId,
-    opened_buffer: (watch::Sender<()>, watch::Receiver<()>),
+    loading_buffers: HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
+    incomplete_remote_buffers: HashMap<BufferId, Model<Buffer>>,
     shared_buffers: HashMap<proto::PeerId, HashSet<BufferId>>,
     #[allow(clippy::type_complexity)]
     loading_buffers_by_path: HashMap<
@@ -171,9 +182,6 @@ pub struct Project {
     opened_buffers: HashMap<BufferId, OpenBuffer>,
     local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
     local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
-    /// A mapping from a buffer ID to None means that we've started waiting for an ID but haven't finished loading it.
-    /// Used for re-issuing buffer requests when peers temporarily disconnect
-    incomplete_remote_buffers: HashMap<BufferId, Option<Model<Buffer>>>,
     buffer_snapshots: HashMap<BufferId, HashMap<LanguageServerId, Vec<LspBufferSnapshot>>>, // buffer_id -> server_id -> vec of snapshots
     buffers_being_formatted: HashSet<BufferId>,
     buffers_needing_diff: HashSet<WeakModel<Buffer>>,
@@ -607,7 +615,6 @@ impl Project {
                 next_buffer_id: BufferId::new(1).unwrap(),
                 opened_buffers: Default::default(),
                 shared_buffers: Default::default(),
-                incomplete_remote_buffers: Default::default(),
                 loading_buffers_by_path: Default::default(),
                 loading_local_worktrees: Default::default(),
                 local_buffer_ids_by_path: Default::default(),
@@ -615,7 +622,8 @@ impl Project {
                 buffer_snapshots: Default::default(),
                 join_project_response_message_id: 0,
                 client_state: ProjectClientState::Local,
-                opened_buffer: watch::channel(),
+                loading_buffers: HashMap::default(),
+                incomplete_remote_buffers: HashMap::default(),
                 client_subscriptions: Vec::new(),
                 _subscriptions: vec![
                     cx.observe_global::<SettingsStore>(Self::on_settings_changed),
@@ -721,7 +729,7 @@ impl Project {
                 flush_language_server_update: None,
                 loading_buffers_by_path: Default::default(),
                 next_buffer_id: BufferId::new(1).unwrap(),
-                opened_buffer: watch::channel(),
+                loading_buffers: Default::default(),
                 shared_buffers: Default::default(),
                 incomplete_remote_buffers: Default::default(),
                 loading_local_worktrees: Default::default(),
@@ -1711,7 +1719,8 @@ impl Project {
 
             // Wake up all futures currently waiting on a buffer to get opened,
             // to give them a chance to fail now that we've disconnected.
-            *self.opened_buffer.0.borrow_mut() = ();
+            self.loading_buffers.clear();
+            // self.opened_buffer.send(OpenedBufferEvent::Disconnected);
         }
     }
 
@@ -2144,7 +2153,11 @@ impl Project {
         })
         .detach();
 
-        *self.opened_buffer.0.borrow_mut() = ();
+        if let Some(senders) = self.loading_buffers.remove(&remote_id) {
+            for sender in senders {
+                sender.send(Ok(buffer.clone())).ok();
+            }
+        }
         Ok(())
     }
 
@@ -7777,26 +7790,36 @@ impl Project {
                 .ok_or_else(|| anyhow!("missing variant"))?
             {
                 proto::create_buffer_for_peer::Variant::State(mut state) => {
-                    let mut buffer_file = None;
-                    if let Some(file) = state.file.take() {
-                        let worktree_id = WorktreeId::from_proto(file.worktree_id);
-                        let worktree = this.worktree_for_id(worktree_id, cx).ok_or_else(|| {
-                            anyhow!("no worktree found for id {}", file.worktree_id)
-                        })?;
-                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
-                            as Arc<dyn language::File>);
-                    }
-
                     let buffer_id = BufferId::new(state.id)?;
-                    let buffer = Buffer::from_proto(
-                        this.replica_id(),
-                        this.capability(),
-                        state,
-                        buffer_file,
-                    )?;
-                    let buffer = cx.new_model(|_| buffer);
-                    this.incomplete_remote_buffers
-                        .insert(buffer_id, Some(buffer));
+
+                    let buffer_result = maybe!({
+                        let mut buffer_file = None;
+                        if let Some(file) = state.file.take() {
+                            let worktree_id = WorktreeId::from_proto(file.worktree_id);
+                            let worktree =
+                                this.worktree_for_id(worktree_id, cx).ok_or_else(|| {
+                                    anyhow!("no worktree found for id {}", file.worktree_id)
+                                })?;
+                            buffer_file =
+                                Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
+                                    as Arc<dyn language::File>);
+                        }
+                        Buffer::from_proto(this.replica_id(), this.capability(), state, buffer_file)
+                    });
+
+                    match buffer_result {
+                        Ok(buffer) => {
+                            let buffer = cx.new_model(|_| buffer);
+                            this.incomplete_remote_buffers.insert(buffer_id, buffer);
+                        }
+                        Err(error) => {
+                            if let Some(listeners) = this.loading_buffers.remove(&buffer_id) {
+                                for listener in listeners {
+                                    listener.send(Err(anyhow!(error.cloned()))).ok();
+                                }
+                            }
+                        }
+                    };
                 }
                 proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
                     let buffer_id = BufferId::new(chunk.buffer_id)?;
@@ -7804,23 +7827,34 @@ impl Project {
                         .incomplete_remote_buffers
                         .get(&buffer_id)
                         .cloned()
-                        .flatten()
                         .ok_or_else(|| {
                             anyhow!(
                                 "received chunk for buffer {} without initial state",
                                 chunk.buffer_id
                             )
                         })?;
-                    let operations = chunk
-                        .operations
-                        .into_iter()
-                        .map(language::proto::deserialize_operation)
-                        .collect::<Result<Vec<_>>>()?;
-                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))?;
 
-                    if chunk.is_last {
+                    let result = maybe!({
+                        let operations = chunk
+                            .operations
+                            .into_iter()
+                            .map(language::proto::deserialize_operation)
+                            .collect::<Result<Vec<_>>>()?;
+                        buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))
+                    });
+
+                    if let Err(error) = result {
                         this.incomplete_remote_buffers.remove(&buffer_id);
-                        this.register_buffer(&buffer, cx)?;
+                        if let Some(listeners) = this.loading_buffers.remove(&buffer_id) {
+                            for listener in listeners {
+                                listener.send(Err(error.cloned())).ok();
+                            }
+                        }
+                    } else {
+                        if chunk.is_last {
+                            this.incomplete_remote_buffers.remove(&buffer_id);
+                            this.register_buffer(&buffer, cx)?;
+                        }
                     }
                 }
             }
@@ -7843,12 +7877,7 @@ impl Project {
                 .opened_buffers
                 .get_mut(&buffer_id)
                 .and_then(|b| b.upgrade())
-                .or_else(|| {
-                    this.incomplete_remote_buffers
-                        .get(&buffer_id)
-                        .cloned()
-                        .flatten()
-                })
+                .or_else(|| this.incomplete_remote_buffers.get(&buffer_id).cloned())
             {
                 buffer.update(cx, |buffer, cx| buffer.set_diff_base(diff_base, cx));
             }
@@ -7871,12 +7900,7 @@ impl Project {
                 .opened_buffers
                 .get(&buffer_id)
                 .and_then(|b| b.upgrade())
-                .or_else(|| {
-                    this.incomplete_remote_buffers
-                        .get(&buffer_id)
-                        .cloned()
-                        .flatten()
-                })
+                .or_else(|| this.incomplete_remote_buffers.get(&buffer_id).cloned())
             {
                 let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
                 let worktree = this
@@ -8623,39 +8647,19 @@ impl Project {
         id: BufferId,
         cx: &mut ModelContext<Self>,
     ) -> Task<Result<Model<Buffer>>> {
-        let mut opened_buffer_rx = self.opened_buffer.1.clone();
-
-        cx.spawn(move |this, mut cx| async move {
-            let buffer = loop {
-                let Some(this) = this.upgrade() else {
-                    return Err(anyhow!("project dropped"));
-                };
-
-                let buffer = this.update(&mut cx, |this, _cx| {
-                    this.opened_buffers
-                        .get(&id)
-                        .and_then(|buffer| buffer.upgrade())
-                })?;
-
-                if let Some(buffer) = buffer {
-                    break buffer;
-                } else if this.update(&mut cx, |this, _| this.is_disconnected())? {
-                    return Err(anyhow!("disconnected before buffer {} could be opened", id));
-                }
+        let buffer = self
+            .opened_buffers
+            .get(&id)
+            .and_then(|buffer| buffer.upgrade());
 
-                this.update(&mut cx, |this, _| {
-                    this.incomplete_remote_buffers.entry(id).or_default();
-                })?;
-                drop(this);
+        if let Some(buffer) = buffer {
+            return Task::ready(Ok(buffer));
+        }
 
-                opened_buffer_rx
-                    .next()
-                    .await
-                    .ok_or_else(|| anyhow!("project dropped while waiting for buffer"))?;
-            };
+        let (tx, rx) = oneshot::channel();
+        self.loading_buffers.entry(id).or_default().push(tx);
 
-            Ok(buffer)
-        })
+        cx.background_executor().spawn(async move { rx.await? })
     }
 
     fn synchronize_remote_buffers(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
@@ -8904,11 +8908,7 @@ impl Project {
                 .opened_buffers
                 .get(&buffer_id)
                 .and_then(|buffer| buffer.upgrade())
-                .or_else(|| {
-                    this.incomplete_remote_buffers
-                        .get(&buffer_id)
-                        .and_then(|b| b.clone())
-                });
+                .or_else(|| this.incomplete_remote_buffers.get(&buffer_id).cloned());
             if let Some(buffer) = buffer {
                 buffer.update(cx, |buffer, cx| {
                     buffer.did_save(version, fingerprint, mtime, cx);
@@ -8938,12 +8938,7 @@ impl Project {
                 .opened_buffers
                 .get(&buffer_id)
                 .and_then(|buffer| buffer.upgrade())
-                .or_else(|| {
-                    this.incomplete_remote_buffers
-                        .get(&buffer_id)
-                        .cloned()
-                        .flatten()
-                });
+                .or_else(|| this.incomplete_remote_buffers.get(&buffer_id).cloned());
             if let Some(buffer) = buffer {
                 buffer.update(cx, |buffer, cx| {
                     buffer.did_reload(version, fingerprint, line_ending, mtime, cx);