Track active projects in metrics

Antonio Scandurra created

An active project is defined as a project where there has been at
least a buffer edit, a join request/response, or a follow update
in the last minute.

Change summary

crates/collab/src/rpc.rs       | 61 ++++++++++++++++++++++++-----------
crates/collab/src/rpc/store.rs | 47 ++++++++++++++++++++++++++-
crates/project/src/project.rs  |  4 ++
crates/rpc/proto/zed.proto     |  5 ++
crates/rpc/src/proto.rs        |  2 +
5 files changed, 97 insertions(+), 22 deletions(-)

Detailed changes

crates/collab/src/rpc.rs 🔗

@@ -61,8 +61,10 @@ pub use store::{Store, Worktree};
 lazy_static! {
     static ref METRIC_CONNECTIONS: IntGauge =
         register_int_gauge!("connections", "number of connections").unwrap();
-    static ref METRIC_PROJECTS: IntGauge =
-        register_int_gauge!("projects", "number of open projects").unwrap();
+    static ref METRIC_REGISTERED_PROJECTS: IntGauge =
+        register_int_gauge!("registered_projects", "number of registered projects").unwrap();
+    static ref METRIC_ACTIVE_PROJECTS: IntGauge =
+        register_int_gauge!("active_projects", "number of active projects").unwrap();
     static ref METRIC_SHARED_PROJECTS: IntGauge = register_int_gauge!(
         "shared_projects",
         "number of open projects with one or more guests"
@@ -159,6 +161,7 @@ impl Server {
             .add_message_handler(Server::leave_project)
             .add_message_handler(Server::respond_to_join_project_request)
             .add_message_handler(Server::update_project)
+            .add_message_handler(Server::register_project_activity)
             .add_request_handler(Server::update_worktree)
             .add_message_handler(Server::start_language_server)
             .add_message_handler(Server::update_language_server)
@@ -844,6 +847,16 @@ impl Server {
         Ok(())
     }
 
+    async fn register_project_activity(
+        self: Arc<Server>,
+        request: TypedEnvelope<proto::RegisterProjectActivity>,
+    ) -> Result<()> {
+        self.store_mut()
+            .await
+            .register_project_activity(request.payload.project_id, request.sender_id)?;
+        Ok(())
+    }
+
     async fn update_worktree(
         self: Arc<Server>,
         request: TypedEnvelope<proto::UpdateWorktree>,
@@ -1001,10 +1014,12 @@ impl Server {
         request: TypedEnvelope<proto::UpdateBuffer>,
         response: Response<proto::UpdateBuffer>,
     ) -> Result<()> {
-        let receiver_ids = self
-            .store()
-            .await
-            .project_connection_ids(request.payload.project_id, request.sender_id)?;
+        let receiver_ids = {
+            let mut store = self.store_mut().await;
+            store.register_project_activity(request.payload.project_id, request.sender_id)?;
+            store.project_connection_ids(request.payload.project_id, request.sender_id)?
+        };
+
         broadcast(request.sender_id, receiver_ids, |connection_id| {
             self.peer
                 .forward_send(request.sender_id, connection_id, request.payload.clone())
@@ -1065,14 +1080,18 @@ impl Server {
     ) -> Result<()> {
         let leader_id = ConnectionId(request.payload.leader_id);
         let follower_id = request.sender_id;
-        if !self
-            .store()
-            .await
-            .project_connection_ids(request.payload.project_id, follower_id)?
-            .contains(&leader_id)
         {
-            Err(anyhow!("no such peer"))?;
+            let mut store = self.store_mut().await;
+            if store
+                .project_connection_ids(request.payload.project_id, follower_id)?
+                .contains(&leader_id)
+            {
+                Err(anyhow!("no such peer"))?;
+            }
+
+            store.register_project_activity(request.payload.project_id, follower_id)?;
         }
+
         let mut response_payload = self
             .peer
             .forward_request(request.sender_id, leader_id, request.payload)
@@ -1086,14 +1105,14 @@ impl Server {
 
     async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
         let leader_id = ConnectionId(request.payload.leader_id);
-        if !self
-            .store()
-            .await
+        let mut store = self.store_mut().await;
+        if !store
             .project_connection_ids(request.payload.project_id, request.sender_id)?
             .contains(&leader_id)
         {
             Err(anyhow!("no such peer"))?;
         }
+        store.register_project_activity(request.payload.project_id, request.sender_id)?;
         self.peer
             .forward_send(request.sender_id, leader_id, request.payload)?;
         Ok(())
@@ -1103,10 +1122,10 @@ impl Server {
         self: Arc<Self>,
         request: TypedEnvelope<proto::UpdateFollowers>,
     ) -> Result<()> {
-        let connection_ids = self
-            .store()
-            .await
-            .project_connection_ids(request.payload.project_id, request.sender_id)?;
+        let mut store = self.store_mut().await;
+        store.register_project_activity(request.payload.project_id, request.sender_id)?;
+        let connection_ids =
+            store.project_connection_ids(request.payload.project_id, request.sender_id)?;
         let leader_id = request
             .payload
             .variant
@@ -1574,12 +1593,14 @@ impl<'a> Drop for StoreWriteGuard<'a> {
         let metrics = self.metrics();
 
         METRIC_CONNECTIONS.set(metrics.connections as _);
-        METRIC_PROJECTS.set(metrics.registered_projects as _);
+        METRIC_REGISTERED_PROJECTS.set(metrics.registered_projects as _);
+        METRIC_ACTIVE_PROJECTS.set(metrics.active_projects as _);
         METRIC_SHARED_PROJECTS.set(metrics.shared_projects as _);
 
         tracing::info!(
             connections = metrics.connections,
             registered_projects = metrics.registered_projects,
+            active_projects = metrics.active_projects,
             shared_projects = metrics.shared_projects,
             "metrics"
         );

crates/collab/src/rpc/store.rs 🔗

@@ -9,6 +9,7 @@ use std::{
     mem,
     path::{Path, PathBuf},
     str,
+    time::{Duration, Instant},
 };
 use tracing::instrument;
 
@@ -41,6 +42,8 @@ pub struct Project {
     pub active_replica_ids: HashSet<ReplicaId>,
     pub worktrees: BTreeMap<u64, Worktree>,
     pub language_servers: Vec<proto::LanguageServer>,
+    #[serde(skip)]
+    last_activity: Option<Instant>,
 }
 
 #[derive(Default, Serialize)]
@@ -84,6 +87,7 @@ pub struct LeftProject {
 pub struct Metrics {
     pub connections: usize,
     pub registered_projects: usize,
+    pub active_projects: usize,
     pub shared_projects: usize,
 }
 
@@ -91,13 +95,17 @@ impl Store {
     pub fn metrics(&self) -> Metrics {
         let connections = self.connections.values().filter(|c| !c.admin).count();
         let mut registered_projects = 0;
+        let mut active_projects = 0;
         let mut shared_projects = 0;
         for project in self.projects.values() {
             if let Some(connection) = self.connections.get(&project.host_connection_id) {
                 if !connection.admin {
                     registered_projects += 1;
-                    if !project.guests.is_empty() {
-                        shared_projects += 1;
+                    if project.is_active() {
+                        active_projects += 1;
+                        if !project.guests.is_empty() {
+                            shared_projects += 1;
+                        }
                     }
                 }
             }
@@ -106,6 +114,7 @@ impl Store {
         Metrics {
             connections,
             registered_projects,
+            active_projects,
             shared_projects,
         }
     }
@@ -318,6 +327,7 @@ impl Store {
                 active_replica_ids: Default::default(),
                 worktrees: Default::default(),
                 language_servers: Default::default(),
+                last_activity: None,
             },
         );
         if let Some(connection) = self.connections.get_mut(&host_connection_id) {
@@ -338,6 +348,7 @@ impl Store {
             .get_mut(&project_id)
             .ok_or_else(|| anyhow!("no such project"))?;
         if project.host_connection_id == connection_id {
+            project.last_activity = Some(Instant::now());
             let mut old_worktrees = mem::take(&mut project.worktrees);
             for worktree in worktrees {
                 if let Some(old_worktree) = old_worktrees.remove(&worktree.id) {
@@ -460,6 +471,7 @@ impl Store {
             .get_mut(&project_id)
             .ok_or_else(|| anyhow!("no such project"))?;
         connection.requested_projects.insert(project_id);
+        project.last_activity = Some(Instant::now());
         project
             .join_requests
             .entry(requester_id)
@@ -484,6 +496,8 @@ impl Store {
             let requester_connection = self.connections.get_mut(&receipt.sender_id)?;
             requester_connection.requested_projects.remove(&project_id);
         }
+        project.last_activity = Some(Instant::now());
+
         Some(receipts)
     }
 
@@ -515,6 +529,7 @@ impl Store {
             receipts_with_replica_ids.push((receipt, replica_id));
         }
 
+        project.last_activity = Some(Instant::now());
         Some((receipts_with_replica_ids, project))
     }
 
@@ -565,6 +580,8 @@ impl Store {
             }
         }
 
+        project.last_activity = Some(Instant::now());
+
         Ok(LeftProject {
             host_connection_id: project.host_connection_id,
             host_user_id: project.host_user_id,
@@ -653,6 +670,25 @@ impl Store {
             .ok_or_else(|| anyhow!("no such project"))
     }
 
+    pub fn register_project_activity(
+        &mut self,
+        project_id: u64,
+        connection_id: ConnectionId,
+    ) -> Result<()> {
+        let project = self
+            .projects
+            .get_mut(&project_id)
+            .ok_or_else(|| anyhow!("no such project"))?;
+        if project.host_connection_id == connection_id
+            || project.guests.contains_key(&connection_id)
+        {
+            project.last_activity = Some(Instant::now());
+            Ok(())
+        } else {
+            Err(anyhow!("no such project"))?
+        }
+    }
+
     pub fn read_project(&self, project_id: u64, connection_id: ConnectionId) -> Result<&Project> {
         let project = self
             .projects
@@ -758,6 +794,13 @@ impl Store {
 }
 
 impl Project {
+    fn is_active(&self) -> bool {
+        const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60);
+        self.last_activity.map_or(false, |last_activity| {
+            last_activity.elapsed() < ACTIVE_PROJECT_TIMEOUT
+        })
+    }
+
     pub fn guest_connection_ids(&self) -> Vec<ConnectionId> {
         self.guests.keys().copied().collect()
     }

crates/project/src/project.rs 🔗

@@ -1780,6 +1780,10 @@ impl Project {
                         operations: vec![language::proto::serialize_operation(&operation)],
                     });
                     cx.background().spawn(request).detach_and_log_err(cx);
+                } else if let Some(project_id) = self.remote_id() {
+                    let _ = self
+                        .client
+                        .send(proto::RegisterProjectActivity { project_id });
                 }
             }
             BufferEvent::Edited { .. } => {

crates/rpc/proto/zed.proto 🔗

@@ -36,6 +36,7 @@ message Envelope {
         OpenBufferForSymbolResponse open_buffer_for_symbol_response = 29;
 
         UpdateProject update_project = 30;
+        RegisterProjectActivity register_project_activity = 31;
         UpdateWorktree update_worktree = 32;
 
         CreateProjectEntry create_project_entry = 33;
@@ -135,6 +136,10 @@ message UpdateProject {
     repeated WorktreeMetadata worktrees = 2;
 }
 
+message RegisterProjectActivity {
+    uint64 project_id = 1;
+}
+
 message RequestJoinProject {
     uint64 requester_id = 1;
     uint64 project_id = 2;

crates/rpc/src/proto.rs 🔗

@@ -134,6 +134,7 @@ messages!(
     (Ping, Foreground),
     (ProjectUnshared, Foreground),
     (RegisterProject, Foreground),
+    (RegisterProjectActivity, Foreground),
     (ReloadBuffers, Foreground),
     (ReloadBuffersResponse, Foreground),
     (RemoveProjectCollaborator, Foreground),
@@ -236,6 +237,7 @@ entity_messages!(
     PerformRename,
     PrepareRename,
     ProjectUnshared,
+    RegisterProjectActivity,
     ReloadBuffers,
     RemoveProjectCollaborator,
     RenameProjectEntry,