From e3cfc7b3ce9125e6ce601962b10fc2e131635a83 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Wed, 29 Jun 2022 14:55:04 +0200 Subject: [PATCH] Register project activity for offline projects as well --- crates/collab/src/integration_tests.rs | 61 +++++++++++++-- crates/collab/src/rpc.rs | 59 ++++++++++++-- crates/collab/src/rpc/store.rs | 60 +++++++++++--- crates/project/src/project.rs | 104 ++++++++++++++++--------- crates/rpc/proto/zed.proto | 5 +- crates/rpc/src/proto.rs | 2 +- crates/rpc/src/rpc.rs | 2 +- 7 files changed, 227 insertions(+), 66 deletions(-) diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs index facef17b63b52192f668fa1aaee0aeb777e7a665..89b1f3878227500282b3a7680a5d2689db8709bd 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/integration_tests.rs @@ -484,14 +484,20 @@ async fn test_offline_projects( deterministic: Arc, cx_a: &mut TestAppContext, cx_b: &mut TestAppContext, + cx_c: &mut TestAppContext, ) { cx_a.foreground().forbid_parking(); let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; + let client_c = server.create_client(cx_c, "user_c").await; let user_a = UserId::from_proto(client_a.user_id().unwrap()); server - .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)]) + .make_contacts(vec![ + (&client_a, cx_a), + (&client_b, cx_b), + (&client_c, cx_c), + ]) .await; // Set up observers of the project and user stores. Any time either of @@ -585,7 +591,8 @@ async fn test_offline_projects( .await .unwrap(); - // When a project is offline, no information about it is sent to the server. + // When a project is offline, we still create it on the server but is invisible + // to other users. deterministic.run_until_parked(); assert!(server .store @@ -593,7 +600,10 @@ async fn test_offline_projects( .await .project_metadata_for_user(user_a) .is_empty()); - assert!(project.read_with(cx_a, |project, _| project.remote_id().is_none())); + project.read_with(cx_a, |project, _| { + assert!(project.remote_id().is_some()); + assert!(!project.is_online()); + }); assert!(client_b .user_store .read_with(cx_b, |store, _| { store.contacts()[0].projects.is_empty() })); @@ -667,7 +677,7 @@ async fn test_offline_projects( // Build another project using a directory which was previously part of // an online project. Restore the project's state from the host's database. - let project2 = cx_a.update(|cx| { + let project2_a = cx_a.update(|cx| { Project::local( false, client_a.client.clone(), @@ -678,21 +688,21 @@ async fn test_offline_projects( cx, ) }); - project2 + project2_a .update(cx_a, |p, cx| { p.find_or_create_local_worktree("/code/crate3", true, cx) }) .await .unwrap(); - project2 + project2_a .update(cx_a, |project, cx| project.restore_state(cx)) .await .unwrap(); // This project is now online, because its directory was previously online. - project2.read_with(cx_a, |project, _| assert!(project.is_online())); + project2_a.read_with(cx_a, |project, _| assert!(project.is_online())); deterministic.run_until_parked(); - let project2_id = project2.read_with(cx_a, |p, _| p.remote_id()).unwrap(); + let project2_id = project2_a.read_with(cx_a, |p, _| p.remote_id()).unwrap(); client_b.user_store.read_with(cx_b, |store, _| { assert_eq!( store.contacts()[0].projects, @@ -715,6 +725,41 @@ async fn test_offline_projects( ); }); + let project2_b = client_b.build_remote_project(&project2_a, cx_a, cx_b).await; + let project2_c = cx_c.foreground().spawn(Project::remote( + project2_id, + client_c.client.clone(), + client_c.user_store.clone(), + client_c.project_store.clone(), + client_c.language_registry.clone(), + FakeFs::new(cx_c.background()), + cx_c.to_async(), + )); + deterministic.run_until_parked(); + + // Taking a project offline unshares the project, rejects any pending join request and + // disconnects existing guests. + project2_a.update(cx_a, |project, cx| project.set_online(false, cx)); + deterministic.run_until_parked(); + project2_a.read_with(cx_a, |project, _| assert!(!project.is_shared())); + project2_b.read_with(cx_b, |project, _| assert!(project.is_read_only())); + project2_c.await.unwrap_err(); + + client_b.user_store.read_with(cx_b, |store, _| { + assert_eq!( + store.contacts()[0].projects, + &[ProjectMetadata { + id: project_id, + visible_worktree_root_names: vec![ + "crate1".into(), + "crate2".into(), + "crate3".into() + ], + guests: Default::default(), + },] + ); + }); + cx_a.update(|cx| { drop(subscriptions); drop(view); diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 00252f4d6b0cecac76036c6c5f0e56d1f0e73d01..1aa2e5c4af4105edf797e4067ae3bd9a9221830a 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -605,9 +605,11 @@ impl Server { .await .user_id_for_connection(request.sender_id)?; let project_id = self.app_state.db.register_project(user_id).await?; - self.store_mut() - .await - .register_project(request.sender_id, project_id)?; + self.store_mut().await.register_project( + request.sender_id, + project_id, + request.payload.online, + )?; response.send(proto::RegisterProjectResponse { project_id: project_id.to_proto(), @@ -925,12 +927,53 @@ impl Server { let guest_connection_ids = state .read_project(project_id, request.sender_id)? .guest_connection_ids(); - state.update_project(project_id, &request.payload.worktrees, request.sender_id)?; - broadcast(request.sender_id, guest_connection_ids, |connection_id| { - self.peer - .forward_send(request.sender_id, connection_id, request.payload.clone()) - }); + let unshared_project = state.update_project( + project_id, + &request.payload.worktrees, + request.payload.online, + request.sender_id, + )?; + + if let Some(unshared_project) = unshared_project { + broadcast( + request.sender_id, + unshared_project.guests.keys().copied(), + |conn_id| { + self.peer.send( + conn_id, + proto::UnregisterProject { + project_id: project_id.to_proto(), + }, + ) + }, + ); + for (_, receipts) in unshared_project.pending_join_requests { + for receipt in receipts { + self.peer.respond( + receipt, + proto::JoinProjectResponse { + variant: Some(proto::join_project_response::Variant::Decline( + proto::join_project_response::Decline { + reason: + proto::join_project_response::decline::Reason::Closed + as i32, + }, + )), + }, + )?; + } + } + } else { + broadcast(request.sender_id, guest_connection_ids, |connection_id| { + self.peer.forward_send( + request.sender_id, + connection_id, + request.payload.clone(), + ) + }); + } }; + self.update_user_contacts(user_id).await?; Ok(()) } diff --git a/crates/collab/src/rpc/store.rs b/crates/collab/src/rpc/store.rs index d1eb4a3be6d50a20c993e7b9cadacfd5de7e459d..2e69a97b27f260cb9730add613d957246a9d900c 100644 --- a/crates/collab/src/rpc/store.rs +++ b/crates/collab/src/rpc/store.rs @@ -32,6 +32,7 @@ struct ConnectionState { #[derive(Serialize)] pub struct Project { + pub online: bool, pub host_connection_id: ConnectionId, pub host: Collaborator, pub guests: HashMap, @@ -88,6 +89,11 @@ pub struct LeftProject { pub unshare: bool, } +pub struct UnsharedProject { + pub guests: HashMap, + pub pending_join_requests: HashMap>>, +} + #[derive(Copy, Clone)] pub struct Metrics { pub connections: usize, @@ -297,7 +303,7 @@ impl Store { let mut metadata = Vec::new(); for project_id in project_ids { if let Some(project) = self.projects.get(&project_id) { - if project.host.user_id == user_id { + if project.host.user_id == user_id && project.online { metadata.push(proto::ProjectMetadata { id: project_id.to_proto(), visible_worktree_root_names: project @@ -323,6 +329,7 @@ impl Store { &mut self, host_connection_id: ConnectionId, project_id: ProjectId, + online: bool, ) -> Result<()> { let connection = self .connections @@ -332,6 +339,7 @@ impl Store { self.projects.insert( project_id, Project { + online, host_connection_id, host: Collaborator { user_id: connection.user_id, @@ -353,8 +361,9 @@ impl Store { &mut self, project_id: ProjectId, worktrees: &[proto::WorktreeMetadata], + online: bool, connection_id: ConnectionId, - ) -> Result<()> { + ) -> Result> { let project = self .projects .get_mut(&project_id) @@ -375,7 +384,34 @@ impl Store { ); } } - Ok(()) + + if online != project.online { + project.online = online; + if project.online { + Ok(None) + } else { + for connection_id in project.guest_connection_ids() { + if let Some(connection) = self.connections.get_mut(&connection_id) { + connection.projects.remove(&project_id); + } + } + + project.active_replica_ids.clear(); + project.language_servers.clear(); + for worktree in project.worktrees.values_mut() { + worktree.diagnostic_summaries.clear(); + worktree.entries.clear(); + worktree.extension_counts.clear(); + } + + Ok(Some(UnsharedProject { + guests: mem::take(&mut project.guests), + pending_join_requests: mem::take(&mut project.join_requests), + })) + } + } else { + Ok(None) + } } else { Err(anyhow!("no such project"))? } @@ -481,13 +517,17 @@ impl Store { .projects .get_mut(&project_id) .ok_or_else(|| anyhow!("no such project"))?; - connection.requested_projects.insert(project_id); - project - .join_requests - .entry(requester_id) - .or_default() - .push(receipt); - Ok(()) + if project.online { + connection.requested_projects.insert(project_id); + project + .join_requests + .entry(requester_id) + .or_default() + .push(receipt); + Ok(()) + } else { + Err(anyhow!("no such project")) + } } pub fn deny_join_project_request( diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 8ce07a6abdf2a654008dc218826ea2b4a64e7896..4e3f78333091723cb8631a1aa1713ed238880df6 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -136,13 +136,14 @@ enum ProjectClientState { remote_id_rx: watch::Receiver>, online_tx: watch::Sender, online_rx: watch::Receiver, - _maintain_remote_id_task: Task>, + _maintain_remote_id: Task>, + _maintain_online_status: Task>, }, Remote { sharing_has_stopped: bool, remote_id: u64, replica_id: ReplicaId, - _detect_unshare_task: Task>, + _detect_unshare: Task>, }, } @@ -381,17 +382,13 @@ impl Project { cx: &mut MutableAppContext, ) -> ModelHandle { cx.add_model(|cx: &mut ModelContext| { - let (online_tx, online_rx) = watch::channel_with(online); let (remote_id_tx, remote_id_rx) = watch::channel(); - let _maintain_remote_id_task = cx.spawn_weak({ - let status_rx = client.clone().status(); - let online_rx = online_rx.clone(); + let _maintain_remote_id = cx.spawn_weak({ + let mut status_rx = client.clone().status(); move |this, mut cx| async move { - let mut stream = Stream::map(status_rx.clone(), drop) - .merge(Stream::map(online_rx.clone(), drop)); - while stream.recv().await.is_some() { + while let Some(status) = status_rx.recv().await { let this = this.upgrade(&cx)?; - if status_rx.borrow().is_connected() && *online_rx.borrow() { + if status.is_connected() { this.update(&mut cx, |this, cx| this.register(cx)) .await .log_err()?; @@ -405,6 +402,23 @@ impl Project { } }); + let (online_tx, online_rx) = watch::channel_with(online); + let _maintain_online_status = cx.spawn_weak({ + let mut online_rx = online_rx.clone(); + move |this, mut cx| async move { + while online_rx.recv().await.is_some() { + let this = this.upgrade(&cx)?; + this.update(&mut cx, |this, cx| { + if !this.is_online() { + this.unshared(cx); + } + this.metadata_changed(false, cx) + }); + } + None + } + }); + let handle = cx.weak_handle(); project_store.update(cx, |store, cx| store.add_project(handle, cx)); @@ -423,7 +437,8 @@ impl Project { remote_id_rx, online_tx, online_rx, - _maintain_remote_id_task, + _maintain_remote_id, + _maintain_online_status, }, opened_buffer: (Rc::new(RefCell::new(opened_buffer_tx)), opened_buffer_rx), client_subscriptions: Vec::new(), @@ -519,7 +534,7 @@ impl Project { sharing_has_stopped: false, remote_id, replica_id, - _detect_unshare_task: cx.spawn_weak(move |this, mut cx| { + _detect_unshare: cx.spawn_weak(move |this, mut cx| { async move { let mut status = client.status(); let is_connected = @@ -850,27 +865,36 @@ impl Project { } fn register(&mut self, cx: &mut ModelContext) -> Task> { - if let ProjectClientState::Local { remote_id_rx, .. } = &self.client_state { + if let ProjectClientState::Local { + remote_id_rx, + online_rx, + .. + } = &self.client_state + { if remote_id_rx.borrow().is_some() { return Task::ready(Ok(())); } - } - let response = self.client.request(proto::RegisterProject {}); - cx.spawn(|this, mut cx| async move { - let remote_id = response.await?.project_id; - this.update(&mut cx, |this, cx| { - if let ProjectClientState::Local { remote_id_tx, .. } = &mut this.client_state { - *remote_id_tx.borrow_mut() = Some(remote_id); - } + let response = self.client.request(proto::RegisterProject { + online: *online_rx.borrow(), + }); + cx.spawn(|this, mut cx| async move { + let remote_id = response.await?.project_id; + this.update(&mut cx, |this, cx| { + if let ProjectClientState::Local { remote_id_tx, .. } = &mut this.client_state { + *remote_id_tx.borrow_mut() = Some(remote_id); + } - this.metadata_changed(false, cx); - cx.emit(Event::RemoteIdChanged(Some(remote_id))); - this.client_subscriptions - .push(this.client.add_model_for_remote_entity(remote_id, cx)); - Ok(()) + this.metadata_changed(false, cx); + cx.emit(Event::RemoteIdChanged(Some(remote_id))); + this.client_subscriptions + .push(this.client.add_model_for_remote_entity(remote_id, cx)); + Ok(()) + }) }) - }) + } else { + Task::ready(Err(anyhow!("can't register a remote project"))) + } } pub fn remote_id(&self) -> Option { @@ -934,19 +958,25 @@ impl Project { .. } = &self.client_state { - if let (Some(project_id), true) = (*remote_id_rx.borrow(), *online_rx.borrow()) { + // Broadcast worktrees only if the project is public. + let worktrees = if *online_rx.borrow() { + self.worktrees + .iter() + .filter_map(|worktree| { + worktree + .upgrade(&cx) + .map(|worktree| worktree.read(cx).as_local().unwrap().metadata_proto()) + }) + .collect() + } else { + Default::default() + }; + if let Some(project_id) = *remote_id_rx.borrow() { self.client .send(proto::UpdateProject { project_id, - worktrees: self - .worktrees - .iter() - .filter_map(|worktree| { - worktree.upgrade(&cx).map(|worktree| { - worktree.read(cx).as_local().unwrap().metadata_proto() - }) - }) - .collect(), + worktrees, + online: *online_rx.borrow(), }) .log_err(); } diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index 69ccae1704ebb05b87dad60a5bd32c9f00432fa3..1cfbb6cea4448aedc7e40f14d806902a56c463ba 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -121,7 +121,9 @@ message Test { uint64 id = 1; } -message RegisterProject {} +message RegisterProject { + bool online = 1; +} message RegisterProjectResponse { uint64 project_id = 1; @@ -134,6 +136,7 @@ message UnregisterProject { message UpdateProject { uint64 project_id = 1; repeated WorktreeMetadata worktrees = 2; + bool online = 3; } message RegisterProjectActivity { diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index ecee3709863b4239dcd43b3c2e9c21eb4868880c..8b7c5e302bba2021e2a06473decb063d9bb8bfbd 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -130,9 +130,9 @@ messages!( (PrepareRename, Background), (PrepareRenameResponse, Background), (ProjectEntryResponse, Foreground), + (ProjectUnshared, Foreground), (RegisterProjectResponse, Foreground), (Ping, Foreground), - (ProjectUnshared, Foreground), (RegisterProject, Foreground), (RegisterProjectActivity, Foreground), (ReloadBuffers, Foreground), diff --git a/crates/rpc/src/rpc.rs b/crates/rpc/src/rpc.rs index d7d40e81e809040453aef53ea01c9d9554bb71dc..5fff19bb065b95b2da5eb3be25f579fcd41df1fa 100644 --- a/crates/rpc/src/rpc.rs +++ b/crates/rpc/src/rpc.rs @@ -6,4 +6,4 @@ pub use conn::Connection; pub use peer::*; mod macros; -pub const PROTOCOL_VERSION: u32 = 26; +pub const PROTOCOL_VERSION: u32 = 27;