Add an API that returns the most active users and the projects where they've been active

Max Brunsfeld created

Change summary

crates/collab/migrations/20220620211403_create_project_activity_periods.sql |   9 
crates/collab/src/api.rs                                                    |  22 
crates/collab/src/db.rs                                                     | 219 
crates/collab/src/integration_tests.rs                                      |   2 
crates/collab/src/main.rs                                                   |   3 
crates/collab/src/rpc.rs                                                    |  67 
crates/collab/src/rpc/store.rs                                              |  97 
7 files changed, 376 insertions(+), 43 deletions(-)

Detailed changes

crates/collab/migrations/20220620211403_create_project_activity_periods.sql 🔗

@@ -0,0 +1,9 @@
+CREATE TABLE IF NOT EXISTS "project_activity_periods" (
+    "id" SERIAL PRIMARY KEY,
+    "duration_millis" INTEGER NOT NULL,
+    "ended_at" TIMESTAMP NOT NULL,
+    "user_id" INTEGER REFERENCES users (id) NOT NULL,
+    "project_id" INTEGER
+);
+
+CREATE INDEX "index_project_activity_periods_on_ended_at" ON "project_activity_periods" ("ended_at");

crates/collab/src/api.rs 🔗

@@ -16,7 +16,8 @@ use axum::{
 };
 use axum_extra::response::ErasedJson;
 use serde::{Deserialize, Serialize};
-use std::sync::Arc;
+use std::{sync::Arc, time::Duration};
+use time::OffsetDateTime;
 use tower::ServiceBuilder;
 use tracing::instrument;
 
@@ -32,6 +33,10 @@ pub fn routes(rpc_server: &Arc<rpc::Server>, state: Arc<AppState>) -> Router<Bod
         .route("/invite_codes/:code", get(get_user_for_invite_code))
         .route("/panic", post(trace_panic))
         .route("/rpc_server_snapshot", get(get_rpc_server_snapshot))
+        .route(
+            "/project_activity_summary",
+            get(get_project_activity_summary),
+        )
         .layer(
             ServiceBuilder::new()
                 .layer(Extension(state))
@@ -239,6 +244,21 @@ async fn get_rpc_server_snapshot(
     Ok(ErasedJson::pretty(rpc_server.snapshot().await))
 }
 
+#[derive(Deserialize)]
+struct GetProjectActivityParams {
+    duration_secs: u64,
+}
+
+async fn get_project_activity_summary(
+    Query(params): Query<GetProjectActivityParams>,
+    Extension(app): Extension<Arc<AppState>>,
+) -> Result<ErasedJson> {
+    let end = OffsetDateTime::now_utc();
+    let start = end - Duration::from_secs(params.duration_secs);
+    let summary = app.db.summarize_project_activity(start..end, 100).await?;
+    Ok(ErasedJson::pretty(summary))
+}
+
 #[derive(Deserialize)]
 struct CreateAccessTokenQueryParams {
     public_key: String,

crates/collab/src/db.rs 🔗

@@ -1,3 +1,5 @@
+use std::{ops::Range, time::Duration};
+
 use crate::{Error, Result};
 use anyhow::{anyhow, Context};
 use async_trait::async_trait;
@@ -37,6 +39,22 @@ pub trait Db: Send + Sync {
         email_address: Option<&str>,
     ) -> Result<UserId>;
 
+    /// Record which users have been active in which projects during
+    /// a given period of time.
+    async fn record_project_activity(
+        &self,
+        time_period: Range<OffsetDateTime>,
+        active_projects: &[(UserId, u64)],
+    ) -> Result<()>;
+
+    /// Get the users that have been most active during the given time period,
+    /// along with the amount of time they have been active in each project.
+    async fn summarize_project_activity(
+        &self,
+        time_period: Range<OffsetDateTime>,
+        max_user_count: usize,
+    ) -> Result<Vec<UserActivitySummary>>;
+
     async fn get_contacts(&self, id: UserId) -> Result<Vec<Contact>>;
     async fn has_contact(&self, user_id_a: UserId, user_id_b: UserId) -> Result<bool>;
     async fn send_contact_request(&self, requester_id: UserId, responder_id: UserId) -> Result<()>;
@@ -150,7 +168,7 @@ impl Db for PostgresDb {
             .fetch_all(&self.pool)
             .await?)
     }
-  
+
     async fn create_users(&self, users: Vec<(String, String, usize)>) -> Result<Vec<UserId>> {
         let mut query = QueryBuilder::new(
             "INSERT INTO users (github_login, email_address, admin, invite_code, invite_count)",
@@ -411,6 +429,92 @@ impl Db for PostgresDb {
         Ok(invitee_id)
     }
 
+    // project activity
+
+    async fn record_project_activity(
+        &self,
+        time_period: Range<OffsetDateTime>,
+        projects: &[(UserId, u64)],
+    ) -> Result<()> {
+        let query = "
+            INSERT INTO project_activity_periods
+            (ended_at, duration_millis, user_id, project_id)
+            VALUES
+            ($1, $2, $3, $4);
+        ";
+
+        let mut tx = self.pool.begin().await?;
+        let duration_millis =
+            ((time_period.end - time_period.start).as_seconds_f64() * 1000.0) as i32;
+        for (user_id, project_id) in projects {
+            sqlx::query(query)
+                .bind(time_period.end)
+                .bind(duration_millis)
+                .bind(user_id)
+                .bind(*project_id as i32)
+                .execute(&mut tx)
+                .await?;
+        }
+        tx.commit().await?;
+
+        Ok(())
+    }
+
+    async fn summarize_project_activity(
+        &self,
+        time_period: Range<OffsetDateTime>,
+        max_user_count: usize,
+    ) -> Result<Vec<UserActivitySummary>> {
+        let query = "
+            WITH
+                project_durations AS (
+                    SELECT user_id, project_id, SUM(duration_millis) AS project_duration
+                    FROM project_activity_periods
+                    WHERE $1 <= ended_at AND ended_at <= $2
+                    GROUP BY user_id, project_id
+                ),
+                user_durations AS (
+                    SELECT user_id, SUM(project_duration) as total_duration
+                    FROM project_durations
+                    GROUP BY user_id
+                    ORDER BY total_duration DESC
+                    LIMIT $3
+                )
+            SELECT user_durations.user_id, users.github_login, project_id, project_duration
+            FROM user_durations, project_durations, users
+            WHERE
+                user_durations.user_id = project_durations.user_id AND
+                user_durations.user_id = users.id
+            ORDER BY user_id ASC, project_duration DESC
+        ";
+
+        let mut rows = sqlx::query_as::<_, (UserId, String, i32, i64)>(query)
+            .bind(time_period.start)
+            .bind(time_period.end)
+            .bind(max_user_count as i32)
+            .fetch(&self.pool);
+
+        let mut result = Vec::<UserActivitySummary>::new();
+        while let Some(row) = rows.next().await {
+            let (user_id, github_login, project_id, duration_millis) = row?;
+            let project_id = project_id as u64;
+            let duration = Duration::from_millis(duration_millis as u64);
+            if let Some(last_summary) = result.last_mut() {
+                if last_summary.id == user_id {
+                    last_summary.project_activity.push((project_id, duration));
+                    continue;
+                }
+            }
+            result.push(UserActivitySummary {
+                id: user_id,
+                project_activity: vec![(project_id, duration)],
+                github_login,
+            });
+        }
+
+        Ok(result)
+    }
+
     // contacts
 
     async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
@@ -927,6 +1031,13 @@ pub struct User {
     pub connected_once: bool,
 }
 
+#[derive(Clone, Debug, PartialEq, Serialize)]
+pub struct UserActivitySummary {
+    pub id: UserId,
+    pub github_login: String,
+    pub project_activity: Vec<(u64, Duration)>,
+}
+
 id_type!(OrgId);
 #[derive(FromRow)]
 pub struct Org {
@@ -1125,6 +1236,94 @@ pub mod tests {
         assert_ne!(invite_code_4, invite_code_3);
     }
 
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_project_activity() {
+        let test_db = TestDb::postgres().await;
+        let db = test_db.db();
+
+        let user_1 = db.create_user("user_1", None, false).await.unwrap();
+        let user_2 = db.create_user("user_2", None, false).await.unwrap();
+        let user_3 = db.create_user("user_3", None, false).await.unwrap();
+        let project_1 = 101;
+        let project_2 = 102;
+        let t0 = OffsetDateTime::now_utc() - Duration::from_secs(60 * 60);
+
+        // User 2 opens a project
+        let t1 = t0 + Duration::from_secs(10);
+        db.record_project_activity(t0..t1, &[(user_2, project_2)])
+            .await
+            .unwrap();
+
+        let t2 = t1 + Duration::from_secs(10);
+        db.record_project_activity(t1..t2, &[(user_2, project_2)])
+            .await
+            .unwrap();
+
+        // User 1 joins the project
+        let t3 = t2 + Duration::from_secs(10);
+        db.record_project_activity(t2..t3, &[(user_2, project_2), (user_1, project_2)])
+            .await
+            .unwrap();
+
+        // User 1 opens another project
+        let t4 = t3 + Duration::from_secs(10);
+        db.record_project_activity(
+            t3..t4,
+            &[
+                (user_2, project_2),
+                (user_1, project_2),
+                (user_1, project_1),
+            ],
+        )
+        .await
+        .unwrap();
+
+        // User 3 joins that project
+        let t5 = t4 + Duration::from_secs(10);
+        db.record_project_activity(
+            t4..t5,
+            &[
+                (user_2, project_2),
+                (user_1, project_2),
+                (user_1, project_1),
+                (user_3, project_1),
+            ],
+        )
+        .await
+        .unwrap();
+
+        // User 2 leaves
+        let t6 = t5 + Duration::from_secs(5);
+        db.record_project_activity(t5..t6, &[(user_1, project_1), (user_3, project_1)])
+            .await
+            .unwrap();
+
+        let summary = db.summarize_project_activity(t0..t6, 10).await.unwrap();
+        assert_eq!(
+            summary,
+            &[
+                UserActivitySummary {
+                    id: user_1,
+                    github_login: "user_1".to_string(),
+                    project_activity: vec![
+                        (project_2, Duration::from_secs(30)),
+                        (project_1, Duration::from_secs(25))
+                    ]
+                },
+                UserActivitySummary {
+                    id: user_2,
+                    github_login: "user_2".to_string(),
+                    project_activity: vec![(project_2, Duration::from_secs(50))]
+                },
+                UserActivitySummary {
+                    id: user_3,
+                    github_login: "user_3".to_string(),
+                    project_activity: vec![(project_1, Duration::from_secs(15))]
+                },
+            ]
+        );
+    }
+
     #[tokio::test(flavor = "multi_thread")]
     async fn test_recent_channel_messages() {
         for test_db in [
@@ -1841,6 +2040,24 @@ pub mod tests {
             unimplemented!()
         }
 
+        // project activity
+
+        async fn record_project_activity(
+            &self,
+            _period: Range<OffsetDateTime>,
+            _active_projects: &[(UserId, u64)],
+        ) -> Result<()> {
+            unimplemented!()
+        }
+
+        async fn summarize_project_activity(
+            &self,
+            _period: Range<OffsetDateTime>,
+            _limit: usize,
+        ) -> Result<Vec<UserActivitySummary>> {
+            unimplemented!()
+        }
+
         // contacts
 
         async fn get_contacts(&self, id: UserId) -> Result<Vec<Contact>> {

crates/collab/src/integration_tests.rs 🔗

@@ -4722,7 +4722,7 @@ impl TestServer {
         foreground: Rc<executor::Foreground>,
         background: Arc<executor::Background>,
     ) -> Self {
-        let test_db = TestDb::fake(background);
+        let test_db = TestDb::fake(background.clone());
         let app_state = Self::build_app_state(&test_db).await;
         let peer = Peer::new();
         let notifications = mpsc::unbounded();

crates/collab/src/main.rs 🔗

@@ -14,6 +14,7 @@ use serde::Deserialize;
 use std::{
     net::{SocketAddr, TcpListener},
     sync::Arc,
+    time::Duration,
 };
 use tracing_log::LogTracer;
 use tracing_subscriber::{filter::EnvFilter, fmt::format::JsonFields, Layer};
@@ -66,6 +67,8 @@ async fn main() -> Result<()> {
         .expect("failed to bind TCP listener");
     let rpc_server = rpc::Server::new(state.clone(), None);
 
+    rpc_server.start_recording_project_activity(Duration::from_secs(5 * 60), rpc::RealExecutor);
+
     let app = Router::<Body>::new()
         .merge(api::routes(&rpc_server, state.clone()))
         .merge(rpc::routes(rpc_server));

crates/collab/src/rpc.rs 🔗

@@ -288,6 +288,57 @@ impl Server {
         })
     }
 
+    /// Start a long lived task that records which users are active in which projects.
+    pub fn start_recording_project_activity<E: 'static + Executor>(
+        self: &Arc<Self>,
+        interval: Duration,
+        executor: E,
+    ) {
+        executor.spawn_detached({
+            let this = Arc::downgrade(self);
+            let executor = executor.clone();
+            async move {
+                let mut period_start = OffsetDateTime::now_utc();
+                let mut active_projects = Vec::<(UserId, u64)>::new();
+                loop {
+                    let sleep = executor.sleep(interval);
+                    sleep.await;
+                    let this = if let Some(this) = this.upgrade() {
+                        this
+                    } else {
+                        break;
+                    };
+
+                    active_projects.clear();
+                    active_projects.extend(this.store().await.projects().flat_map(
+                        |(project_id, project)| {
+                            project.guests.values().chain([&project.host]).filter_map(
+                                |collaborator| {
+                                    if collaborator
+                                        .last_activity
+                                        .map_or(false, |activity| activity > period_start)
+                                    {
+                                        Some((collaborator.user_id, *project_id))
+                                    } else {
+                                        None
+                                    }
+                                },
+                            )
+                        },
+                    ));
+
+                    let period_end = OffsetDateTime::now_utc();
+                    this.app_state
+                        .db
+                        .record_project_activity(period_start..period_end, &active_projects)
+                        .await
+                        .trace_err();
+                    period_start = period_end;
+                }
+            }
+        });
+    }
+
     pub fn handle_connection<E: Executor>(
         self: &Arc<Self>,
         connection: Connection,
@@ -621,7 +672,7 @@ impl Server {
         {
             let state = self.store().await;
             let project = state.project(project_id)?;
-            host_user_id = project.host_user_id;
+            host_user_id = project.host.user_id;
             host_connection_id = project.host_connection_id;
             guest_user_id = state.user_id_for_connection(request.sender_id)?;
         };
@@ -665,7 +716,7 @@ impl Server {
                 Err(anyhow!("no such connection"))?;
             }
 
-            host_user_id = project.host_user_id;
+            host_user_id = project.host.user_id;
             let guest_user_id = UserId::from_proto(request.payload.requester_id);
 
             if !request.payload.allow {
@@ -697,7 +748,7 @@ impl Server {
             collaborators.push(proto::Collaborator {
                 peer_id: project.host_connection_id.0,
                 replica_id: 0,
-                user_id: project.host_user_id.to_proto(),
+                user_id: project.host.user_id.to_proto(),
             });
             let worktrees = project
                 .worktrees
@@ -720,15 +771,15 @@ impl Server {
                 .collect::<Vec<_>>();
 
             // Add all guests other than the requesting user's own connections as collaborators
-            for (peer_conn_id, (peer_replica_id, peer_user_id)) in &project.guests {
+            for (guest_conn_id, guest) in &project.guests {
                 if receipts_with_replica_ids
                     .iter()
-                    .all(|(receipt, _)| receipt.sender_id != *peer_conn_id)
+                    .all(|(receipt, _)| receipt.sender_id != *guest_conn_id)
                 {
                     collaborators.push(proto::Collaborator {
-                        peer_id: peer_conn_id.0,
-                        replica_id: *peer_replica_id as u32,
-                        user_id: peer_user_id.to_proto(),
+                        peer_id: guest_conn_id.0,
+                        replica_id: guest.replica_id as u32,
+                        user_id: guest.user_id.to_proto(),
                     });
                 }
             }

crates/collab/src/rpc/store.rs 🔗

@@ -9,8 +9,9 @@ use std::{
     mem,
     path::{Path, PathBuf},
     str,
-    time::{Duration, Instant},
+    time::Duration,
 };
+use time::OffsetDateTime;
 use tracing::instrument;
 
 #[derive(Default, Serialize)]
@@ -35,15 +36,21 @@ struct ConnectionState {
 #[derive(Serialize)]
 pub struct Project {
     pub host_connection_id: ConnectionId,
-    pub host_user_id: UserId,
-    pub guests: HashMap<ConnectionId, (ReplicaId, UserId)>,
+    pub host: Collaborator,
+    pub guests: HashMap<ConnectionId, Collaborator>,
     #[serde(skip)]
     pub join_requests: HashMap<UserId, Vec<Receipt<proto::JoinProject>>>,
     pub active_replica_ids: HashSet<ReplicaId>,
     pub worktrees: BTreeMap<u64, Worktree>,
     pub language_servers: Vec<proto::LanguageServer>,
+}
+
+#[derive(Serialize)]
+pub struct Collaborator {
+    pub replica_id: ReplicaId,
+    pub user_id: UserId,
     #[serde(skip)]
-    last_activity: Option<Instant>,
+    pub last_activity: Option<OffsetDateTime>,
 }
 
 #[derive(Default, Serialize)]
@@ -93,6 +100,9 @@ pub struct Metrics {
 
 impl Store {
     pub fn metrics(&self) -> Metrics {
+        const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60);
+        let active_window_start = OffsetDateTime::now_utc() - ACTIVE_PROJECT_TIMEOUT;
+
         let connections = self.connections.values().filter(|c| !c.admin).count();
         let mut registered_projects = 0;
         let mut active_projects = 0;
@@ -101,7 +111,7 @@ impl Store {
             if let Some(connection) = self.connections.get(&project.host_connection_id) {
                 if !connection.admin {
                     registered_projects += 1;
-                    if project.is_active() {
+                    if project.is_active_since(active_window_start) {
                         active_projects += 1;
                         if !project.guests.is_empty() {
                             shared_projects += 1;
@@ -289,7 +299,7 @@ impl Store {
         let mut metadata = Vec::new();
         for project_id in project_ids {
             if let Some(project) = self.projects.get(&project_id) {
-                if project.host_user_id == user_id {
+                if project.host.user_id == user_id {
                     metadata.push(proto::ProjectMetadata {
                         id: project_id,
                         visible_worktree_root_names: project
@@ -301,7 +311,7 @@ impl Store {
                         guests: project
                             .guests
                             .values()
-                            .map(|(_, user_id)| user_id.to_proto())
+                            .map(|guest| guest.user_id.to_proto())
                             .collect(),
                     });
                 }
@@ -321,13 +331,16 @@ impl Store {
             project_id,
             Project {
                 host_connection_id,
-                host_user_id,
+                host: Collaborator {
+                    user_id: host_user_id,
+                    replica_id: 0,
+                    last_activity: None,
+                },
                 guests: Default::default(),
                 join_requests: Default::default(),
                 active_replica_ids: Default::default(),
                 worktrees: Default::default(),
                 language_servers: Default::default(),
-                last_activity: None,
             },
         );
         if let Some(connection) = self.connections.get_mut(&host_connection_id) {
@@ -470,7 +483,6 @@ impl Store {
             .get_mut(&project_id)
             .ok_or_else(|| anyhow!("no such project"))?;
         connection.requested_projects.insert(project_id);
-        project.last_activity = Some(Instant::now());
         project
             .join_requests
             .entry(requester_id)
@@ -495,7 +507,7 @@ impl Store {
             let requester_connection = self.connections.get_mut(&receipt.sender_id)?;
             requester_connection.requested_projects.remove(&project_id);
         }
-        project.last_activity = Some(Instant::now());
+        project.host.last_activity = Some(OffsetDateTime::now_utc());
 
         Some(receipts)
     }
@@ -522,13 +534,18 @@ impl Store {
                 replica_id += 1;
             }
             project.active_replica_ids.insert(replica_id);
-            project
-                .guests
-                .insert(receipt.sender_id, (replica_id, requester_id));
+            project.guests.insert(
+                receipt.sender_id,
+                Collaborator {
+                    replica_id,
+                    user_id: requester_id,
+                    last_activity: Some(OffsetDateTime::now_utc()),
+                },
+            );
             receipts_with_replica_ids.push((receipt, replica_id));
         }
 
-        project.last_activity = Some(Instant::now());
+        project.host.last_activity = Some(OffsetDateTime::now_utc());
         Some((receipts_with_replica_ids, project))
     }
 
@@ -544,13 +561,12 @@ impl Store {
             .ok_or_else(|| anyhow!("no such project"))?;
 
         // If the connection leaving the project is a collaborator, remove it.
-        let remove_collaborator =
-            if let Some((replica_id, _)) = project.guests.remove(&connection_id) {
-                project.active_replica_ids.remove(&replica_id);
-                true
-            } else {
-                false
-            };
+        let remove_collaborator = if let Some(guest) = project.guests.remove(&connection_id) {
+            project.active_replica_ids.remove(&guest.replica_id);
+            true
+        } else {
+            false
+        };
 
         // If the connection leaving the project has a pending request, remove it.
         // If that user has no other pending requests on other connections, indicate that the request should be cancelled.
@@ -579,11 +595,9 @@ impl Store {
             }
         }
 
-        project.last_activity = Some(Instant::now());
-
         Ok(LeftProject {
             host_connection_id: project.host_connection_id,
-            host_user_id: project.host_user_id,
+            host_user_id: project.host.user_id,
             connection_ids,
             cancel_request,
             unshare,
@@ -674,10 +688,25 @@ impl Store {
         project_id: u64,
         connection_id: ConnectionId,
     ) -> Result<()> {
-        self.write_project(project_id, connection_id)?.last_activity = Some(Instant::now());
+        let project = self
+            .projects
+            .get_mut(&project_id)
+            .ok_or_else(|| anyhow!("no such project"))?;
+        let collaborator = if connection_id == project.host_connection_id {
+            &mut project.host
+        } else if let Some(guest) = project.guests.get_mut(&connection_id) {
+            guest
+        } else {
+            return Err(anyhow!("no such project"))?;
+        };
+        collaborator.last_activity = Some(OffsetDateTime::now_utc());
         Ok(())
     }
 
+    pub fn projects(&self) -> impl Iterator<Item = (&u64, &Project)> {
+        self.projects.iter()
+    }
+
     pub fn read_project(&self, project_id: u64, connection_id: ConnectionId) -> Result<&Project> {
         let project = self
             .projects
@@ -768,7 +797,7 @@ impl Store {
                 project
                     .guests
                     .values()
-                    .map(|(replica_id, _)| *replica_id)
+                    .map(|guest| guest.replica_id)
                     .collect::<HashSet<_>>(),
             );
         }
@@ -783,11 +812,15 @@ impl Store {
 }
 
 impl Project {
-    fn is_active(&self) -> bool {
-        const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60);
-        self.last_activity.map_or(false, |last_activity| {
-            last_activity.elapsed() < ACTIVE_PROJECT_TIMEOUT
-        })
+    fn is_active_since(&self, start_time: OffsetDateTime) -> bool {
+        self.guests
+            .values()
+            .chain([&self.host])
+            .any(|collaborator| {
+                collaborator
+                    .last_activity
+                    .map_or(false, |active_time| active_time > start_time)
+            })
     }
 
     pub fn guest_connection_ids(&self) -> Vec<ConnectionId> {