From 70dd586be971d3883641bc6d83fc569005d051f1 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Mon, 19 Dec 2022 17:50:43 -0800 Subject: [PATCH] Start work on rejoining rooms, supplying all project info at once Co-authored-by: Nathan Sobo --- crates/call/src/room.rs | 148 ++++++++++++++++++++---------- crates/collab_ui/src/collab_ui.rs | 4 + crates/project/src/project.rs | 103 +++++++++++++-------- crates/rpc/proto/zed.proto | 63 ++++++++----- crates/rpc/src/proto.rs | 3 + 5 files changed, 208 insertions(+), 113 deletions(-) diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index 1d279717f7307ece877619d33bb60d08693f2c38..14a31095171dd4940bf2e34bdcbc052053cec75e 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -7,7 +7,7 @@ use client::{ proto::{self, PeerId}, Client, TypedEnvelope, User, UserStore, }; -use collections::{BTreeMap, HashSet}; +use collections::{BTreeMap, HashMap, HashSet}; use futures::{FutureExt, StreamExt}; use gpui::{ AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, WeakModelHandle, @@ -44,6 +44,7 @@ pub struct Room { live_kit: Option, status: RoomStatus, shared_projects: HashSet>, + joined_projects: HashSet>, local_participant: LocalParticipant, remote_participants: BTreeMap, pending_participants: Vec>, @@ -134,6 +135,7 @@ impl Room { live_kit: live_kit_room, status: RoomStatus::Online, shared_projects: Default::default(), + joined_projects: Default::default(), participant_user_ids: Default::default(), local_participant: Default::default(), remote_participants: Default::default(), @@ -259,16 +261,15 @@ impl Room { .next() .await .map_or(false, |s| s.is_connected()); + // Even if we're initially connected, any future change of the status means we momentarily disconnected. if !is_connected || client_status.next().await.is_some() { log::info!("detected client disconnection"); - let room_id = this - .upgrade(&cx) + this.upgrade(&cx) .ok_or_else(|| anyhow!("room was dropped"))? .update(&mut cx, |this, cx| { this.status = RoomStatus::Rejoining; cx.notify(); - this.id }); // Wait for client to re-establish a connection to the server. @@ -281,40 +282,21 @@ impl Room { "waiting for client status change, remaining attempts {}", remaining_attempts ); - if let Some(status) = client_status.next().await { - if status.is_connected() { - log::info!("client reconnected, attempting to rejoin room"); - let rejoin_room = async { - let response = - client.request(proto::JoinRoom { id: room_id }).await?; - let room_proto = - response.room.ok_or_else(|| anyhow!("invalid room"))?; - this.upgrade(&cx) - .ok_or_else(|| anyhow!("room was dropped"))? - .update(&mut cx, |this, cx| { - this.status = RoomStatus::Online; - this.apply_room_update(room_proto, cx)?; - this.shared_projects.retain(|project| { - let Some(project) = project.upgrade(cx) else { return false }; - project.update(cx, |project, cx| { - if let Some(remote_id) = project.remote_id() { - project.shared(remote_id, cx).detach() - } - }); - true - }); - anyhow::Ok(()) - }) - }; - - if rejoin_room.await.log_err().is_some() { - return true; - } else { - remaining_attempts -= 1; - } + let Some(status) = client_status.next().await else { break }; + if status.is_connected() { + log::info!("client reconnected, attempting to rejoin room"); + + let Some(this) = this.upgrade(&cx) else { break }; + if this + .update(&mut cx, |this, cx| this.rejoin(cx)) + .await + .log_err() + .is_some() + { + return true; + } else { + remaining_attempts -= 1; } - } else { - return false; } } false @@ -351,6 +333,73 @@ impl Room { } } + fn rejoin(&mut self, cx: &mut ModelContext) -> Task> { + let mut projects = HashMap::default(); + let mut reshared_projects = Vec::new(); + let mut rejoined_projects = Vec::new(); + self.shared_projects.retain(|project| { + if let Some(handle) = project.upgrade(cx) { + let project = handle.read(cx); + if let Some(project_id) = project.remote_id() { + projects.insert(project_id, handle.clone()); + reshared_projects.push(proto::UpdateProject { + project_id, + worktrees: project.worktree_metadata_protos(cx), + }); + return true; + } + } + false + }); + self.joined_projects.retain(|project| { + if let Some(handle) = project.upgrade(cx) { + let project = handle.read(cx); + if let Some(project_id) = project.remote_id() { + rejoined_projects.push(proto::RejoinProject { + project_id, + worktrees: project + .worktrees(cx) + .map(|worktree| { + let worktree = worktree.read(cx); + proto::RejoinWorktree { + id: worktree.id().to_proto(), + scan_id: worktree.scan_id() as u64, + } + }) + .collect(), + }); + } + return true; + } + false + }); + + let response = self.client.request(proto::RejoinRoom { + id: self.id, + reshared_projects, + rejoined_projects, + }); + + cx.spawn(|this, mut cx| async move { + let response = response.await?; + let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?; + this.update(&mut cx, |this, cx| { + this.status = RoomStatus::Online; + this.apply_room_update(room_proto, cx)?; + + for shared_project in response.reshared_projects { + if let Some(project) = projects.get(&shared_project.id) { + project.update(cx, |project, cx| { + project.reshared(shared_project, cx).log_err(); + }); + } + } + + anyhow::Ok(()) + }) + }) + } + pub fn id(&self) -> u64 { self.id } @@ -641,6 +690,17 @@ impl Room { }) } + pub fn joined_project(&mut self, project: ModelHandle, cx: &mut ModelContext) { + self.joined_projects.retain(|project| { + if let Some(project) = project.upgrade(cx) { + !project.read(cx).is_read_only() + } else { + false + } + }); + self.joined_projects.insert(project.downgrade()); + } + pub(crate) fn share_project( &mut self, project: ModelHandle, @@ -652,19 +712,7 @@ impl Room { let request = self.client.request(proto::ShareProject { room_id: self.id(), - worktrees: project - .read(cx) - .worktrees(cx) - .map(|worktree| { - let worktree = worktree.read(cx); - proto::WorktreeMetadata { - id: worktree.id().to_proto(), - root_name: worktree.root_name().into(), - visible: worktree.is_visible(), - abs_path: worktree.abs_path().to_string_lossy().into(), - } - }) - .collect(), + worktrees: project.read(cx).worktree_metadata_protos(cx), }); cx.spawn(|this, mut cx| async move { let response = request.await?; diff --git a/crates/collab_ui/src/collab_ui.rs b/crates/collab_ui/src/collab_ui.rs index 1041382515bfb398c6f4dbe7ecc674c25e71aa42..4984b84a814f83efa14651a41798697ce7de2d3d 100644 --- a/crates/collab_ui/src/collab_ui.rs +++ b/crates/collab_ui/src/collab_ui.rs @@ -68,6 +68,10 @@ pub fn init(app_state: Arc, cx: &mut MutableAppContext) { workspace.update(&mut cx, |workspace, cx| { if let Some(room) = ActiveCall::global(cx).read(cx).room().cloned() { + room.update(cx, |room, cx| { + room.joined_project(workspace.project().clone(), cx); + }); + let follow_peer_id = room .read(cx) .remote_participants() diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 7f2fcb516f82bd6102e9abd391c406b2b6e34af8..e71971c3d7a561ef684e6c2b30cbc641a2ae03ec 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -13,7 +13,7 @@ use collections::{hash_map, BTreeMap, HashMap, HashSet}; use futures::{ channel::{mpsc, oneshot}, future::Shared, - AsyncWriteExt, Future, FutureExt, StreamExt, TryFutureExt, + select_biased, AsyncWriteExt, Future, FutureExt, StreamExt, TryFutureExt, }; use gpui::{ AnyModelHandle, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, @@ -151,7 +151,6 @@ enum ProjectClientState { remote_id: u64, metadata_changed: mpsc::UnboundedSender>, _maintain_metadata: Task<()>, - _detect_unshare: Task>, }, Remote { sharing_has_stopped: bool, @@ -552,16 +551,12 @@ impl Project { user_store .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx)) .await?; - let mut collaborators = HashMap::default(); - for message in response.collaborators { - let collaborator = Collaborator::from_proto(message)?; - collaborators.insert(collaborator.peer_id, collaborator); - } - this.update(&mut cx, |this, _| { - this.collaborators = collaborators; + this.update(&mut cx, |this, cx| { + this.set_collaborators_from_proto(response.collaborators, cx)?; this.client_subscriptions.push(subscription); - }); + anyhow::Ok(()) + })?; Ok(this) } @@ -1055,49 +1050,39 @@ impl Project { remote_id: project_id, metadata_changed: metadata_changed_tx, _maintain_metadata: cx.spawn_weak(move |this, cx| async move { - while let Some(tx) = metadata_changed_rx.next().await { - let mut txs = vec![tx]; - while let Ok(Some(next_tx)) = metadata_changed_rx.try_next() { - txs.push(next_tx); + let mut txs = Vec::new(); + loop { + select_biased! { + tx = metadata_changed_rx.next().fuse() => { + let Some(tx) = tx else { break }; + txs.push(tx); + while let Ok(Some(next_tx)) = metadata_changed_rx.try_next() { + txs.push(next_tx); + } + } + status = status.next().fuse() => { + let Some(status) = status else { break }; + if !status.is_connected() { + continue + } + } } let Some(this) = this.upgrade(&cx) else { break }; this.read_with(&cx, |this, cx| { - let worktrees = this - .worktrees - .iter() - .filter_map(|worktree| { - worktree.upgrade(cx).map(|worktree| { - worktree.read(cx).as_local().unwrap().metadata_proto() - }) - }) - .collect(); this.client.request(proto::UpdateProject { project_id, - worktrees, + worktrees: this.worktree_metadata_protos(cx), }) }) .await .log_err(); - for tx in txs { + for tx in txs.drain(..) { let _ = tx.send(()); } } }), - _detect_unshare: cx.spawn_weak(move |this, mut cx| { - async move { - let is_connected = status.next().await.map_or(false, |s| s.is_connected()); - // Even if we're initially connected, any future change of the status means we momentarily disconnected. - if !is_connected || status.next().await.is_some() { - if let Some(this) = this.upgrade(&cx) { - let _ = this.update(&mut cx, |this, cx| this.unshare(cx)); - } - } - Ok(()) - } - .log_err() - }), }); cx.foreground().spawn(async move { @@ -1106,6 +1091,29 @@ impl Project { }) } + pub fn reshared( + &mut self, + message: proto::ResharedProject, + cx: &mut ModelContext, + ) -> Result<()> { + self.set_collaborators_from_proto(message.collaborators, cx)?; + Ok(()) + } + + pub fn worktree_metadata_protos(&self, cx: &AppContext) -> Vec { + self.worktrees(cx) + .map(|worktree| { + let worktree = worktree.read(cx); + proto::WorktreeMetadata { + id: worktree.id().to_proto(), + root_name: worktree.root_name().into(), + visible: worktree.is_visible(), + abs_path: worktree.abs_path().to_string_lossy().into(), + } + }) + .collect() + } + pub fn unshare(&mut self, cx: &mut ModelContext) -> Result<()> { if self.is_remote() { return Err(anyhow!("attempted to unshare a remote project")); @@ -5637,6 +5645,25 @@ impl Project { }) } + fn set_collaborators_from_proto( + &mut self, + messages: Vec, + cx: &mut ModelContext, + ) -> Result<()> { + let mut collaborators = HashMap::default(); + for message in messages { + let collaborator = Collaborator::from_proto(message)?; + collaborators.insert(collaborator.peer_id, collaborator); + } + for old_peer_id in self.collaborators.keys() { + if !collaborators.contains_key(old_peer_id) { + cx.emit(Event::CollaboratorLeft(*old_peer_id)); + } + } + self.collaborators = collaborators; + Ok(()) + } + fn deserialize_symbol( &self, serialized_symbol: proto::Symbol, diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index 6da9d0a7fc5e9c1ad31e3f6b431f7005b5959fdf..b3322b29232d5938bca08cf1b9f3b48604c60dbe 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -21,6 +21,8 @@ message Envelope { CreateRoomResponse create_room_response = 10; JoinRoom join_room = 11; JoinRoomResponse join_room_response = 12; + RejoinRoom rejoin_room = 108; + RejoinRoomResponse rejoin_room_response = 109; LeaveRoom leave_room = 13; Call call = 14; IncomingCall incoming_call = 15; @@ -161,6 +163,42 @@ message JoinRoomResponse { optional LiveKitConnectionInfo live_kit_connection_info = 2; } +message RejoinRoom { + uint64 id = 1; + repeated UpdateProject reshared_projects = 2; + repeated RejoinProject rejoined_projects = 3; + // relay open buffers and their vector clock +} + +message RejoinProject { + uint64 project_id = 1; + repeated RejoinWorktree worktrees = 2; +} + +message RejoinWorktree { + uint64 id = 1; + uint64 scan_id = 2; +} + +message RejoinRoomResponse { + Room room = 1; + repeated ResharedProject reshared_projects = 2; + repeated RejoinedProject rejoined_projects = 3; +} + +message ResharedProject { + uint64 id = 1; + repeated Collaborator collaborators = 2; +} + +message RejoinedProject { + uint64 id = 1; + uint32 replica_id = 2; + repeated WorktreeMetadata worktrees = 3; + repeated Collaborator collaborators = 4; + repeated LanguageServer language_servers = 5; +} + message LeaveRoom {} message Room { @@ -253,15 +291,6 @@ message ShareProjectResponse { uint64 project_id = 1; } -message ReshareProject { - uint64 id = 1; - repeated WorktreeMetadata worktrees = 2; -} - -message ReshareProjectResponse { - repeated Collaborator collaborators = 1; -} - message UnshareProject { uint64 project_id = 1; } @@ -282,22 +311,6 @@ message JoinProjectResponse { repeated LanguageServer language_servers = 4; } -message RejoinProject { - uint64 project_id = 1; - repeated RejoinWorktree worktrees = 2; -} - -message RejoinWorktree { - uint64 id = 1; - uint64 scan_id = 2; -} - -message RejoinProjectResponse { - repeated WorktreeMetadata worktrees = 1; - repeated Collaborator collaborators = 2; - repeated LanguageServer language_servers = 3; -} - message LeaveProject { uint64 project_id = 1; } diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index f833db514dd9918d1b58e6d93292781800529a46..ca70b7dbd9487cc8838a0b2619d4547cbf59b20e 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -188,6 +188,8 @@ messages!( (PrepareRename, Background), (PrepareRenameResponse, Background), (ProjectEntryResponse, Foreground), + (RejoinRoom, Foreground), + (RejoinRoomResponse, Foreground), (RemoveContact, Foreground), (ReloadBuffers, Foreground), (ReloadBuffersResponse, Foreground), @@ -254,6 +256,7 @@ request_messages!( (JoinChannel, JoinChannelResponse), (JoinProject, JoinProjectResponse), (JoinRoom, JoinRoomResponse), + (RejoinRoom, RejoinRoomResponse), (IncomingCall, Ack), (OpenBufferById, OpenBufferResponse), (OpenBufferByPath, OpenBufferResponse),