Register project activity for offline projects as well

Antonio Scandurra created

Change summary

crates/collab/src/integration_tests.rs |  61 ++++++++++++++--
crates/collab/src/rpc.rs               |  59 +++++++++++++--
crates/collab/src/rpc/store.rs         |  60 +++++++++++++--
crates/project/src/project.rs          | 104 ++++++++++++++++++---------
crates/rpc/proto/zed.proto             |   5 +
crates/rpc/src/proto.rs                |   2 
crates/rpc/src/rpc.rs                  |   2 
7 files changed, 227 insertions(+), 66 deletions(-)

Detailed changes

crates/collab/src/integration_tests.rs 🔗

@@ -484,14 +484,20 @@ async fn test_offline_projects(
     deterministic: Arc<Deterministic>,
     cx_a: &mut TestAppContext,
     cx_b: &mut TestAppContext,
+    cx_c: &mut TestAppContext,
 ) {
     cx_a.foreground().forbid_parking();
     let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
+    let client_c = server.create_client(cx_c, "user_c").await;
     let user_a = UserId::from_proto(client_a.user_id().unwrap());
     server
-        .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
+        .make_contacts(vec![
+            (&client_a, cx_a),
+            (&client_b, cx_b),
+            (&client_c, cx_c),
+        ])
         .await;
 
     // Set up observers of the project and user stores. Any time either of
@@ -585,7 +591,8 @@ async fn test_offline_projects(
         .await
         .unwrap();
 
-    // When a project is offline, no information about it is sent to the server.
+    // When a project is offline, we still create it on the server but is invisible
+    // to other users.
     deterministic.run_until_parked();
     assert!(server
         .store
@@ -593,7 +600,10 @@ async fn test_offline_projects(
         .await
         .project_metadata_for_user(user_a)
         .is_empty());
-    assert!(project.read_with(cx_a, |project, _| project.remote_id().is_none()));
+    project.read_with(cx_a, |project, _| {
+        assert!(project.remote_id().is_some());
+        assert!(!project.is_online());
+    });
     assert!(client_b
         .user_store
         .read_with(cx_b, |store, _| { store.contacts()[0].projects.is_empty() }));
@@ -667,7 +677,7 @@ async fn test_offline_projects(
 
     // Build another project using a directory which was previously part of
     // an online project. Restore the project's state from the host's database.
-    let project2 = cx_a.update(|cx| {
+    let project2_a = cx_a.update(|cx| {
         Project::local(
             false,
             client_a.client.clone(),
@@ -678,21 +688,21 @@ async fn test_offline_projects(
             cx,
         )
     });
-    project2
+    project2_a
         .update(cx_a, |p, cx| {
             p.find_or_create_local_worktree("/code/crate3", true, cx)
         })
         .await
         .unwrap();
-    project2
+    project2_a
         .update(cx_a, |project, cx| project.restore_state(cx))
         .await
         .unwrap();
 
     // This project is now online, because its directory was previously online.
-    project2.read_with(cx_a, |project, _| assert!(project.is_online()));
+    project2_a.read_with(cx_a, |project, _| assert!(project.is_online()));
     deterministic.run_until_parked();
-    let project2_id = project2.read_with(cx_a, |p, _| p.remote_id()).unwrap();
+    let project2_id = project2_a.read_with(cx_a, |p, _| p.remote_id()).unwrap();
     client_b.user_store.read_with(cx_b, |store, _| {
         assert_eq!(
             store.contacts()[0].projects,
@@ -715,6 +725,41 @@ async fn test_offline_projects(
         );
     });
 
+    let project2_b = client_b.build_remote_project(&project2_a, cx_a, cx_b).await;
+    let project2_c = cx_c.foreground().spawn(Project::remote(
+        project2_id,
+        client_c.client.clone(),
+        client_c.user_store.clone(),
+        client_c.project_store.clone(),
+        client_c.language_registry.clone(),
+        FakeFs::new(cx_c.background()),
+        cx_c.to_async(),
+    ));
+    deterministic.run_until_parked();
+
+    // Taking a project offline unshares the project, rejects any pending join request and
+    // disconnects existing guests.
+    project2_a.update(cx_a, |project, cx| project.set_online(false, cx));
+    deterministic.run_until_parked();
+    project2_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
+    project2_b.read_with(cx_b, |project, _| assert!(project.is_read_only()));
+    project2_c.await.unwrap_err();
+
+    client_b.user_store.read_with(cx_b, |store, _| {
+        assert_eq!(
+            store.contacts()[0].projects,
+            &[ProjectMetadata {
+                id: project_id,
+                visible_worktree_root_names: vec![
+                    "crate1".into(),
+                    "crate2".into(),
+                    "crate3".into()
+                ],
+                guests: Default::default(),
+            },]
+        );
+    });
+
     cx_a.update(|cx| {
         drop(subscriptions);
         drop(view);

crates/collab/src/rpc.rs 🔗

@@ -605,9 +605,11 @@ impl Server {
             .await
             .user_id_for_connection(request.sender_id)?;
         let project_id = self.app_state.db.register_project(user_id).await?;
-        self.store_mut()
-            .await
-            .register_project(request.sender_id, project_id)?;
+        self.store_mut().await.register_project(
+            request.sender_id,
+            project_id,
+            request.payload.online,
+        )?;
 
         response.send(proto::RegisterProjectResponse {
             project_id: project_id.to_proto(),
@@ -925,12 +927,53 @@ impl Server {
             let guest_connection_ids = state
                 .read_project(project_id, request.sender_id)?
                 .guest_connection_ids();
-            state.update_project(project_id, &request.payload.worktrees, request.sender_id)?;
-            broadcast(request.sender_id, guest_connection_ids, |connection_id| {
-                self.peer
-                    .forward_send(request.sender_id, connection_id, request.payload.clone())
-            });
+            let unshared_project = state.update_project(
+                project_id,
+                &request.payload.worktrees,
+                request.payload.online,
+                request.sender_id,
+            )?;
+
+            if let Some(unshared_project) = unshared_project {
+                broadcast(
+                    request.sender_id,
+                    unshared_project.guests.keys().copied(),
+                    |conn_id| {
+                        self.peer.send(
+                            conn_id,
+                            proto::UnregisterProject {
+                                project_id: project_id.to_proto(),
+                            },
+                        )
+                    },
+                );
+                for (_, receipts) in unshared_project.pending_join_requests {
+                    for receipt in receipts {
+                        self.peer.respond(
+                            receipt,
+                            proto::JoinProjectResponse {
+                                variant: Some(proto::join_project_response::Variant::Decline(
+                                    proto::join_project_response::Decline {
+                                        reason:
+                                            proto::join_project_response::decline::Reason::Closed
+                                                as i32,
+                                    },
+                                )),
+                            },
+                        )?;
+                    }
+                }
+            } else {
+                broadcast(request.sender_id, guest_connection_ids, |connection_id| {
+                    self.peer.forward_send(
+                        request.sender_id,
+                        connection_id,
+                        request.payload.clone(),
+                    )
+                });
+            }
         };
+
         self.update_user_contacts(user_id).await?;
         Ok(())
     }

crates/collab/src/rpc/store.rs 🔗

@@ -32,6 +32,7 @@ struct ConnectionState {
 
 #[derive(Serialize)]
 pub struct Project {
+    pub online: bool,
     pub host_connection_id: ConnectionId,
     pub host: Collaborator,
     pub guests: HashMap<ConnectionId, Collaborator>,
@@ -88,6 +89,11 @@ pub struct LeftProject {
     pub unshare: bool,
 }
 
+pub struct UnsharedProject {
+    pub guests: HashMap<ConnectionId, Collaborator>,
+    pub pending_join_requests: HashMap<UserId, Vec<Receipt<proto::JoinProject>>>,
+}
+
 #[derive(Copy, Clone)]
 pub struct Metrics {
     pub connections: usize,
@@ -297,7 +303,7 @@ impl Store {
         let mut metadata = Vec::new();
         for project_id in project_ids {
             if let Some(project) = self.projects.get(&project_id) {
-                if project.host.user_id == user_id {
+                if project.host.user_id == user_id && project.online {
                     metadata.push(proto::ProjectMetadata {
                         id: project_id.to_proto(),
                         visible_worktree_root_names: project
@@ -323,6 +329,7 @@ impl Store {
         &mut self,
         host_connection_id: ConnectionId,
         project_id: ProjectId,
+        online: bool,
     ) -> Result<()> {
         let connection = self
             .connections
@@ -332,6 +339,7 @@ impl Store {
         self.projects.insert(
             project_id,
             Project {
+                online,
                 host_connection_id,
                 host: Collaborator {
                     user_id: connection.user_id,
@@ -353,8 +361,9 @@ impl Store {
         &mut self,
         project_id: ProjectId,
         worktrees: &[proto::WorktreeMetadata],
+        online: bool,
         connection_id: ConnectionId,
-    ) -> Result<()> {
+    ) -> Result<Option<UnsharedProject>> {
         let project = self
             .projects
             .get_mut(&project_id)
@@ -375,7 +384,34 @@ impl Store {
                     );
                 }
             }
-            Ok(())
+
+            if online != project.online {
+                project.online = online;
+                if project.online {
+                    Ok(None)
+                } else {
+                    for connection_id in project.guest_connection_ids() {
+                        if let Some(connection) = self.connections.get_mut(&connection_id) {
+                            connection.projects.remove(&project_id);
+                        }
+                    }
+
+                    project.active_replica_ids.clear();
+                    project.language_servers.clear();
+                    for worktree in project.worktrees.values_mut() {
+                        worktree.diagnostic_summaries.clear();
+                        worktree.entries.clear();
+                        worktree.extension_counts.clear();
+                    }
+
+                    Ok(Some(UnsharedProject {
+                        guests: mem::take(&mut project.guests),
+                        pending_join_requests: mem::take(&mut project.join_requests),
+                    }))
+                }
+            } else {
+                Ok(None)
+            }
         } else {
             Err(anyhow!("no such project"))?
         }
@@ -481,13 +517,17 @@ impl Store {
             .projects
             .get_mut(&project_id)
             .ok_or_else(|| anyhow!("no such project"))?;
-        connection.requested_projects.insert(project_id);
-        project
-            .join_requests
-            .entry(requester_id)
-            .or_default()
-            .push(receipt);
-        Ok(())
+        if project.online {
+            connection.requested_projects.insert(project_id);
+            project
+                .join_requests
+                .entry(requester_id)
+                .or_default()
+                .push(receipt);
+            Ok(())
+        } else {
+            Err(anyhow!("no such project"))
+        }
     }
 
     pub fn deny_join_project_request(

crates/project/src/project.rs 🔗

@@ -136,13 +136,14 @@ enum ProjectClientState {
         remote_id_rx: watch::Receiver<Option<u64>>,
         online_tx: watch::Sender<bool>,
         online_rx: watch::Receiver<bool>,
-        _maintain_remote_id_task: Task<Option<()>>,
+        _maintain_remote_id: Task<Option<()>>,
+        _maintain_online_status: Task<Option<()>>,
     },
     Remote {
         sharing_has_stopped: bool,
         remote_id: u64,
         replica_id: ReplicaId,
-        _detect_unshare_task: Task<Option<()>>,
+        _detect_unshare: Task<Option<()>>,
     },
 }
 
@@ -381,17 +382,13 @@ impl Project {
         cx: &mut MutableAppContext,
     ) -> ModelHandle<Self> {
         cx.add_model(|cx: &mut ModelContext<Self>| {
-            let (online_tx, online_rx) = watch::channel_with(online);
             let (remote_id_tx, remote_id_rx) = watch::channel();
-            let _maintain_remote_id_task = cx.spawn_weak({
-                let status_rx = client.clone().status();
-                let online_rx = online_rx.clone();
+            let _maintain_remote_id = cx.spawn_weak({
+                let mut status_rx = client.clone().status();
                 move |this, mut cx| async move {
-                    let mut stream = Stream::map(status_rx.clone(), drop)
-                        .merge(Stream::map(online_rx.clone(), drop));
-                    while stream.recv().await.is_some() {
+                    while let Some(status) = status_rx.recv().await {
                         let this = this.upgrade(&cx)?;
-                        if status_rx.borrow().is_connected() && *online_rx.borrow() {
+                        if status.is_connected() {
                             this.update(&mut cx, |this, cx| this.register(cx))
                                 .await
                                 .log_err()?;
@@ -405,6 +402,23 @@ impl Project {
                 }
             });
 
+            let (online_tx, online_rx) = watch::channel_with(online);
+            let _maintain_online_status = cx.spawn_weak({
+                let mut online_rx = online_rx.clone();
+                move |this, mut cx| async move {
+                    while online_rx.recv().await.is_some() {
+                        let this = this.upgrade(&cx)?;
+                        this.update(&mut cx, |this, cx| {
+                            if !this.is_online() {
+                                this.unshared(cx);
+                            }
+                            this.metadata_changed(false, cx)
+                        });
+                    }
+                    None
+                }
+            });
+
             let handle = cx.weak_handle();
             project_store.update(cx, |store, cx| store.add_project(handle, cx));
 
@@ -423,7 +437,8 @@ impl Project {
                     remote_id_rx,
                     online_tx,
                     online_rx,
-                    _maintain_remote_id_task,
+                    _maintain_remote_id,
+                    _maintain_online_status,
                 },
                 opened_buffer: (Rc::new(RefCell::new(opened_buffer_tx)), opened_buffer_rx),
                 client_subscriptions: Vec::new(),
@@ -519,7 +534,7 @@ impl Project {
                     sharing_has_stopped: false,
                     remote_id,
                     replica_id,
-                    _detect_unshare_task: cx.spawn_weak(move |this, mut cx| {
+                    _detect_unshare: cx.spawn_weak(move |this, mut cx| {
                         async move {
                             let mut status = client.status();
                             let is_connected =
@@ -850,27 +865,36 @@ impl Project {
     }
 
     fn register(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
-        if let ProjectClientState::Local { remote_id_rx, .. } = &self.client_state {
+        if let ProjectClientState::Local {
+            remote_id_rx,
+            online_rx,
+            ..
+        } = &self.client_state
+        {
             if remote_id_rx.borrow().is_some() {
                 return Task::ready(Ok(()));
             }
-        }
 
-        let response = self.client.request(proto::RegisterProject {});
-        cx.spawn(|this, mut cx| async move {
-            let remote_id = response.await?.project_id;
-            this.update(&mut cx, |this, cx| {
-                if let ProjectClientState::Local { remote_id_tx, .. } = &mut this.client_state {
-                    *remote_id_tx.borrow_mut() = Some(remote_id);
-                }
+            let response = self.client.request(proto::RegisterProject {
+                online: *online_rx.borrow(),
+            });
+            cx.spawn(|this, mut cx| async move {
+                let remote_id = response.await?.project_id;
+                this.update(&mut cx, |this, cx| {
+                    if let ProjectClientState::Local { remote_id_tx, .. } = &mut this.client_state {
+                        *remote_id_tx.borrow_mut() = Some(remote_id);
+                    }
 
-                this.metadata_changed(false, cx);
-                cx.emit(Event::RemoteIdChanged(Some(remote_id)));
-                this.client_subscriptions
-                    .push(this.client.add_model_for_remote_entity(remote_id, cx));
-                Ok(())
+                    this.metadata_changed(false, cx);
+                    cx.emit(Event::RemoteIdChanged(Some(remote_id)));
+                    this.client_subscriptions
+                        .push(this.client.add_model_for_remote_entity(remote_id, cx));
+                    Ok(())
+                })
             })
-        })
+        } else {
+            Task::ready(Err(anyhow!("can't register a remote project")))
+        }
     }
 
     pub fn remote_id(&self) -> Option<u64> {
@@ -934,19 +958,25 @@ impl Project {
             ..
         } = &self.client_state
         {
-            if let (Some(project_id), true) = (*remote_id_rx.borrow(), *online_rx.borrow()) {
+            // Broadcast worktrees only if the project is public.
+            let worktrees = if *online_rx.borrow() {
+                self.worktrees
+                    .iter()
+                    .filter_map(|worktree| {
+                        worktree
+                            .upgrade(&cx)
+                            .map(|worktree| worktree.read(cx).as_local().unwrap().metadata_proto())
+                    })
+                    .collect()
+            } else {
+                Default::default()
+            };
+            if let Some(project_id) = *remote_id_rx.borrow() {
                 self.client
                     .send(proto::UpdateProject {
                         project_id,
-                        worktrees: self
-                            .worktrees
-                            .iter()
-                            .filter_map(|worktree| {
-                                worktree.upgrade(&cx).map(|worktree| {
-                                    worktree.read(cx).as_local().unwrap().metadata_proto()
-                                })
-                            })
-                            .collect(),
+                        worktrees,
+                        online: *online_rx.borrow(),
                     })
                     .log_err();
             }

crates/rpc/proto/zed.proto 🔗

@@ -121,7 +121,9 @@ message Test {
     uint64 id = 1;
 }
 
-message RegisterProject {}
+message RegisterProject {
+    bool online = 1;
+}
 
 message RegisterProjectResponse {
     uint64 project_id = 1;
@@ -134,6 +136,7 @@ message UnregisterProject {
 message UpdateProject {
     uint64 project_id = 1;
     repeated WorktreeMetadata worktrees = 2;
+    bool online = 3;
 }
 
 message RegisterProjectActivity {

crates/rpc/src/proto.rs 🔗

@@ -130,9 +130,9 @@ messages!(
     (PrepareRename, Background),
     (PrepareRenameResponse, Background),
     (ProjectEntryResponse, Foreground),
+    (ProjectUnshared, Foreground),
     (RegisterProjectResponse, Foreground),
     (Ping, Foreground),
-    (ProjectUnshared, Foreground),
     (RegisterProject, Foreground),
     (RegisterProjectActivity, Foreground),
     (ReloadBuffers, Foreground),

crates/rpc/src/rpc.rs 🔗

@@ -6,4 +6,4 @@ pub use conn::Connection;
 pub use peer::*;
 mod macros;
 
-pub const PROTOCOL_VERSION: u32 = 26;
+pub const PROTOCOL_VERSION: u32 = 27;