Start work on rejoining rooms, supplying all project info at once

Max Brunsfeld and Nathan Sobo created

Co-authored-by: Nathan Sobo <nathan@zed.dev>

Change summary

crates/call/src/room.rs           | 148 +++++++++++++++++++++-----------
crates/collab_ui/src/collab_ui.rs |   4 
crates/project/src/project.rs     | 103 ++++++++++++++--------
crates/rpc/proto/zed.proto        |  63 ++++++++-----
crates/rpc/src/proto.rs           |   3 
5 files changed, 208 insertions(+), 113 deletions(-)

Detailed changes

crates/call/src/room.rs 🔗

@@ -7,7 +7,7 @@ use client::{
     proto::{self, PeerId},
     Client, TypedEnvelope, User, UserStore,
 };
-use collections::{BTreeMap, HashSet};
+use collections::{BTreeMap, HashMap, HashSet};
 use futures::{FutureExt, StreamExt};
 use gpui::{
     AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, WeakModelHandle,
@@ -44,6 +44,7 @@ pub struct Room {
     live_kit: Option<LiveKitRoom>,
     status: RoomStatus,
     shared_projects: HashSet<WeakModelHandle<Project>>,
+    joined_projects: HashSet<WeakModelHandle<Project>>,
     local_participant: LocalParticipant,
     remote_participants: BTreeMap<u64, RemoteParticipant>,
     pending_participants: Vec<Arc<User>>,
@@ -134,6 +135,7 @@ impl Room {
             live_kit: live_kit_room,
             status: RoomStatus::Online,
             shared_projects: Default::default(),
+            joined_projects: Default::default(),
             participant_user_ids: Default::default(),
             local_participant: Default::default(),
             remote_participants: Default::default(),
@@ -259,16 +261,15 @@ impl Room {
                 .next()
                 .await
                 .map_or(false, |s| s.is_connected());
+
             // Even if we're initially connected, any future change of the status means we momentarily disconnected.
             if !is_connected || client_status.next().await.is_some() {
                 log::info!("detected client disconnection");
-                let room_id = this
-                    .upgrade(&cx)
+                this.upgrade(&cx)
                     .ok_or_else(|| anyhow!("room was dropped"))?
                     .update(&mut cx, |this, cx| {
                         this.status = RoomStatus::Rejoining;
                         cx.notify();
-                        this.id
                     });
 
                 // Wait for client to re-establish a connection to the server.
@@ -281,40 +282,21 @@ impl Room {
                                 "waiting for client status change, remaining attempts {}",
                                 remaining_attempts
                             );
-                            if let Some(status) = client_status.next().await {
-                                if status.is_connected() {
-                                    log::info!("client reconnected, attempting to rejoin room");
-                                    let rejoin_room = async {
-                                        let response =
-                                            client.request(proto::JoinRoom { id: room_id }).await?;
-                                        let room_proto =
-                                            response.room.ok_or_else(|| anyhow!("invalid room"))?;
-                                        this.upgrade(&cx)
-                                            .ok_or_else(|| anyhow!("room was dropped"))?
-                                            .update(&mut cx, |this, cx| {
-                                                this.status = RoomStatus::Online;
-                                                this.apply_room_update(room_proto, cx)?;
-                                                this.shared_projects.retain(|project| {
-                                                    let Some(project) = project.upgrade(cx) else { return false };
-                                                    project.update(cx, |project, cx| {
-                                                        if let Some(remote_id) = project.remote_id() {
-                                                            project.shared(remote_id, cx).detach()
-                                                        }
-                                                    });
-                                                    true
-                                                });
-                                                anyhow::Ok(())
-                                            })
-                                    };
-
-                                    if rejoin_room.await.log_err().is_some() {
-                                        return true;
-                                    } else {
-                                        remaining_attempts -= 1;
-                                    }
+                            let Some(status) = client_status.next().await else { break };
+                            if status.is_connected() {
+                                log::info!("client reconnected, attempting to rejoin room");
+
+                                let Some(this) = this.upgrade(&cx) else { break };
+                                if this
+                                    .update(&mut cx, |this, cx| this.rejoin(cx))
+                                    .await
+                                    .log_err()
+                                    .is_some()
+                                {
+                                    return true;
+                                } else {
+                                    remaining_attempts -= 1;
                                 }
-                            } else {
-                                return false;
                             }
                         }
                         false
@@ -351,6 +333,73 @@ impl Room {
         }
     }
 
+    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
+        let mut projects = HashMap::default();
+        let mut reshared_projects = Vec::new();
+        let mut rejoined_projects = Vec::new();
+        self.shared_projects.retain(|project| {
+            if let Some(handle) = project.upgrade(cx) {
+                let project = handle.read(cx);
+                if let Some(project_id) = project.remote_id() {
+                    projects.insert(project_id, handle.clone());
+                    reshared_projects.push(proto::UpdateProject {
+                        project_id,
+                        worktrees: project.worktree_metadata_protos(cx),
+                    });
+                    return true;
+                }
+            }
+            false
+        });
+        self.joined_projects.retain(|project| {
+            if let Some(handle) = project.upgrade(cx) {
+                let project = handle.read(cx);
+                if let Some(project_id) = project.remote_id() {
+                    rejoined_projects.push(proto::RejoinProject {
+                        project_id,
+                        worktrees: project
+                            .worktrees(cx)
+                            .map(|worktree| {
+                                let worktree = worktree.read(cx);
+                                proto::RejoinWorktree {
+                                    id: worktree.id().to_proto(),
+                                    scan_id: worktree.scan_id() as u64,
+                                }
+                            })
+                            .collect(),
+                    });
+                }
+                return true;
+            }
+            false
+        });
+
+        let response = self.client.request(proto::RejoinRoom {
+            id: self.id,
+            reshared_projects,
+            rejoined_projects,
+        });
+
+        cx.spawn(|this, mut cx| async move {
+            let response = response.await?;
+            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
+            this.update(&mut cx, |this, cx| {
+                this.status = RoomStatus::Online;
+                this.apply_room_update(room_proto, cx)?;
+
+                for shared_project in response.reshared_projects {
+                    if let Some(project) = projects.get(&shared_project.id) {
+                        project.update(cx, |project, cx| {
+                            project.reshared(shared_project, cx).log_err();
+                        });
+                    }
+                }
+
+                anyhow::Ok(())
+            })
+        })
+    }
+
     pub fn id(&self) -> u64 {
         self.id
     }
@@ -641,6 +690,17 @@ impl Room {
         })
     }
 
+    pub fn joined_project(&mut self, project: ModelHandle<Project>, cx: &mut ModelContext<Self>) {
+        self.joined_projects.retain(|project| {
+            if let Some(project) = project.upgrade(cx) {
+                !project.read(cx).is_read_only()
+            } else {
+                false
+            }
+        });
+        self.joined_projects.insert(project.downgrade());
+    }
+
     pub(crate) fn share_project(
         &mut self,
         project: ModelHandle<Project>,
@@ -652,19 +712,7 @@ impl Room {
 
         let request = self.client.request(proto::ShareProject {
             room_id: self.id(),
-            worktrees: project
-                .read(cx)
-                .worktrees(cx)
-                .map(|worktree| {
-                    let worktree = worktree.read(cx);
-                    proto::WorktreeMetadata {
-                        id: worktree.id().to_proto(),
-                        root_name: worktree.root_name().into(),
-                        visible: worktree.is_visible(),
-                        abs_path: worktree.abs_path().to_string_lossy().into(),
-                    }
-                })
-                .collect(),
+            worktrees: project.read(cx).worktree_metadata_protos(cx),
         });
         cx.spawn(|this, mut cx| async move {
             let response = request.await?;

crates/collab_ui/src/collab_ui.rs 🔗

@@ -68,6 +68,10 @@ pub fn init(app_state: Arc<AppState>, cx: &mut MutableAppContext) {
 
             workspace.update(&mut cx, |workspace, cx| {
                 if let Some(room) = ActiveCall::global(cx).read(cx).room().cloned() {
+                    room.update(cx, |room, cx| {
+                        room.joined_project(workspace.project().clone(), cx);
+                    });
+
                     let follow_peer_id = room
                         .read(cx)
                         .remote_participants()

crates/project/src/project.rs 🔗

@@ -13,7 +13,7 @@ use collections::{hash_map, BTreeMap, HashMap, HashSet};
 use futures::{
     channel::{mpsc, oneshot},
     future::Shared,
-    AsyncWriteExt, Future, FutureExt, StreamExt, TryFutureExt,
+    select_biased, AsyncWriteExt, Future, FutureExt, StreamExt, TryFutureExt,
 };
 use gpui::{
     AnyModelHandle, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle,
@@ -151,7 +151,6 @@ enum ProjectClientState {
         remote_id: u64,
         metadata_changed: mpsc::UnboundedSender<oneshot::Sender<()>>,
         _maintain_metadata: Task<()>,
-        _detect_unshare: Task<Option<()>>,
     },
     Remote {
         sharing_has_stopped: bool,
@@ -552,16 +551,12 @@ impl Project {
         user_store
             .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))
             .await?;
-        let mut collaborators = HashMap::default();
-        for message in response.collaborators {
-            let collaborator = Collaborator::from_proto(message)?;
-            collaborators.insert(collaborator.peer_id, collaborator);
-        }
 
-        this.update(&mut cx, |this, _| {
-            this.collaborators = collaborators;
+        this.update(&mut cx, |this, cx| {
+            this.set_collaborators_from_proto(response.collaborators, cx)?;
             this.client_subscriptions.push(subscription);
-        });
+            anyhow::Ok(())
+        })?;
 
         Ok(this)
     }
@@ -1055,49 +1050,39 @@ impl Project {
             remote_id: project_id,
             metadata_changed: metadata_changed_tx,
             _maintain_metadata: cx.spawn_weak(move |this, cx| async move {
-                while let Some(tx) = metadata_changed_rx.next().await {
-                    let mut txs = vec![tx];
-                    while let Ok(Some(next_tx)) = metadata_changed_rx.try_next() {
-                        txs.push(next_tx);
+                let mut txs = Vec::new();
+                loop {
+                    select_biased! {
+                        tx = metadata_changed_rx.next().fuse() => {
+                            let Some(tx) = tx else { break };
+                            txs.push(tx);
+                            while let Ok(Some(next_tx)) = metadata_changed_rx.try_next() {
+                                txs.push(next_tx);
+                            }
+                        }
+                        status = status.next().fuse() => {
+                            let Some(status) = status else { break };
+                            if !status.is_connected() {
+                                continue
+                            }
+                        }
                     }
 
                     let Some(this) = this.upgrade(&cx) else { break };
                     this.read_with(&cx, |this, cx| {
-                        let worktrees = this
-                            .worktrees
-                            .iter()
-                            .filter_map(|worktree| {
-                                worktree.upgrade(cx).map(|worktree| {
-                                    worktree.read(cx).as_local().unwrap().metadata_proto()
-                                })
-                            })
-                            .collect();
                         this.client.request(proto::UpdateProject {
                             project_id,
-                            worktrees,
+                            worktrees: this.worktree_metadata_protos(cx),
                         })
                     })
                     .await
                     .log_err();
 
-                    for tx in txs {
+                    for tx in txs.drain(..) {
                         let _ = tx.send(());
                     }
                 }
             }),
-            _detect_unshare: cx.spawn_weak(move |this, mut cx| {
-                async move {
-                    let is_connected = status.next().await.map_or(false, |s| s.is_connected());
-                    // Even if we're initially connected, any future change of the status means we momentarily disconnected.
-                    if !is_connected || status.next().await.is_some() {
-                        if let Some(this) = this.upgrade(&cx) {
-                            let _ = this.update(&mut cx, |this, cx| this.unshare(cx));
-                        }
-                    }
-                    Ok(())
-                }
-                .log_err()
-            }),
         });
 
         cx.foreground().spawn(async move {
@@ -1106,6 +1091,29 @@ impl Project {
         })
     }
 
+    pub fn reshared(
+        &mut self,
+        message: proto::ResharedProject,
+        cx: &mut ModelContext<Self>,
+    ) -> Result<()> {
+        self.set_collaborators_from_proto(message.collaborators, cx)?;
+        Ok(())
+    }
+
+    pub fn worktree_metadata_protos(&self, cx: &AppContext) -> Vec<proto::WorktreeMetadata> {
+        self.worktrees(cx)
+            .map(|worktree| {
+                let worktree = worktree.read(cx);
+                proto::WorktreeMetadata {
+                    id: worktree.id().to_proto(),
+                    root_name: worktree.root_name().into(),
+                    visible: worktree.is_visible(),
+                    abs_path: worktree.abs_path().to_string_lossy().into(),
+                }
+            })
+            .collect()
+    }
+
     pub fn unshare(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
         if self.is_remote() {
             return Err(anyhow!("attempted to unshare a remote project"));
@@ -5637,6 +5645,25 @@ impl Project {
         })
     }
 
+    fn set_collaborators_from_proto(
+        &mut self,
+        messages: Vec<proto::Collaborator>,
+        cx: &mut ModelContext<Self>,
+    ) -> Result<()> {
+        let mut collaborators = HashMap::default();
+        for message in messages {
+            let collaborator = Collaborator::from_proto(message)?;
+            collaborators.insert(collaborator.peer_id, collaborator);
+        }
+        for old_peer_id in self.collaborators.keys() {
+            if !collaborators.contains_key(old_peer_id) {
+                cx.emit(Event::CollaboratorLeft(*old_peer_id));
+            }
+        }
+        self.collaborators = collaborators;
+        Ok(())
+    }
+
     fn deserialize_symbol(
         &self,
         serialized_symbol: proto::Symbol,

crates/rpc/proto/zed.proto 🔗

@@ -21,6 +21,8 @@ message Envelope {
         CreateRoomResponse create_room_response = 10;
         JoinRoom join_room = 11;
         JoinRoomResponse join_room_response = 12;
+        RejoinRoom rejoin_room = 108;
+        RejoinRoomResponse rejoin_room_response = 109;
         LeaveRoom leave_room = 13;
         Call call = 14;
         IncomingCall incoming_call = 15;
@@ -161,6 +163,42 @@ message JoinRoomResponse {
     optional LiveKitConnectionInfo live_kit_connection_info = 2;
 }
 
+message RejoinRoom {
+    uint64 id = 1;
+    repeated UpdateProject reshared_projects = 2;
+    repeated RejoinProject rejoined_projects = 3;
+    // relay open buffers and their vector clock
+}
+
+message RejoinProject {
+    uint64 project_id = 1;
+    repeated RejoinWorktree worktrees = 2;
+}
+
+message RejoinWorktree {
+    uint64 id = 1;
+    uint64 scan_id = 2;
+}
+
+message RejoinRoomResponse {
+    Room room = 1;
+    repeated ResharedProject reshared_projects = 2;
+    repeated RejoinedProject rejoined_projects = 3;
+}
+
+message ResharedProject {
+    uint64 id = 1;
+    repeated Collaborator collaborators = 2;
+}
+
+message RejoinedProject {
+    uint64 id = 1;
+    uint32 replica_id = 2;
+    repeated WorktreeMetadata worktrees = 3;
+    repeated Collaborator collaborators = 4;
+    repeated LanguageServer language_servers = 5;
+}
+
 message LeaveRoom {}
 
 message Room {
@@ -253,15 +291,6 @@ message ShareProjectResponse {
     uint64 project_id = 1;
 }
 
-message ReshareProject {
-    uint64 id = 1;
-    repeated WorktreeMetadata worktrees = 2;
-}
-
-message ReshareProjectResponse {
-    repeated Collaborator collaborators = 1;
-}
-
 message UnshareProject {
     uint64 project_id = 1;
 }
@@ -282,22 +311,6 @@ message JoinProjectResponse {
     repeated LanguageServer language_servers = 4;
 }
 
-message RejoinProject {
-    uint64 project_id = 1;
-    repeated RejoinWorktree worktrees = 2;
-}
-
-message RejoinWorktree {
-    uint64 id = 1;
-    uint64 scan_id = 2;
-}
-
-message RejoinProjectResponse {
-    repeated WorktreeMetadata worktrees = 1;
-    repeated Collaborator collaborators = 2;
-    repeated LanguageServer language_servers = 3;
-}
-
 message LeaveProject {
     uint64 project_id = 1;
 }

crates/rpc/src/proto.rs 🔗

@@ -188,6 +188,8 @@ messages!(
     (PrepareRename, Background),
     (PrepareRenameResponse, Background),
     (ProjectEntryResponse, Foreground),
+    (RejoinRoom, Foreground),
+    (RejoinRoomResponse, Foreground),
     (RemoveContact, Foreground),
     (ReloadBuffers, Foreground),
     (ReloadBuffersResponse, Foreground),
@@ -254,6 +256,7 @@ request_messages!(
     (JoinChannel, JoinChannelResponse),
     (JoinProject, JoinProjectResponse),
     (JoinRoom, JoinRoomResponse),
+    (RejoinRoom, RejoinRoomResponse),
     (IncomingCall, Ack),
     (OpenBufferById, OpenBufferResponse),
     (OpenBufferByPath, OpenBufferResponse),