Halt UpdateBuffer messages until sync if one errors

Max Brunsfeld and Antonio Scandurra created

Co-authored-by: Antonio Scandurra <antonio@zed.dev>

Change summary

crates/project/src/project.rs | 158 ++++++++++++++++++++++++++----------
1 file changed, 114 insertions(+), 44 deletions(-)

Detailed changes

crates/project/src/project.rs 🔗

@@ -13,7 +13,10 @@ use client::{proto, Client, TypedEnvelope, UserStore};
 use clock::ReplicaId;
 use collections::{hash_map, BTreeMap, HashMap, HashSet};
 use futures::{
-    channel::{mpsc, oneshot},
+    channel::{
+        mpsc::{self, UnboundedReceiver},
+        oneshot,
+    },
     future::{try_join_all, Shared},
     AsyncWriteExt, Future, FutureExt, StreamExt, TryFutureExt,
 };
@@ -92,6 +95,7 @@ pub trait Item {
 pub struct Project {
     worktrees: Vec<WorktreeHandle>,
     active_entry: Option<ProjectEntryId>,
+    buffer_changes_tx: mpsc::UnboundedSender<BufferMessage>,
     languages: Arc<LanguageRegistry>,
     language_servers: HashMap<usize, LanguageServerState>,
     language_server_ids: HashMap<(WorktreeId, LanguageServerName), usize>,
@@ -130,6 +134,14 @@ pub struct Project {
     terminals: Terminals,
 }
 
+enum BufferMessage {
+    Operation {
+        buffer_id: u64,
+        operation: proto::Operation,
+    },
+    Resync,
+}
+
 enum OpenBuffer {
     Strong(ModelHandle<Buffer>),
     Weak(WeakModelHandle<Buffer>),
@@ -417,39 +429,45 @@ impl Project {
         fs: Arc<dyn Fs>,
         cx: &mut MutableAppContext,
     ) -> ModelHandle<Self> {
-        cx.add_model(|cx: &mut ModelContext<Self>| Self {
-            worktrees: Default::default(),
-            collaborators: Default::default(),
-            opened_buffers: Default::default(),
-            shared_buffers: Default::default(),
-            incomplete_remote_buffers: Default::default(),
-            loading_buffers_by_path: Default::default(),
-            loading_local_worktrees: Default::default(),
-            buffer_snapshots: Default::default(),
-            join_project_response_message_id: 0,
-            client_state: None,
-            opened_buffer: watch::channel(),
-            client_subscriptions: Vec::new(),
-            _subscriptions: vec![cx.observe_global::<Settings, _>(Self::on_settings_changed)],
-            _maintain_buffer_languages: Self::maintain_buffer_languages(&languages, cx),
-            _maintain_workspace_config: Self::maintain_workspace_config(languages.clone(), cx),
-            active_entry: None,
-            languages,
-            client,
-            user_store,
-            fs,
-            next_entry_id: Default::default(),
-            next_diagnostic_group_id: Default::default(),
-            language_servers: Default::default(),
-            language_server_ids: Default::default(),
-            language_server_statuses: Default::default(),
-            last_workspace_edits_by_language_server: Default::default(),
-            buffers_being_formatted: Default::default(),
-            next_language_server_id: 0,
-            nonce: StdRng::from_entropy().gen(),
-            terminals: Terminals {
-                local_handles: Vec::new(),
-            },
+        cx.add_model(|cx: &mut ModelContext<Self>| {
+            let (tx, rx) = mpsc::unbounded();
+            cx.spawn_weak(|this, cx| Self::send_buffer_messages(this, rx, cx))
+                .detach();
+            Self {
+                worktrees: Default::default(),
+                buffer_changes_tx: tx,
+                collaborators: Default::default(),
+                opened_buffers: Default::default(),
+                shared_buffers: Default::default(),
+                incomplete_remote_buffers: Default::default(),
+                loading_buffers_by_path: Default::default(),
+                loading_local_worktrees: Default::default(),
+                buffer_snapshots: Default::default(),
+                join_project_response_message_id: 0,
+                client_state: None,
+                opened_buffer: watch::channel(),
+                client_subscriptions: Vec::new(),
+                _subscriptions: vec![cx.observe_global::<Settings, _>(Self::on_settings_changed)],
+                _maintain_buffer_languages: Self::maintain_buffer_languages(&languages, cx),
+                _maintain_workspace_config: Self::maintain_workspace_config(languages.clone(), cx),
+                active_entry: None,
+                languages,
+                client,
+                user_store,
+                fs,
+                next_entry_id: Default::default(),
+                next_diagnostic_group_id: Default::default(),
+                language_servers: Default::default(),
+                language_server_ids: Default::default(),
+                language_server_statuses: Default::default(),
+                last_workspace_edits_by_language_server: Default::default(),
+                buffers_being_formatted: Default::default(),
+                next_language_server_id: 0,
+                nonce: StdRng::from_entropy().gen(),
+                terminals: Terminals {
+                    local_handles: Vec::new(),
+                },
+            }
         })
     }
 
@@ -480,8 +498,12 @@ impl Project {
                 worktrees.push(worktree);
             }
 
+            let (tx, rx) = mpsc::unbounded();
+            cx.spawn_weak(|this, cx| Self::send_buffer_messages(this, rx, cx))
+                .detach();
             let mut this = Self {
                 worktrees: Vec::new(),
+                buffer_changes_tx: tx,
                 loading_buffers_by_path: Default::default(),
                 opened_buffer: watch::channel(),
                 shared_buffers: Default::default(),
@@ -1084,8 +1106,9 @@ impl Project {
                 )
             })
             .collect();
-        self.synchronize_remote_buffers(cx).detach_and_log_err(cx);
-
+        self.buffer_changes_tx
+            .unbounded_send(BufferMessage::Resync)
+            .unwrap();
         cx.notify();
         Ok(())
     }
@@ -1635,6 +1658,53 @@ impl Project {
         });
     }
 
+    async fn send_buffer_messages(
+        this: WeakModelHandle<Self>,
+        mut rx: UnboundedReceiver<BufferMessage>,
+        mut cx: AsyncAppContext,
+    ) {
+        let mut needs_resync_with_host = false;
+        while let Some(change) = rx.next().await {
+            if let Some(this) = this.upgrade(&mut cx) {
+                let is_local = this.read_with(&cx, |this, _| this.is_local());
+                match change {
+                    BufferMessage::Operation {
+                        buffer_id,
+                        operation,
+                    } => {
+                        if needs_resync_with_host {
+                            continue;
+                        }
+                        let request = this.read_with(&cx, |this, _| {
+                            let project_id = this.remote_id()?;
+                            Some(this.client.request(proto::UpdateBuffer {
+                                buffer_id,
+                                project_id,
+                                operations: vec![operation],
+                            }))
+                        });
+                        if let Some(request) = request {
+                            if request.await.is_err() && !is_local {
+                                needs_resync_with_host = true;
+                            }
+                        }
+                    }
+                    BufferMessage::Resync => {
+                        if this
+                            .update(&mut cx, |this, cx| this.synchronize_remote_buffers(cx))
+                            .await
+                            .is_ok()
+                        {
+                            needs_resync_with_host = false;
+                        }
+                    }
+                }
+            } else {
+                break;
+            }
+        }
+    }
+
     fn on_buffer_event(
         &mut self,
         buffer: ModelHandle<Buffer>,
@@ -1643,14 +1713,12 @@ impl Project {
     ) -> Option<()> {
         match event {
             BufferEvent::Operation(operation) => {
-                if let Some(project_id) = self.remote_id() {
-                    let request = self.client.request(proto::UpdateBuffer {
-                        project_id,
+                self.buffer_changes_tx
+                    .unbounded_send(BufferMessage::Operation {
                         buffer_id: buffer.read(cx).remote_id(),
-                        operations: vec![language::proto::serialize_operation(operation)],
-                    });
-                    cx.background().spawn(request).detach_and_log_err(cx);
-                }
+                        operation: language::proto::serialize_operation(operation),
+                    })
+                    .ok();
             }
             BufferEvent::Edited { .. } => {
                 let language_server = self
@@ -4861,7 +4929,9 @@ impl Project {
             }
 
             if is_host {
-                this.synchronize_remote_buffers(cx).detach_and_log_err(cx);
+                this.buffer_changes_tx
+                    .unbounded_send(BufferMessage::Resync)
+                    .unwrap();
             }
 
             cx.emit(Event::CollaboratorUpdated {