Avoid applying outdated UpdateProject messages

Max Brunsfeld and Nathan Sobo created

Co-authored-by: Nathan Sobo <nathan@zed.dev>

Change summary

crates/client/src/client.rs   | 15 +++++++++++++--
crates/project/src/project.rs | 18 +++++++++++++-----
crates/rpc/src/peer.rs        | 25 +++++++++++++++++++++----
3 files changed, 47 insertions(+), 11 deletions(-)

Detailed changes

crates/client/src/client.rs 🔗

@@ -10,7 +10,10 @@ use async_tungstenite::tungstenite::{
     error::Error as WebsocketError,
     http::{Request, StatusCode},
 };
-use futures::{future::LocalBoxFuture, AsyncReadExt, FutureExt, SinkExt, StreamExt, TryStreamExt};
+use futures::{
+    future::LocalBoxFuture, AsyncReadExt, FutureExt, SinkExt, StreamExt, TryFutureExt as _,
+    TryStreamExt,
+};
 use gpui::{
     actions,
     serde_json::{self, Value},
@@ -1187,6 +1190,14 @@ impl Client {
         &self,
         request: T,
     ) -> impl Future<Output = Result<T::Response>> {
+        self.request_envelope(request)
+            .map_ok(|envelope| envelope.payload)
+    }
+
+    pub fn request_envelope<T: RequestMessage>(
+        &self,
+        request: T,
+    ) -> impl Future<Output = Result<TypedEnvelope<T::Response>>> {
         let client_id = self.id;
         log::debug!(
             "rpc request start. client_id:{}. name:{}",
@@ -1195,7 +1206,7 @@ impl Client {
         );
         let response = self
             .connection_id()
-            .map(|conn_id| self.peer.request(conn_id, request));
+            .map(|conn_id| self.peer.request_envelope(conn_id, request));
         async move {
             let response = response?.await;
             log::debug!(

crates/project/src/project.rs 🔗

@@ -100,6 +100,7 @@ pub struct Project {
     next_language_server_id: usize,
     client: Arc<client::Client>,
     next_entry_id: Arc<AtomicUsize>,
+    join_project_response_message_id: u32,
     next_diagnostic_group_id: usize,
     user_store: ModelHandle<UserStore>,
     fs: Arc<dyn Fs>,
@@ -425,6 +426,7 @@ impl Project {
             loading_buffers_by_path: Default::default(),
             loading_local_worktrees: Default::default(),
             buffer_snapshots: Default::default(),
+            join_project_response_message_id: 0,
             client_state: None,
             opened_buffer: watch::channel(),
             client_subscriptions: Vec::new(),
@@ -463,15 +465,15 @@ impl Project {
 
         let subscription = client.subscribe_to_entity(remote_id);
         let response = client
-            .request(proto::JoinProject {
+            .request_envelope(proto::JoinProject {
                 project_id: remote_id,
             })
             .await?;
         let this = cx.add_model(|cx| {
-            let replica_id = response.replica_id as ReplicaId;
+            let replica_id = response.payload.replica_id as ReplicaId;
 
             let mut worktrees = Vec::new();
-            for worktree in response.worktrees {
+            for worktree in response.payload.worktrees {
                 let worktree = cx.update(|cx| {
                     Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx)
                 });
@@ -487,6 +489,7 @@ impl Project {
                 loading_local_worktrees: Default::default(),
                 active_entry: None,
                 collaborators: Default::default(),
+                join_project_response_message_id: response.message_id,
                 _maintain_buffer_languages: Self::maintain_buffer_languages(&languages, cx),
                 _maintain_workspace_config: Self::maintain_workspace_config(languages.clone(), cx),
                 languages,
@@ -505,6 +508,7 @@ impl Project {
                 language_servers: Default::default(),
                 language_server_ids: Default::default(),
                 language_server_statuses: response
+                    .payload
                     .language_servers
                     .into_iter()
                     .map(|server| {
@@ -537,6 +541,7 @@ impl Project {
         let subscription = subscription.set_model(&this, &mut cx);
 
         let user_ids = response
+            .payload
             .collaborators
             .iter()
             .map(|peer| peer.user_id)
@@ -546,7 +551,7 @@ impl Project {
             .await?;
 
         this.update(&mut cx, |this, cx| {
-            this.set_collaborators_from_proto(response.collaborators, cx)?;
+            this.set_collaborators_from_proto(response.payload.collaborators, cx)?;
             this.client_subscriptions.push(subscription);
             anyhow::Ok(())
         })?;
@@ -4930,7 +4935,10 @@ impl Project {
         mut cx: AsyncAppContext,
     ) -> Result<()> {
         this.update(&mut cx, |this, cx| {
-            this.set_worktrees_from_proto(envelope.payload.worktrees, cx)?;
+            // Don't handle messages that were sent before the response to us joining the project
+            if envelope.message_id > this.join_project_response_message_id {
+                this.set_worktrees_from_proto(envelope.payload.worktrees, cx)?;
+            }
             Ok(())
         })
     }

crates/rpc/src/peer.rs 🔗

@@ -7,7 +7,7 @@ use collections::HashMap;
 use futures::{
     channel::{mpsc, oneshot},
     stream::BoxStream,
-    FutureExt, SinkExt, StreamExt,
+    FutureExt, SinkExt, StreamExt, TryFutureExt,
 };
 use parking_lot::{Mutex, RwLock};
 use serde::{ser::SerializeStruct, Serialize};
@@ -71,6 +71,7 @@ impl<T> Clone for Receipt<T> {
 
 impl<T> Copy for Receipt<T> {}
 
+#[derive(Clone, Debug)]
 pub struct TypedEnvelope<T> {
     pub sender_id: ConnectionId,
     pub original_sender_id: Option<PeerId>,
@@ -370,6 +371,15 @@ impl Peer {
         receiver_id: ConnectionId,
         request: T,
     ) -> impl Future<Output = Result<T::Response>> {
+        self.request_internal(None, receiver_id, request)
+            .map_ok(|envelope| envelope.payload)
+    }
+
+    pub fn request_envelope<T: RequestMessage>(
+        &self,
+        receiver_id: ConnectionId,
+        request: T,
+    ) -> impl Future<Output = Result<TypedEnvelope<T::Response>>> {
         self.request_internal(None, receiver_id, request)
     }
 
@@ -380,6 +390,7 @@ impl Peer {
         request: T,
     ) -> impl Future<Output = Result<T::Response>> {
         self.request_internal(Some(sender_id), receiver_id, request)
+            .map_ok(|envelope| envelope.payload)
     }
 
     pub fn request_internal<T: RequestMessage>(
@@ -387,7 +398,7 @@ impl Peer {
         original_sender_id: Option<ConnectionId>,
         receiver_id: ConnectionId,
         request: T,
-    ) -> impl Future<Output = Result<T::Response>> {
+    ) -> impl Future<Output = Result<TypedEnvelope<T::Response>>> {
         let (tx, rx) = oneshot::channel();
         let send = self.connection_state(receiver_id).and_then(|connection| {
             let message_id = connection.next_message_id.fetch_add(1, SeqCst);
@@ -410,6 +421,7 @@ impl Peer {
         async move {
             send?;
             let (response, _barrier) = rx.await.map_err(|_| anyhow!("connection was closed"))?;
+
             if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
                 Err(anyhow!(
                     "RPC request {} failed - {}",
@@ -417,8 +429,13 @@ impl Peer {
                     error.message
                 ))
             } else {
-                T::Response::from_envelope(response)
-                    .ok_or_else(|| anyhow!("received response of the wrong type"))
+                Ok(TypedEnvelope {
+                    message_id: response.id,
+                    sender_id: receiver_id,
+                    original_sender_id: response.original_sender_id,
+                    payload: T::Response::from_envelope(response)
+                        .ok_or_else(|| anyhow!("received response of the wrong type"))?,
+                })
             }
         }
     }