Merge pull request #1189 from zed-industries/improve-metrics

Antonio Scandurra created

Exclude staff from metrics and start tracking active projects

Change summary

crates/collab/src/rpc.rs       | 72 ++++++++++++++++++++++++------------
crates/collab/src/rpc/store.rs | 48 +++++++++++++++++++++--
crates/project/src/project.rs  |  4 ++
crates/rpc/proto/zed.proto     |  5 ++
crates/rpc/src/proto.rs        |  2 +
5 files changed, 102 insertions(+), 29 deletions(-)

Detailed changes

crates/collab/src/rpc.rs 🔗

@@ -61,8 +61,10 @@ pub use store::{Store, Worktree};
 lazy_static! {
     static ref METRIC_CONNECTIONS: IntGauge =
         register_int_gauge!("connections", "number of connections").unwrap();
-    static ref METRIC_PROJECTS: IntGauge =
-        register_int_gauge!("projects", "number of open projects").unwrap();
+    static ref METRIC_REGISTERED_PROJECTS: IntGauge =
+        register_int_gauge!("registered_projects", "number of registered projects").unwrap();
+    static ref METRIC_ACTIVE_PROJECTS: IntGauge =
+        register_int_gauge!("active_projects", "number of active projects").unwrap();
     static ref METRIC_SHARED_PROJECTS: IntGauge = register_int_gauge!(
         "shared_projects",
         "number of open projects with one or more guests"
@@ -159,6 +161,7 @@ impl Server {
             .add_message_handler(Server::leave_project)
             .add_message_handler(Server::respond_to_join_project_request)
             .add_message_handler(Server::update_project)
+            .add_message_handler(Server::register_project_activity)
             .add_request_handler(Server::update_worktree)
             .add_message_handler(Server::start_language_server)
             .add_message_handler(Server::update_language_server)
@@ -329,7 +332,7 @@ impl Server {
 
             {
                 let mut store = this.store_mut().await;
-                store.add_connection(connection_id, user_id);
+                store.add_connection(connection_id, user_id, user.admin);
                 this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?;
 
                 if let Some((code, count)) = invite_code {
@@ -844,6 +847,16 @@ impl Server {
         Ok(())
     }
 
+    async fn register_project_activity(
+        self: Arc<Server>,
+        request: TypedEnvelope<proto::RegisterProjectActivity>,
+    ) -> Result<()> {
+        self.store_mut()
+            .await
+            .register_project_activity(request.payload.project_id, request.sender_id)?;
+        Ok(())
+    }
+
     async fn update_worktree(
         self: Arc<Server>,
         request: TypedEnvelope<proto::UpdateWorktree>,
@@ -1001,10 +1014,12 @@ impl Server {
         request: TypedEnvelope<proto::UpdateBuffer>,
         response: Response<proto::UpdateBuffer>,
     ) -> Result<()> {
-        let receiver_ids = self
-            .store()
-            .await
-            .project_connection_ids(request.payload.project_id, request.sender_id)?;
+        let receiver_ids = {
+            let mut store = self.store_mut().await;
+            store.register_project_activity(request.payload.project_id, request.sender_id)?;
+            store.project_connection_ids(request.payload.project_id, request.sender_id)?
+        };
+
         broadcast(request.sender_id, receiver_ids, |connection_id| {
             self.peer
                 .forward_send(request.sender_id, connection_id, request.payload.clone())
@@ -1065,14 +1080,18 @@ impl Server {
     ) -> Result<()> {
         let leader_id = ConnectionId(request.payload.leader_id);
         let follower_id = request.sender_id;
-        if !self
-            .store()
-            .await
-            .project_connection_ids(request.payload.project_id, follower_id)?
-            .contains(&leader_id)
         {
-            Err(anyhow!("no such peer"))?;
+            let mut store = self.store_mut().await;
+            if !store
+                .project_connection_ids(request.payload.project_id, follower_id)?
+                .contains(&leader_id)
+            {
+                Err(anyhow!("no such peer"))?;
+            }
+
+            store.register_project_activity(request.payload.project_id, follower_id)?;
         }
+
         let mut response_payload = self
             .peer
             .forward_request(request.sender_id, leader_id, request.payload)
@@ -1086,14 +1105,14 @@ impl Server {
 
     async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
         let leader_id = ConnectionId(request.payload.leader_id);
-        if !self
-            .store()
-            .await
+        let mut store = self.store_mut().await;
+        if !store
             .project_connection_ids(request.payload.project_id, request.sender_id)?
             .contains(&leader_id)
         {
             Err(anyhow!("no such peer"))?;
         }
+        store.register_project_activity(request.payload.project_id, request.sender_id)?;
         self.peer
             .forward_send(request.sender_id, leader_id, request.payload)?;
         Ok(())
@@ -1103,10 +1122,10 @@ impl Server {
         self: Arc<Self>,
         request: TypedEnvelope<proto::UpdateFollowers>,
     ) -> Result<()> {
-        let connection_ids = self
-            .store()
-            .await
-            .project_connection_ids(request.payload.project_id, request.sender_id)?;
+        let mut store = self.store_mut().await;
+        store.register_project_activity(request.payload.project_id, request.sender_id)?;
+        let connection_ids =
+            store.project_connection_ids(request.payload.project_id, request.sender_id)?;
         let leader_id = request
             .payload
             .variant
@@ -1574,12 +1593,14 @@ impl<'a> Drop for StoreWriteGuard<'a> {
         let metrics = self.metrics();
 
         METRIC_CONNECTIONS.set(metrics.connections as _);
-        METRIC_PROJECTS.set(metrics.registered_projects as _);
+        METRIC_REGISTERED_PROJECTS.set(metrics.registered_projects as _);
+        METRIC_ACTIVE_PROJECTS.set(metrics.active_projects as _);
         METRIC_SHARED_PROJECTS.set(metrics.shared_projects as _);
 
         tracing::info!(
             connections = metrics.connections,
             registered_projects = metrics.registered_projects,
+            active_projects = metrics.active_projects,
             shared_projects = metrics.shared_projects,
             "metrics"
         );
@@ -1649,10 +1670,10 @@ pub fn routes(server: Arc<Server>) -> Router<Body> {
         .layer(
             ServiceBuilder::new()
                 .layer(Extension(server.app_state.clone()))
-                .layer(middleware::from_fn(auth::validate_header))
-                .layer(Extension(server)),
+                .layer(middleware::from_fn(auth::validate_header)),
         )
         .route("/metrics", get(handle_metrics))
+        .layer(Extension(server))
 }
 
 pub async fn handle_websocket_request(
@@ -1686,7 +1707,10 @@ pub async fn handle_websocket_request(
     })
 }
 
-pub async fn handle_metrics() -> axum::response::Response {
+pub async fn handle_metrics(Extension(server): Extension<Arc<Server>>) -> axum::response::Response {
+    // We call `store_mut` here for its side effects of updating metrics.
+    server.store_mut().await;
+
     let encoder = prometheus::TextEncoder::new();
     let metric_families = prometheus::gather();
     match encoder.encode_to_string(&metric_families) {

crates/collab/src/rpc/store.rs 🔗

@@ -9,6 +9,7 @@ use std::{
     mem,
     path::{Path, PathBuf},
     str,
+    time::{Duration, Instant},
 };
 use tracing::instrument;
 
@@ -25,6 +26,7 @@ pub struct Store {
 #[derive(Serialize)]
 struct ConnectionState {
     user_id: UserId,
+    admin: bool,
     projects: HashSet<u64>,
     requested_projects: HashSet<u64>,
     channels: HashSet<ChannelId>,
@@ -40,6 +42,8 @@ pub struct Project {
     pub active_replica_ids: HashSet<ReplicaId>,
     pub worktrees: BTreeMap<u64, Worktree>,
     pub language_servers: Vec<proto::LanguageServer>,
+    #[serde(skip)]
+    last_activity: Option<Instant>,
 }
 
 #[derive(Default, Serialize)]
@@ -83,34 +87,45 @@ pub struct LeftProject {
 pub struct Metrics {
     pub connections: usize,
     pub registered_projects: usize,
+    pub active_projects: usize,
     pub shared_projects: usize,
 }
 
 impl Store {
     pub fn metrics(&self) -> Metrics {
-        let connections = self.connections.len();
+        let connections = self.connections.values().filter(|c| !c.admin).count();
         let mut registered_projects = 0;
+        let mut active_projects = 0;
         let mut shared_projects = 0;
         for project in self.projects.values() {
-            registered_projects += 1;
-            if !project.guests.is_empty() {
-                shared_projects += 1;
+            if let Some(connection) = self.connections.get(&project.host_connection_id) {
+                if !connection.admin {
+                    registered_projects += 1;
+                    if project.is_active() {
+                        active_projects += 1;
+                        if !project.guests.is_empty() {
+                            shared_projects += 1;
+                        }
+                    }
+                }
             }
         }
 
         Metrics {
             connections,
             registered_projects,
+            active_projects,
             shared_projects,
         }
     }
 
     #[instrument(skip(self))]
-    pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId) {
+    pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId, admin: bool) {
         self.connections.insert(
             connection_id,
             ConnectionState {
                 user_id,
+                admin,
                 projects: Default::default(),
                 requested_projects: Default::default(),
                 channels: Default::default(),
@@ -312,6 +327,7 @@ impl Store {
                 active_replica_ids: Default::default(),
                 worktrees: Default::default(),
                 language_servers: Default::default(),
+                last_activity: None,
             },
         );
         if let Some(connection) = self.connections.get_mut(&host_connection_id) {
@@ -454,6 +470,7 @@ impl Store {
             .get_mut(&project_id)
             .ok_or_else(|| anyhow!("no such project"))?;
         connection.requested_projects.insert(project_id);
+        project.last_activity = Some(Instant::now());
         project
             .join_requests
             .entry(requester_id)
@@ -478,6 +495,8 @@ impl Store {
             let requester_connection = self.connections.get_mut(&receipt.sender_id)?;
             requester_connection.requested_projects.remove(&project_id);
         }
+        project.last_activity = Some(Instant::now());
+
         Some(receipts)
     }
 
@@ -509,6 +528,7 @@ impl Store {
             receipts_with_replica_ids.push((receipt, replica_id));
         }
 
+        project.last_activity = Some(Instant::now());
         Some((receipts_with_replica_ids, project))
     }
 
@@ -559,6 +579,8 @@ impl Store {
             }
         }
 
+        project.last_activity = Some(Instant::now());
+
         Ok(LeftProject {
             host_connection_id: project.host_connection_id,
             host_user_id: project.host_user_id,
@@ -647,6 +669,15 @@ impl Store {
             .ok_or_else(|| anyhow!("no such project"))
     }
 
+    pub fn register_project_activity(
+        &mut self,
+        project_id: u64,
+        connection_id: ConnectionId,
+    ) -> Result<()> {
+        self.write_project(project_id, connection_id)?.last_activity = Some(Instant::now());
+        Ok(())
+    }
+
     pub fn read_project(&self, project_id: u64, connection_id: ConnectionId) -> Result<&Project> {
         let project = self
             .projects
@@ -752,6 +783,13 @@ impl Store {
 }
 
 impl Project {
+    fn is_active(&self) -> bool {
+        const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60);
+        self.last_activity.map_or(false, |last_activity| {
+            last_activity.elapsed() < ACTIVE_PROJECT_TIMEOUT
+        })
+    }
+
     pub fn guest_connection_ids(&self) -> Vec<ConnectionId> {
         self.guests.keys().copied().collect()
     }

crates/project/src/project.rs 🔗

@@ -1780,6 +1780,10 @@ impl Project {
                         operations: vec![language::proto::serialize_operation(&operation)],
                     });
                     cx.background().spawn(request).detach_and_log_err(cx);
+                } else if let Some(project_id) = self.remote_id() {
+                    let _ = self
+                        .client
+                        .send(proto::RegisterProjectActivity { project_id });
                 }
             }
             BufferEvent::Edited { .. } => {

crates/rpc/proto/zed.proto 🔗

@@ -36,6 +36,7 @@ message Envelope {
         OpenBufferForSymbolResponse open_buffer_for_symbol_response = 29;
 
         UpdateProject update_project = 30;
+        RegisterProjectActivity register_project_activity = 31;
         UpdateWorktree update_worktree = 32;
 
         CreateProjectEntry create_project_entry = 33;
@@ -135,6 +136,10 @@ message UpdateProject {
     repeated WorktreeMetadata worktrees = 2;
 }
 
+message RegisterProjectActivity {
+    uint64 project_id = 1;
+}
+
 message RequestJoinProject {
     uint64 requester_id = 1;
     uint64 project_id = 2;

crates/rpc/src/proto.rs 🔗

@@ -134,6 +134,7 @@ messages!(
     (Ping, Foreground),
     (ProjectUnshared, Foreground),
     (RegisterProject, Foreground),
+    (RegisterProjectActivity, Foreground),
     (ReloadBuffers, Foreground),
     (ReloadBuffersResponse, Foreground),
     (RemoveProjectCollaborator, Foreground),
@@ -236,6 +237,7 @@ entity_messages!(
     PerformRename,
     PrepareRename,
     ProjectUnshared,
+    RegisterProjectActivity,
     ReloadBuffers,
     RemoveProjectCollaborator,
     RenameProjectEntry,