Synchronize buffers when either the host or a guest reconnects

Antonio Scandurra created

Change summary

crates/collab/src/rpc.rs            |   1 
crates/editor/src/multi_buffer.rs   |   2 
crates/language/src/buffer.rs       |   8 +
crates/language/src/buffer_tests.rs |  10 +
crates/project/src/project.rs       | 176 +++++++++++++++++++++++++++---
crates/rpc/proto/zed.proto          |  16 ++
crates/rpc/src/proto.rs             |   4 
7 files changed, 194 insertions(+), 23 deletions(-)

Detailed changes

crates/collab/src/rpc.rs 🔗

@@ -216,6 +216,7 @@ impl Server {
             .add_request_handler(forward_project_request::<proto::PrepareRename>)
             .add_request_handler(forward_project_request::<proto::PerformRename>)
             .add_request_handler(forward_project_request::<proto::ReloadBuffers>)
+            .add_request_handler(forward_project_request::<proto::SynchronizeBuffers>)
             .add_request_handler(forward_project_request::<proto::FormatBuffers>)
             .add_request_handler(forward_project_request::<proto::CreateProjectEntry>)
             .add_request_handler(forward_project_request::<proto::RenameProjectEntry>)

crates/editor/src/multi_buffer.rs 🔗

@@ -3651,7 +3651,7 @@ mod tests {
             let state = host_buffer.read(cx).to_proto();
             let ops = cx
                 .background()
-                .block(host_buffer.read(cx).serialize_ops(cx));
+                .block(host_buffer.read(cx).serialize_ops(None, cx));
             let mut buffer = Buffer::from_proto(1, state, None).unwrap();
             buffer
                 .apply_ops(

crates/language/src/buffer.rs 🔗

@@ -398,7 +398,11 @@ impl Buffer {
         }
     }
 
-    pub fn serialize_ops(&self, cx: &AppContext) -> Task<Vec<proto::Operation>> {
+    pub fn serialize_ops(
+        &self,
+        since: Option<clock::Global>,
+        cx: &AppContext,
+    ) -> Task<Vec<proto::Operation>> {
         let mut operations = Vec::new();
         operations.extend(self.deferred_ops.iter().map(proto::serialize_operation));
         operations.extend(self.remote_selections.iter().map(|(_, set)| {
@@ -422,9 +426,11 @@ impl Buffer {
 
         let text_operations = self.text.operations().clone();
         cx.background().spawn(async move {
+            let since = since.unwrap_or_default();
             operations.extend(
                 text_operations
                     .iter()
+                    .filter(|(_, op)| !since.observed(op.local_timestamp()))
                     .map(|(_, op)| proto::serialize_operation(&Operation::Buffer(op.clone()))),
             );
             operations.sort_unstable_by_key(proto::lamport_timestamp_for_operation);

crates/language/src/buffer_tests.rs 🔗

@@ -1275,7 +1275,9 @@ fn test_serialization(cx: &mut gpui::MutableAppContext) {
     assert_eq!(buffer1.read(cx).text(), "abcDF");
 
     let state = buffer1.read(cx).to_proto();
-    let ops = cx.background().block(buffer1.read(cx).serialize_ops(cx));
+    let ops = cx
+        .background()
+        .block(buffer1.read(cx).serialize_ops(None, cx));
     let buffer2 = cx.add_model(|cx| {
         let mut buffer = Buffer::from_proto(1, state, None).unwrap();
         buffer
@@ -1316,7 +1318,7 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
             let state = base_buffer.read(cx).to_proto();
             let ops = cx
                 .background()
-                .block(base_buffer.read(cx).serialize_ops(cx));
+                .block(base_buffer.read(cx).serialize_ops(None, cx));
             let mut buffer = Buffer::from_proto(i as ReplicaId, state, None).unwrap();
             buffer
                 .apply_ops(
@@ -1413,7 +1415,9 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
             }
             50..=59 if replica_ids.len() < max_peers => {
                 let old_buffer_state = buffer.read(cx).to_proto();
-                let old_buffer_ops = cx.background().block(buffer.read(cx).serialize_ops(cx));
+                let old_buffer_ops = cx
+                    .background()
+                    .block(buffer.read(cx).serialize_ops(None, cx));
                 let new_replica_id = (0..=replica_ids.len() as ReplicaId)
                     .filter(|replica_id| *replica_id != buffer.read(cx).replica_id())
                     .choose(&mut rng)

crates/project/src/project.rs 🔗

@@ -379,6 +379,7 @@ impl Project {
         client.add_model_request_handler(Self::handle_apply_additional_edits_for_completion);
         client.add_model_request_handler(Self::handle_apply_code_action);
         client.add_model_request_handler(Self::handle_reload_buffers);
+        client.add_model_request_handler(Self::handle_synchronize_buffers);
         client.add_model_request_handler(Self::handle_format_buffers);
         client.add_model_request_handler(Self::handle_get_code_actions);
         client.add_model_request_handler(Self::handle_get_completions);
@@ -1082,7 +1083,6 @@ impl Project {
     ) -> Result<()> {
         self.set_worktrees_from_proto(message.worktrees, cx)?;
         self.set_collaborators_from_proto(message.collaborators, cx)?;
-
         self.language_server_statuses = message
             .language_servers
             .into_iter()
@@ -1098,6 +1098,7 @@ impl Project {
                 )
             })
             .collect();
+        self.synchronize_remote_buffers(cx).detach_and_log_err(cx);
 
         cx.notify();
         Ok(())
@@ -4631,12 +4632,17 @@ impl Project {
                 .collaborators
                 .remove(&old_peer_id)
                 .ok_or_else(|| anyhow!("received UpdateProjectCollaborator for unknown peer"))?;
+            let is_host = collaborator.replica_id == 0;
             this.collaborators.insert(new_peer_id, collaborator);
 
             if let Some(buffers) = this.shared_buffers.remove(&old_peer_id) {
                 this.shared_buffers.insert(new_peer_id, buffers);
             }
 
+            if is_host {
+                this.synchronize_remote_buffers(cx).detach_and_log_err(cx);
+            }
+
             cx.emit(Event::CollaboratorUpdated {
                 old_peer_id,
                 new_peer_id,
@@ -5131,6 +5137,55 @@ impl Project {
         })
     }
 
+    async fn handle_synchronize_buffers(
+        this: ModelHandle<Self>,
+        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
+        _: Arc<Client>,
+        cx: AsyncAppContext,
+    ) -> Result<proto::SynchronizeBuffersResponse> {
+        let project_id = envelope.payload.project_id;
+        let mut response = proto::SynchronizeBuffersResponse {
+            buffers: Default::default(),
+        };
+
+        this.read_with(&cx, |this, cx| {
+            for buffer in envelope.payload.buffers {
+                let buffer_id = buffer.id;
+                let remote_version = language::proto::deserialize_version(buffer.version);
+                if let Some(buffer) = this.buffer_for_id(buffer_id, cx) {
+                    let buffer = buffer.read(cx);
+                    response.buffers.push(proto::BufferVersion {
+                        id: buffer_id,
+                        version: language::proto::serialize_version(&buffer.version),
+                    });
+
+                    let operations = buffer.serialize_ops(Some(remote_version), cx);
+                    let client = this.client.clone();
+                    cx.background()
+                        .spawn(
+                            async move {
+                                let operations = operations.await;
+                                for chunk in split_operations(operations) {
+                                    client
+                                        .request(proto::UpdateBuffer {
+                                            project_id,
+                                            buffer_id,
+                                            operations: chunk,
+                                        })
+                                        .await?;
+                                }
+                                anyhow::Ok(())
+                            }
+                            .log_err(),
+                        )
+                        .detach();
+                }
+            }
+        });
+
+        Ok(response)
+    }
+
     async fn handle_format_buffers(
         this: ModelHandle<Self>,
         envelope: TypedEnvelope<proto::FormatBuffers>,
@@ -5557,12 +5612,12 @@ impl Project {
             if shared_buffers.insert(buffer_id) {
                 let buffer = buffer.read(cx);
                 let state = buffer.to_proto();
-                let operations = buffer.serialize_ops(cx);
+                let operations = buffer.serialize_ops(None, cx);
                 let client = self.client.clone();
                 cx.background()
                     .spawn(
                         async move {
-                            let mut operations = operations.await;
+                            let operations = operations.await;
 
                             client.send(proto::CreateBufferForPeer {
                                 project_id,
@@ -5570,17 +5625,9 @@ impl Project {
                                 variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
                             })?;
 
-                            loop {
-                                #[cfg(any(test, feature = "test-support"))]
-                                const CHUNK_SIZE: usize = 5;
-
-                                #[cfg(not(any(test, feature = "test-support")))]
-                                const CHUNK_SIZE: usize = 100;
-
-                                let chunk = operations
-                                    .drain(..cmp::min(CHUNK_SIZE, operations.len()))
-                                    .collect();
-                                let is_last = operations.is_empty();
+                            let mut chunks = split_operations(operations).peekable();
+                            while let Some(chunk) = chunks.next() {
+                                let is_last = chunks.peek().is_none();
                                 client.send(proto::CreateBufferForPeer {
                                     project_id,
                                     peer_id: Some(peer_id),
@@ -5592,10 +5639,6 @@ impl Project {
                                         },
                                     )),
                                 })?;
-
-                                if is_last {
-                                    break;
-                                }
                             }
 
                             Ok(())
@@ -5638,6 +5681,81 @@ impl Project {
         })
     }
 
+    fn synchronize_remote_buffers(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
+        let project_id = match self.client_state.as_ref() {
+            Some(ProjectClientState::Remote {
+                sharing_has_stopped,
+                remote_id,
+                ..
+            }) => {
+                if *sharing_has_stopped {
+                    return Task::ready(Err(anyhow!(
+                        "can't synchronize remote buffers on a readonly project"
+                    )));
+                } else {
+                    *remote_id
+                }
+            }
+            Some(ProjectClientState::Local { .. }) | None => {
+                return Task::ready(Err(anyhow!(
+                    "can't synchronize remote buffers on a local project"
+                )))
+            }
+        };
+
+        let client = self.client.clone();
+        cx.spawn(|this, cx| async move {
+            let buffers = this.read_with(&cx, |this, cx| {
+                this.opened_buffers
+                    .iter()
+                    .filter_map(|(id, buffer)| {
+                        let buffer = buffer.upgrade(cx)?;
+                        Some(proto::BufferVersion {
+                            id: *id,
+                            version: language::proto::serialize_version(&buffer.read(cx).version),
+                        })
+                    })
+                    .collect()
+            });
+            let response = client
+                .request(proto::SynchronizeBuffers {
+                    project_id,
+                    buffers,
+                })
+                .await?;
+
+            let send_updates_for_buffers = response.buffers.into_iter().map(|buffer| {
+                let client = client.clone();
+                let buffer_id = buffer.id;
+                let remote_version = language::proto::deserialize_version(buffer.version);
+                this.read_with(&cx, |this, cx| {
+                    if let Some(buffer) = this.buffer_for_id(buffer_id, cx) {
+                        let operations = buffer.read(cx).serialize_ops(Some(remote_version), cx);
+                        cx.background().spawn(async move {
+                            let operations = operations.await;
+                            for chunk in split_operations(operations) {
+                                client
+                                    .request(proto::UpdateBuffer {
+                                        project_id,
+                                        buffer_id,
+                                        operations: chunk,
+                                    })
+                                    .await?;
+                            }
+                            anyhow::Ok(())
+                        })
+                    } else {
+                        Task::ready(Ok(()))
+                    }
+                })
+            });
+            futures::future::join_all(send_updates_for_buffers)
+                .await
+                .into_iter()
+                .collect()
+        })
+    }
+
     pub fn worktree_metadata_protos(&self, cx: &AppContext) -> Vec<proto::WorktreeMetadata> {
         self.worktrees(cx)
             .map(|worktree| {
@@ -6126,6 +6244,28 @@ impl<P: AsRef<Path>> From<(WorktreeId, P)> for ProjectPath {
     }
 }
 
+fn split_operations(
+    mut operations: Vec<proto::Operation>,
+) -> impl Iterator<Item = Vec<proto::Operation>> {
+    #[cfg(any(test, feature = "test-support"))]
+    const CHUNK_SIZE: usize = 5;
+
+    #[cfg(not(any(test, feature = "test-support")))]
+    const CHUNK_SIZE: usize = 100;
+
+    std::iter::from_fn(move || {
+        if operations.is_empty() {
+            return None;
+        }
+
+        Some(
+            operations
+                .drain(..cmp::min(CHUNK_SIZE, operations.len()))
+                .collect(),
+        )
+    })
+}
+
 fn serialize_symbol(symbol: &Symbol) -> proto::Symbol {
     proto::Symbol {
         language_server_name: symbol.language_server_name.0.to_string(),

crates/rpc/proto/zed.proto 🔗

@@ -79,6 +79,8 @@ message Envelope {
         BufferReloaded buffer_reloaded = 61;
         ReloadBuffers reload_buffers = 62;
         ReloadBuffersResponse reload_buffers_response = 63;
+        SynchronizeBuffers synchronize_buffers = 200;
+        SynchronizeBuffersResponse synchronize_buffers_response = 201;
         FormatBuffers format_buffers = 64;
         FormatBuffersResponse format_buffers_response = 65;
         GetCompletions get_completions = 66;
@@ -538,6 +540,20 @@ message ReloadBuffersResponse {
     ProjectTransaction transaction = 1;
 }
 
+message SynchronizeBuffers {
+    uint64 project_id = 1;
+    repeated BufferVersion buffers = 2;
+}
+
+message SynchronizeBuffersResponse {
+    repeated BufferVersion buffers = 1;
+}
+
+message BufferVersion {
+    uint64 id = 1;
+    repeated VectorClockEntry version = 2;
+}
+
 enum FormatTrigger {
     Save = 0;
     Manual = 1;

crates/rpc/src/proto.rs 🔗

@@ -207,6 +207,8 @@ messages!(
     (ShareProjectResponse, Foreground),
     (ShowContacts, Foreground),
     (StartLanguageServer, Foreground),
+    (SynchronizeBuffers, Foreground),
+    (SynchronizeBuffersResponse, Foreground),
     (Test, Foreground),
     (Unfollow, Foreground),
     (UnshareProject, Foreground),
@@ -274,6 +276,7 @@ request_messages!(
     (SearchProject, SearchProjectResponse),
     (SendChannelMessage, SendChannelMessageResponse),
     (ShareProject, ShareProjectResponse),
+    (SynchronizeBuffers, SynchronizeBuffersResponse),
     (Test, Test),
     (UpdateBuffer, Ack),
     (UpdateParticipantLocation, Ack),
@@ -315,6 +318,7 @@ entity_messages!(
     SaveBuffer,
     SearchProject,
     StartLanguageServer,
+    SynchronizeBuffers,
     Unfollow,
     UnshareProject,
     UpdateBuffer,