From 6d93a41f4044a4eb07247bdcb57d749a250c0e77 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Tue, 14 Jun 2022 16:26:00 +0200 Subject: [PATCH 1/5] Exclude admins from collected metrics --- crates/collab/src/rpc.rs | 2 +- crates/collab/src/rpc/store.rs | 16 +++++++++++----- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 2204a0319c264791e78064661197c9038e9bb5df..5567472d6571aac1836160324d68cb0d34643f08 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -329,7 +329,7 @@ impl Server { { let mut store = this.store_mut().await; - store.add_connection(connection_id, user_id); + store.add_connection(connection_id, user_id, user.admin); this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?; if let Some((code, count)) = invite_code { diff --git a/crates/collab/src/rpc/store.rs b/crates/collab/src/rpc/store.rs index d929078dc5f9c1f41660aac4d322caacda653abc..88b4cc80677b08dda58d2556c220621a8a46ffd4 100644 --- a/crates/collab/src/rpc/store.rs +++ b/crates/collab/src/rpc/store.rs @@ -25,6 +25,7 @@ pub struct Store { #[derive(Serialize)] struct ConnectionState { user_id: UserId, + admin: bool, projects: HashSet, requested_projects: HashSet, channels: HashSet, @@ -88,13 +89,17 @@ pub struct Metrics { impl Store { pub fn metrics(&self) -> Metrics { - let connections = self.connections.len(); + let connections = self.connections.values().filter(|c| !c.admin).count(); let mut registered_projects = 0; let mut shared_projects = 0; for project in self.projects.values() { - registered_projects += 1; - if !project.guests.is_empty() { - shared_projects += 1; + if let Some(connection) = self.connections.get(&project.host_connection_id) { + if !connection.admin { + registered_projects += 1; + if !project.guests.is_empty() { + shared_projects += 1; + } + } } } @@ -106,11 +111,12 @@ impl Store { } #[instrument(skip(self))] - pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId) { + pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId, admin: bool) { self.connections.insert( connection_id, ConnectionState { user_id, + admin, projects: Default::default(), requested_projects: Default::default(), channels: Default::default(), From 3a1d0dd69234ba76f7f5d7a4226a2cd55d492afa Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Wed, 15 Jun 2022 10:33:08 +0200 Subject: [PATCH 2/5] Track active projects in metrics An active project is defined as a project where there has been at least a buffer edit, a join request/response, or a follow update in the last minute. --- crates/collab/src/rpc.rs | 61 +++++++++++++++++++++++----------- crates/collab/src/rpc/store.rs | 47 ++++++++++++++++++++++++-- crates/project/src/project.rs | 4 +++ crates/rpc/proto/zed.proto | 5 +++ crates/rpc/src/proto.rs | 2 ++ 5 files changed, 97 insertions(+), 22 deletions(-) diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 5567472d6571aac1836160324d68cb0d34643f08..81ec2858e23e296028aaca61e487768475fae43c 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -61,8 +61,10 @@ pub use store::{Store, Worktree}; lazy_static! { static ref METRIC_CONNECTIONS: IntGauge = register_int_gauge!("connections", "number of connections").unwrap(); - static ref METRIC_PROJECTS: IntGauge = - register_int_gauge!("projects", "number of open projects").unwrap(); + static ref METRIC_REGISTERED_PROJECTS: IntGauge = + register_int_gauge!("registered_projects", "number of registered projects").unwrap(); + static ref METRIC_ACTIVE_PROJECTS: IntGauge = + register_int_gauge!("active_projects", "number of active projects").unwrap(); static ref METRIC_SHARED_PROJECTS: IntGauge = register_int_gauge!( "shared_projects", "number of open projects with one or more guests" @@ -159,6 +161,7 @@ impl Server { .add_message_handler(Server::leave_project) .add_message_handler(Server::respond_to_join_project_request) .add_message_handler(Server::update_project) + .add_message_handler(Server::register_project_activity) .add_request_handler(Server::update_worktree) .add_message_handler(Server::start_language_server) .add_message_handler(Server::update_language_server) @@ -844,6 +847,16 @@ impl Server { Ok(()) } + async fn register_project_activity( + self: Arc, + request: TypedEnvelope, + ) -> Result<()> { + self.store_mut() + .await + .register_project_activity(request.payload.project_id, request.sender_id)?; + Ok(()) + } + async fn update_worktree( self: Arc, request: TypedEnvelope, @@ -1001,10 +1014,12 @@ impl Server { request: TypedEnvelope, response: Response, ) -> Result<()> { - let receiver_ids = self - .store() - .await - .project_connection_ids(request.payload.project_id, request.sender_id)?; + let receiver_ids = { + let mut store = self.store_mut().await; + store.register_project_activity(request.payload.project_id, request.sender_id)?; + store.project_connection_ids(request.payload.project_id, request.sender_id)? + }; + broadcast(request.sender_id, receiver_ids, |connection_id| { self.peer .forward_send(request.sender_id, connection_id, request.payload.clone()) @@ -1065,14 +1080,18 @@ impl Server { ) -> Result<()> { let leader_id = ConnectionId(request.payload.leader_id); let follower_id = request.sender_id; - if !self - .store() - .await - .project_connection_ids(request.payload.project_id, follower_id)? - .contains(&leader_id) { - Err(anyhow!("no such peer"))?; + let mut store = self.store_mut().await; + if store + .project_connection_ids(request.payload.project_id, follower_id)? + .contains(&leader_id) + { + Err(anyhow!("no such peer"))?; + } + + store.register_project_activity(request.payload.project_id, follower_id)?; } + let mut response_payload = self .peer .forward_request(request.sender_id, leader_id, request.payload) @@ -1086,14 +1105,14 @@ impl Server { async fn unfollow(self: Arc, request: TypedEnvelope) -> Result<()> { let leader_id = ConnectionId(request.payload.leader_id); - if !self - .store() - .await + let mut store = self.store_mut().await; + if !store .project_connection_ids(request.payload.project_id, request.sender_id)? .contains(&leader_id) { Err(anyhow!("no such peer"))?; } + store.register_project_activity(request.payload.project_id, request.sender_id)?; self.peer .forward_send(request.sender_id, leader_id, request.payload)?; Ok(()) @@ -1103,10 +1122,10 @@ impl Server { self: Arc, request: TypedEnvelope, ) -> Result<()> { - let connection_ids = self - .store() - .await - .project_connection_ids(request.payload.project_id, request.sender_id)?; + let mut store = self.store_mut().await; + store.register_project_activity(request.payload.project_id, request.sender_id)?; + let connection_ids = + store.project_connection_ids(request.payload.project_id, request.sender_id)?; let leader_id = request .payload .variant @@ -1574,12 +1593,14 @@ impl<'a> Drop for StoreWriteGuard<'a> { let metrics = self.metrics(); METRIC_CONNECTIONS.set(metrics.connections as _); - METRIC_PROJECTS.set(metrics.registered_projects as _); + METRIC_REGISTERED_PROJECTS.set(metrics.registered_projects as _); + METRIC_ACTIVE_PROJECTS.set(metrics.active_projects as _); METRIC_SHARED_PROJECTS.set(metrics.shared_projects as _); tracing::info!( connections = metrics.connections, registered_projects = metrics.registered_projects, + active_projects = metrics.active_projects, shared_projects = metrics.shared_projects, "metrics" ); diff --git a/crates/collab/src/rpc/store.rs b/crates/collab/src/rpc/store.rs index 88b4cc80677b08dda58d2556c220621a8a46ffd4..0b548f8a166b1de2eab1d83103f959c700418267 100644 --- a/crates/collab/src/rpc/store.rs +++ b/crates/collab/src/rpc/store.rs @@ -9,6 +9,7 @@ use std::{ mem, path::{Path, PathBuf}, str, + time::{Duration, Instant}, }; use tracing::instrument; @@ -41,6 +42,8 @@ pub struct Project { pub active_replica_ids: HashSet, pub worktrees: BTreeMap, pub language_servers: Vec, + #[serde(skip)] + last_activity: Option, } #[derive(Default, Serialize)] @@ -84,6 +87,7 @@ pub struct LeftProject { pub struct Metrics { pub connections: usize, pub registered_projects: usize, + pub active_projects: usize, pub shared_projects: usize, } @@ -91,13 +95,17 @@ impl Store { pub fn metrics(&self) -> Metrics { let connections = self.connections.values().filter(|c| !c.admin).count(); let mut registered_projects = 0; + let mut active_projects = 0; let mut shared_projects = 0; for project in self.projects.values() { if let Some(connection) = self.connections.get(&project.host_connection_id) { if !connection.admin { registered_projects += 1; - if !project.guests.is_empty() { - shared_projects += 1; + if project.is_active() { + active_projects += 1; + if !project.guests.is_empty() { + shared_projects += 1; + } } } } @@ -106,6 +114,7 @@ impl Store { Metrics { connections, registered_projects, + active_projects, shared_projects, } } @@ -318,6 +327,7 @@ impl Store { active_replica_ids: Default::default(), worktrees: Default::default(), language_servers: Default::default(), + last_activity: None, }, ); if let Some(connection) = self.connections.get_mut(&host_connection_id) { @@ -338,6 +348,7 @@ impl Store { .get_mut(&project_id) .ok_or_else(|| anyhow!("no such project"))?; if project.host_connection_id == connection_id { + project.last_activity = Some(Instant::now()); let mut old_worktrees = mem::take(&mut project.worktrees); for worktree in worktrees { if let Some(old_worktree) = old_worktrees.remove(&worktree.id) { @@ -460,6 +471,7 @@ impl Store { .get_mut(&project_id) .ok_or_else(|| anyhow!("no such project"))?; connection.requested_projects.insert(project_id); + project.last_activity = Some(Instant::now()); project .join_requests .entry(requester_id) @@ -484,6 +496,8 @@ impl Store { let requester_connection = self.connections.get_mut(&receipt.sender_id)?; requester_connection.requested_projects.remove(&project_id); } + project.last_activity = Some(Instant::now()); + Some(receipts) } @@ -515,6 +529,7 @@ impl Store { receipts_with_replica_ids.push((receipt, replica_id)); } + project.last_activity = Some(Instant::now()); Some((receipts_with_replica_ids, project)) } @@ -565,6 +580,8 @@ impl Store { } } + project.last_activity = Some(Instant::now()); + Ok(LeftProject { host_connection_id: project.host_connection_id, host_user_id: project.host_user_id, @@ -653,6 +670,25 @@ impl Store { .ok_or_else(|| anyhow!("no such project")) } + pub fn register_project_activity( + &mut self, + project_id: u64, + connection_id: ConnectionId, + ) -> Result<()> { + let project = self + .projects + .get_mut(&project_id) + .ok_or_else(|| anyhow!("no such project"))?; + if project.host_connection_id == connection_id + || project.guests.contains_key(&connection_id) + { + project.last_activity = Some(Instant::now()); + Ok(()) + } else { + Err(anyhow!("no such project"))? + } + } + pub fn read_project(&self, project_id: u64, connection_id: ConnectionId) -> Result<&Project> { let project = self .projects @@ -758,6 +794,13 @@ impl Store { } impl Project { + fn is_active(&self) -> bool { + const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60); + self.last_activity.map_or(false, |last_activity| { + last_activity.elapsed() < ACTIVE_PROJECT_TIMEOUT + }) + } + pub fn guest_connection_ids(&self) -> Vec { self.guests.keys().copied().collect() } diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 9b38665ca3475e3b235bd70da8b3dff57906d75d..7c291cade7c6817b551d0f2d9fdb9f679c0f79fa 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -1780,6 +1780,10 @@ impl Project { operations: vec![language::proto::serialize_operation(&operation)], }); cx.background().spawn(request).detach_and_log_err(cx); + } else if let Some(project_id) = self.remote_id() { + let _ = self + .client + .send(proto::RegisterProjectActivity { project_id }); } } BufferEvent::Edited { .. } => { diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index ff52fb9e8d6f512904fc5d8535c6e9dc127d5e21..ed05ed7b431f98bc0a485ff1deb8610f8e66dfbb 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -36,6 +36,7 @@ message Envelope { OpenBufferForSymbolResponse open_buffer_for_symbol_response = 29; UpdateProject update_project = 30; + RegisterProjectActivity register_project_activity = 31; UpdateWorktree update_worktree = 32; CreateProjectEntry create_project_entry = 33; @@ -135,6 +136,10 @@ message UpdateProject { repeated WorktreeMetadata worktrees = 2; } +message RegisterProjectActivity { + uint64 project_id = 1; +} + message RequestJoinProject { uint64 requester_id = 1; uint64 project_id = 2; diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index 6343c05ceac42b2c1733065faa90c90ffcd1294f..ecee3709863b4239dcd43b3c2e9c21eb4868880c 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -134,6 +134,7 @@ messages!( (Ping, Foreground), (ProjectUnshared, Foreground), (RegisterProject, Foreground), + (RegisterProjectActivity, Foreground), (ReloadBuffers, Foreground), (ReloadBuffersResponse, Foreground), (RemoveProjectCollaborator, Foreground), @@ -236,6 +237,7 @@ entity_messages!( PerformRename, PrepareRename, ProjectUnshared, + RegisterProjectActivity, ReloadBuffers, RemoveProjectCollaborator, RenameProjectEntry, From e373e05d27547c7f46d9e13482545447e06c8bd6 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Wed, 15 Jun 2022 10:42:37 +0200 Subject: [PATCH 3/5] :art: --- crates/collab/src/rpc/store.rs | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/crates/collab/src/rpc/store.rs b/crates/collab/src/rpc/store.rs index 0b548f8a166b1de2eab1d83103f959c700418267..2573691eaf6284adf759ead23fe203d6d954cdce 100644 --- a/crates/collab/src/rpc/store.rs +++ b/crates/collab/src/rpc/store.rs @@ -675,18 +675,8 @@ impl Store { project_id: u64, connection_id: ConnectionId, ) -> Result<()> { - let project = self - .projects - .get_mut(&project_id) - .ok_or_else(|| anyhow!("no such project"))?; - if project.host_connection_id == connection_id - || project.guests.contains_key(&connection_id) - { - project.last_activity = Some(Instant::now()); - Ok(()) - } else { - Err(anyhow!("no such project"))? - } + self.write_project(project_id, connection_id)?.last_activity = Some(Instant::now()); + Ok(()) } pub fn read_project(&self, project_id: u64, connection_id: ConnectionId) -> Result<&Project> { From a85f9e74b111c65d6a3ee535ae8e08a20c664eaf Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Wed, 15 Jun 2022 10:54:51 +0200 Subject: [PATCH 4/5] Harvest the latest metrics when `/metrics` is requested Now that we track active projects, if nothing happens to the store during the activity timeout we would still serve some old metrics that may not account for the staleness of a project. This commit changes it so that we grab a mutable reference to the store before serving the metrics, which has the side effect of updating all the metrics. --- crates/collab/src/rpc.rs | 9 ++++++--- crates/collab/src/rpc/store.rs | 1 - 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 81ec2858e23e296028aaca61e487768475fae43c..7b9268f343837696448bab130b8fb2ecf99dced2 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -1670,10 +1670,10 @@ pub fn routes(server: Arc) -> Router { .layer( ServiceBuilder::new() .layer(Extension(server.app_state.clone())) - .layer(middleware::from_fn(auth::validate_header)) - .layer(Extension(server)), + .layer(middleware::from_fn(auth::validate_header)), ) .route("/metrics", get(handle_metrics)) + .layer(Extension(server)) } pub async fn handle_websocket_request( @@ -1707,7 +1707,10 @@ pub async fn handle_websocket_request( }) } -pub async fn handle_metrics() -> axum::response::Response { +pub async fn handle_metrics(Extension(server): Extension>) -> axum::response::Response { + // We call `store_mut` here for its side effects of updating metrics. + server.store_mut().await; + let encoder = prometheus::TextEncoder::new(); let metric_families = prometheus::gather(); match encoder.encode_to_string(&metric_families) { diff --git a/crates/collab/src/rpc/store.rs b/crates/collab/src/rpc/store.rs index 2573691eaf6284adf759ead23fe203d6d954cdce..9fe9162edd67c405be4b105ff4cd5cf3d16a8fe7 100644 --- a/crates/collab/src/rpc/store.rs +++ b/crates/collab/src/rpc/store.rs @@ -348,7 +348,6 @@ impl Store { .get_mut(&project_id) .ok_or_else(|| anyhow!("no such project"))?; if project.host_connection_id == connection_id { - project.last_activity = Some(Instant::now()); let mut old_worktrees = mem::take(&mut project.worktrees); for worktree in worktrees { if let Some(old_worktree) = old_worktrees.remove(&worktree.id) { From 197a4342d0cb13ba63b1119898559953c1f56055 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Wed, 15 Jun 2022 11:16:26 +0200 Subject: [PATCH 5/5] Fix tests --- crates/collab/src/rpc.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 7b9268f343837696448bab130b8fb2ecf99dced2..e244757970c6973e08f893e549e5f1251159b644 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -1082,7 +1082,7 @@ impl Server { let follower_id = request.sender_id; { let mut store = self.store_mut().await; - if store + if !store .project_connection_ids(request.payload.project_id, follower_id)? .contains(&leader_id) {