Don't show conflict indicator on remote buffer after a reload

Antonio Scandurra created

Change summary

crates/language/src/buffer.rs  | 27 ++++++++-
crates/project/src/project.rs  | 32 +++++++++++
crates/project/src/worktree.rs | 22 ++++++++
crates/rpc/proto/zed.proto     | 42 +++++++++------
crates/rpc/src/proto.rs        |  2 
crates/server/src/rpc.rs       | 99 ++++++++++++++++++++++++++++++++++++
6 files changed, 204 insertions(+), 20 deletions(-)

Detailed changes

crates/language/src/buffer.rs 🔗

@@ -198,6 +198,14 @@ pub trait LocalFile: File {
     fn abs_path(&self, cx: &AppContext) -> PathBuf;
 
     fn load(&self, cx: &AppContext) -> Task<Result<String>>;
+
+    fn buffer_reloaded(
+        &self,
+        buffer_id: u64,
+        version: &clock::Global,
+        mtime: SystemTime,
+        cx: &mut MutableAppContext,
+    );
 }
 
 pub(crate) struct QueryCursorHandle(Option<QueryCursor>);
@@ -664,6 +672,21 @@ impl Buffer {
         cx.emit(Event::Saved);
     }
 
+    pub fn did_reload(
+        &mut self,
+        version: clock::Global,
+        mtime: SystemTime,
+        cx: &mut ModelContext<Self>,
+    ) {
+        self.saved_mtime = mtime;
+        self.saved_version = version;
+        if let Some(file) = self.file.as_ref().and_then(|f| f.as_local()) {
+            file.buffer_reloaded(self.remote_id(), &self.saved_version, self.saved_mtime, cx);
+        }
+        cx.emit(Event::Reloaded);
+        cx.notify();
+    }
+
     pub fn file_updated(
         &mut self,
         new_file: Box<dyn File>,
@@ -708,9 +731,7 @@ impl Buffer {
                                     .await;
                                 this.update(&mut cx, |this, cx| {
                                     if this.apply_diff(diff, cx) {
-                                        this.saved_version = this.version();
-                                        this.saved_mtime = new_mtime;
-                                        cx.emit(Event::Reloaded);
+                                        this.did_reload(this.version(), new_mtime, cx);
                                     }
                                 });
                             }

crates/project/src/project.rs 🔗

@@ -299,6 +299,7 @@ impl Project {
                     ),
                     client.subscribe_to_entity(remote_id, cx, Self::handle_update_buffer),
                     client.subscribe_to_entity(remote_id, cx, Self::handle_update_buffer_file),
+                    client.subscribe_to_entity(remote_id, cx, Self::handle_buffer_reloaded),
                     client.subscribe_to_entity(remote_id, cx, Self::handle_buffer_saved),
                 ],
                 client,
@@ -1694,6 +1695,37 @@ impl Project {
         Ok(())
     }
 
+    pub fn handle_buffer_reloaded(
+        &mut self,
+        envelope: TypedEnvelope<proto::BufferReloaded>,
+        _: Arc<Client>,
+        cx: &mut ModelContext<Self>,
+    ) -> Result<()> {
+        let payload = envelope.payload.clone();
+        let buffer = self
+            .open_buffers
+            .get(&(payload.buffer_id as usize))
+            .and_then(|buf| {
+                if let OpenBuffer::Loaded(buffer) = buf {
+                    buffer.upgrade(cx)
+                } else {
+                    None
+                }
+            });
+        if let Some(buffer) = buffer {
+            buffer.update(cx, |buffer, cx| {
+                let version = payload.version.try_into()?;
+                let mtime = payload
+                    .mtime
+                    .ok_or_else(|| anyhow!("missing mtime"))?
+                    .into();
+                buffer.did_reload(version, mtime, cx);
+                Result::<_, anyhow::Error>::Ok(())
+            })?;
+        }
+        Ok(())
+    }
+
     pub fn match_paths<'a>(
         &self,
         query: &'a str,

crates/project/src/worktree.rs 🔗

@@ -1520,6 +1520,28 @@ impl language::LocalFile for File {
         cx.background()
             .spawn(async move { fs.load(&abs_path).await })
     }
+
+    fn buffer_reloaded(
+        &self,
+        buffer_id: u64,
+        version: &clock::Global,
+        mtime: SystemTime,
+        cx: &mut MutableAppContext,
+    ) {
+        let worktree = self.worktree.read(cx).as_local().unwrap();
+        if let Some(project_id) = worktree.share.as_ref().map(|share| share.project_id) {
+            let rpc = worktree.client.clone();
+            let message = proto::BufferReloaded {
+                project_id,
+                buffer_id,
+                version: version.into(),
+                mtime: Some(mtime.into()),
+            };
+            cx.background()
+                .spawn(async move { rpc.send(message).await })
+                .detach_and_log_err(cx);
+        }
+    }
 }
 
 impl File {

crates/rpc/proto/zed.proto 🔗

@@ -36,23 +36,24 @@ message Envelope {
         UpdateBufferFile update_buffer_file = 28;
         SaveBuffer save_buffer = 29;
         BufferSaved buffer_saved = 30;
-        FormatBuffer format_buffer = 31;
-
-        GetChannels get_channels = 32;
-        GetChannelsResponse get_channels_response = 33;
-        JoinChannel join_channel = 34;
-        JoinChannelResponse join_channel_response = 35;
-        LeaveChannel leave_channel = 36;
-        SendChannelMessage send_channel_message = 37;
-        SendChannelMessageResponse send_channel_message_response = 38;
-        ChannelMessageSent channel_message_sent = 39;
-        GetChannelMessages get_channel_messages = 40;
-        GetChannelMessagesResponse get_channel_messages_response = 41;
-
-        UpdateContacts update_contacts = 42;
-
-        GetUsers get_users = 43;
-        GetUsersResponse get_users_response = 44;
+        BufferReloaded buffer_reloaded = 31;
+        FormatBuffer format_buffer = 32;
+
+        GetChannels get_channels = 33;
+        GetChannelsResponse get_channels_response = 34;
+        JoinChannel join_channel = 35;
+        JoinChannelResponse join_channel_response = 36;
+        LeaveChannel leave_channel = 37;
+        SendChannelMessage send_channel_message = 38;
+        SendChannelMessageResponse send_channel_message_response = 39;
+        ChannelMessageSent channel_message_sent = 40;
+        GetChannelMessages get_channel_messages = 41;
+        GetChannelMessagesResponse get_channel_messages_response = 42;
+
+        UpdateContacts update_contacts = 43;
+
+        GetUsers get_users = 44;
+        GetUsersResponse get_users_response = 45;
     }
 }
 
@@ -172,6 +173,13 @@ message BufferSaved {
     Timestamp mtime = 4;
 }
 
+message BufferReloaded {
+    uint64 project_id = 1;
+    uint64 buffer_id = 2;
+    repeated VectorClockEntry version = 3;
+    Timestamp mtime = 4;
+}
+
 message FormatBuffer {
     uint64 project_id = 1;
     uint64 buffer_id = 2;

crates/rpc/src/proto.rs 🔗

@@ -122,6 +122,7 @@ macro_rules! entity_messages {
 messages!(
     Ack,
     AddProjectCollaborator,
+    BufferReloaded,
     BufferSaved,
     ChannelMessageSent,
     CloseBuffer,
@@ -184,6 +185,7 @@ request_messages!(
 entity_messages!(
     project_id,
     AddProjectCollaborator,
+    BufferReloaded,
     BufferSaved,
     CloseBuffer,
     DiskBasedDiagnosticsUpdated,

crates/server/src/rpc.rs 🔗

@@ -78,6 +78,7 @@ impl Server {
             .add_handler(Server::close_buffer)
             .add_handler(Server::update_buffer)
             .add_handler(Server::update_buffer_file)
+            .add_handler(Server::buffer_reloaded)
             .add_handler(Server::buffer_saved)
             .add_handler(Server::save_buffer)
             .add_handler(Server::format_buffer)
@@ -721,6 +722,22 @@ impl Server {
         Ok(())
     }
 
+    async fn buffer_reloaded(
+        self: Arc<Server>,
+        request: TypedEnvelope<proto::BufferReloaded>,
+    ) -> tide::Result<()> {
+        let receiver_ids = self
+            .state()
+            .project_connection_ids(request.payload.project_id, request.sender_id)
+            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
+        broadcast(request.sender_id, receiver_ids, |connection_id| {
+            self.peer
+                .forward_send(request.sender_id, connection_id, request.payload.clone())
+        })
+        .await?;
+        Ok(())
+    }
+
     async fn buffer_saved(
         self: Arc<Server>,
         request: TypedEnvelope<proto::BufferSaved>,
@@ -1661,6 +1678,88 @@ mod tests {
         });
     }
 
+    #[gpui::test]
+    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
+        cx_a.foreground().forbid_parking();
+        let lang_registry = Arc::new(LanguageRegistry::new());
+        let fs = Arc::new(FakeFs::new());
+
+        // Connect to a server as 2 clients.
+        let mut server = TestServer::start(cx_a.foreground()).await;
+        let client_a = server.create_client(&mut cx_a, "user_a").await;
+        let client_b = server.create_client(&mut cx_b, "user_b").await;
+
+        // Share a project as client A
+        fs.insert_tree(
+            "/dir",
+            json!({
+                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
+                "a.txt": "a-contents",
+            }),
+        )
+        .await;
+
+        let project_a = cx_a.update(|cx| {
+            Project::local(
+                client_a.clone(),
+                client_a.user_store.clone(),
+                lang_registry.clone(),
+                fs.clone(),
+                cx,
+            )
+        });
+        let (worktree_a, _) = project_a
+            .update(&mut cx_a, |p, cx| {
+                p.find_or_create_local_worktree("/dir", false, cx)
+            })
+            .await
+            .unwrap();
+        worktree_a
+            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
+            .await;
+        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
+        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
+        project_a
+            .update(&mut cx_a, |p, cx| p.share(cx))
+            .await
+            .unwrap();
+
+        // Join that project as client B
+        let project_b = Project::remote(
+            project_id,
+            client_b.clone(),
+            client_b.user_store.clone(),
+            lang_registry.clone(),
+            fs.clone(),
+            &mut cx_b.to_async(),
+        )
+        .await
+        .unwrap();
+        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
+
+        // Open a buffer as client B
+        let buffer_b = project_b
+            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
+            .await
+            .unwrap();
+        buffer_b.read_with(&cx_b, |buf, _| {
+            assert!(!buf.is_dirty());
+            assert!(!buf.has_conflict());
+        });
+
+        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
+            .await
+            .unwrap();
+        buffer_b
+            .condition(&cx_b, |buf, _| {
+                buf.text() == "new contents" && !buf.is_dirty()
+            })
+            .await;
+        buffer_b.read_with(&cx_b, |buf, _| {
+            assert!(!buf.has_conflict());
+        });
+    }
+
     #[gpui::test]
     async fn test_editing_while_guest_opens_buffer(
         mut cx_a: TestAppContext,