From 5cc5e15f4d8515122d13ddb857458d4e20d2fd53 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Tue, 28 Jun 2022 19:29:05 +0200 Subject: [PATCH 01/13] Implement `Db::summarize_user_activity` Co-Authored-By: Max Brunsfeld --- crates/collab/src/db.rs | 166 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 162 insertions(+), 4 deletions(-) diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index ce494db59bb6222b90533d0129f86c21e651e697..9429a3ad1d1067ec33fb20a51e5231ab301af8bb 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -10,7 +10,7 @@ use nanoid::nanoid; use serde::Serialize; pub use sqlx::postgres::PgPoolOptions as DbOptions; use sqlx::{types::Uuid, FromRow, QueryBuilder, Row}; -use time::OffsetDateTime; +use time::{OffsetDateTime, PrimitiveDateTime}; #[async_trait] pub trait Db: Send + Sync { @@ -77,6 +77,13 @@ pub trait Db: Send + Sync { max_user_count: usize, ) -> Result>; + /// Get the project activity for the given user and time period. + async fn summarize_user_activity( + &self, + user_id: UserId, + time_period: Range, + ) -> Result>; + async fn get_contacts(&self, id: UserId) -> Result>; async fn has_contact(&self, user_id_a: UserId, user_id_b: UserId) -> Result; async fn send_contact_request(&self, requester_id: UserId, responder_id: UserId) -> Result<()>; @@ -596,7 +603,7 @@ impl Db for PostgresDb { project_durations AS ( SELECT user_id, project_id, SUM(duration_millis) AS project_duration FROM project_activity_periods - WHERE $1 <= ended_at AND ended_at <= $2 + WHERE $1 < ended_at AND ended_at <= $2 GROUP BY user_id, project_id ), user_durations AS ( @@ -641,6 +648,89 @@ impl Db for PostgresDb { Ok(result) } + async fn summarize_user_activity( + &self, + user_id: UserId, + time_period: Range, + ) -> Result> { + const COALESCE_THRESHOLD: Duration = Duration::from_secs(5); + + let query = " + SELECT + project_activity_periods.ended_at, + project_activity_periods.duration_millis, + project_activity_periods.project_id, + worktree_extensions.extension, + worktree_extensions.count + FROM project_activity_periods + LEFT OUTER JOIN + worktree_extensions + ON + project_activity_periods.project_id = worktree_extensions.project_id + WHERE + project_activity_periods.user_id = $1 AND + $2 < project_activity_periods.ended_at AND + project_activity_periods.ended_at <= $3 + ORDER BY project_activity_periods.id ASC + "; + + let mut rows = sqlx::query_as::< + _, + ( + PrimitiveDateTime, + i32, + ProjectId, + Option, + Option, + ), + >(query) + .bind(user_id) + .bind(time_period.start) + .bind(time_period.end) + .fetch(&self.pool); + + let mut durations: HashMap> = Default::default(); + while let Some(row) = rows.next().await { + let (ended_at, duration_millis, project_id, extension, extension_count) = row?; + let ended_at = ended_at.assume_utc(); + let duration = Duration::from_millis(duration_millis as u64); + let started_at = ended_at - duration; + let project_durations = durations.entry(project_id).or_default(); + + if let Some(prev_duration) = project_durations.last_mut() { + if started_at - prev_duration.end <= COALESCE_THRESHOLD { + prev_duration.end = ended_at; + } else { + project_durations.push(UserActivityDuration { + project_id, + start: started_at, + end: ended_at, + extensions: Default::default(), + }); + } + } else { + project_durations.push(UserActivityDuration { + project_id, + start: started_at, + end: ended_at, + extensions: Default::default(), + }); + } + + if let Some((extension, extension_count)) = extension.zip(extension_count) { + project_durations + .last_mut() + .unwrap() + .extensions + .insert(extension, extension_count as usize); + } + } + + let mut durations = durations.into_values().flatten().collect::>(); + durations.sort_unstable_by_key(|duration| duration.start); + Ok(durations) + } + // contacts async fn get_contacts(&self, user_id: UserId) -> Result> { @@ -1172,6 +1262,14 @@ pub struct UserActivitySummary { pub project_activity: Vec<(ProjectId, Duration)>, } +#[derive(Clone, Debug, PartialEq, Serialize)] +pub struct UserActivityDuration { + project_id: ProjectId, + start: OffsetDateTime, + end: OffsetDateTime, + extensions: HashMap, +} + id_type!(OrgId); #[derive(FromRow)] pub struct Org { @@ -1439,6 +1537,13 @@ pub mod tests { let user_2 = db.create_user("user_2", None, false).await.unwrap(); let user_3 = db.create_user("user_3", None, false).await.unwrap(); let project_1 = db.register_project(user_1).await.unwrap(); + db.update_worktree_extensions( + project_1, + 1, + HashMap::from_iter([("rs".into(), 5), ("md".into(), 7)]), + ) + .await + .unwrap(); let project_2 = db.register_project(user_2).await.unwrap(); let t0 = OffsetDateTime::now_utc() - Duration::from_secs(60 * 60); @@ -1492,9 +1597,14 @@ pub mod tests { .await .unwrap(); - let summary = db.summarize_project_activity(t0..t6, 10).await.unwrap(); + let t7 = t6 + Duration::from_secs(60); + let t8 = t7 + Duration::from_secs(10); + db.record_project_activity(t7..t8, &[(user_1, project_1)]) + .await + .unwrap(); + assert_eq!( - summary, + db.summarize_project_activity(t0..t6, 10).await.unwrap(), &[ UserActivitySummary { id: user_1, @@ -1516,6 +1626,46 @@ pub mod tests { }, ] ); + assert_eq!( + db.summarize_user_activity(user_1, t3..t6).await.unwrap(), + &[ + UserActivityDuration { + project_id: project_1, + start: t3, + end: t6, + extensions: HashMap::from_iter([("rs".to_string(), 5), ("md".to_string(), 7)]), + }, + UserActivityDuration { + project_id: project_2, + start: t3, + end: t5, + extensions: Default::default(), + }, + ] + ); + assert_eq!( + db.summarize_user_activity(user_1, t0..t8).await.unwrap(), + &[ + UserActivityDuration { + project_id: project_2, + start: t2, + end: t5, + extensions: Default::default(), + }, + UserActivityDuration { + project_id: project_1, + start: t3, + end: t6, + extensions: HashMap::from_iter([("rs".to_string(), 5), ("md".to_string(), 7)]), + }, + UserActivityDuration { + project_id: project_1, + start: t7, + end: t8, + extensions: HashMap::from_iter([("rs".to_string(), 5), ("md".to_string(), 7)]), + }, + ] + ); } #[tokio::test(flavor = "multi_thread")] @@ -2316,6 +2466,14 @@ pub mod tests { unimplemented!() } + async fn summarize_user_activity( + &self, + _user_id: UserId, + _time_period: Range, + ) -> Result> { + unimplemented!() + } + // contacts async fn get_contacts(&self, id: UserId) -> Result> { From 1d10e4528221cffe1cac891b0cf2118c0a70a509 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Tue, 28 Jun 2022 15:37:56 -0700 Subject: [PATCH 02/13] :art: Tweak some names dealing with user activity * Rename `project_activity_summary` to `top_users_activity_summary` to make clearer the distinction between it and the per-user summary. * Rename `user_activity_summary` to `user_activity_timeline`, since its output is structured a bit differently than the courser-grained "summary" returned by the top-user query. * Rename `ActivityDuration` -> `ActivityPeriod` --- crates/collab/src/api.rs | 12 ++--- crates/collab/src/db.rs | 95 ++++++++++++++++++++++------------------ crates/collab/src/rpc.rs | 2 +- 3 files changed, 60 insertions(+), 49 deletions(-) diff --git a/crates/collab/src/api.rs b/crates/collab/src/api.rs index 0c91a5a0794020ba548014a242abce4352572175..4fa4781160ed49dbb9044ac5b6fbf1a6a3185847 100644 --- a/crates/collab/src/api.rs +++ b/crates/collab/src/api.rs @@ -36,8 +36,8 @@ pub fn routes(rpc_server: &Arc, state: Arc) -> Router, +async fn get_top_users_activity_summary( + Query(params): Query, Extension(app): Extension>, ) -> Result { let summary = app .db - .summarize_project_activity(params.start..params.end, 100) + .get_top_users_activity_summary(params.start..params.end, 100) .await?; Ok(ErasedJson::pretty(summary)) } diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index 9429a3ad1d1067ec33fb20a51e5231ab301af8bb..5268f7f54f581723e09895890f1d3c02fba63e72 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -7,7 +7,7 @@ use axum::http::StatusCode; use collections::HashMap; use futures::StreamExt; use nanoid::nanoid; -use serde::Serialize; +use serde::{Deserialize, Serialize}; pub use sqlx::postgres::PgPoolOptions as DbOptions; use sqlx::{types::Uuid, FromRow, QueryBuilder, Row}; use time::{OffsetDateTime, PrimitiveDateTime}; @@ -63,7 +63,7 @@ pub trait Db: Send + Sync { /// Record which users have been active in which projects during /// a given period of time. - async fn record_project_activity( + async fn record_user_activity( &self, time_period: Range, active_projects: &[(UserId, ProjectId)], @@ -71,18 +71,18 @@ pub trait Db: Send + Sync { /// Get the users that have been most active during the given time period, /// along with the amount of time they have been active in each project. - async fn summarize_project_activity( + async fn get_top_users_activity_summary( &self, time_period: Range, max_user_count: usize, ) -> Result>; /// Get the project activity for the given user and time period. - async fn summarize_user_activity( + async fn get_user_activity_timeline( &self, - user_id: UserId, time_period: Range, - ) -> Result>; + user_id: UserId, + ) -> Result>; async fn get_contacts(&self, id: UserId) -> Result>; async fn has_contact(&self, user_id_a: UserId, user_id_b: UserId) -> Result; @@ -564,7 +564,7 @@ impl Db for PostgresDb { Ok(extension_counts) } - async fn record_project_activity( + async fn record_user_activity( &self, time_period: Range, projects: &[(UserId, ProjectId)], @@ -593,7 +593,7 @@ impl Db for PostgresDb { Ok(()) } - async fn summarize_project_activity( + async fn get_top_users_activity_summary( &self, time_period: Range, max_user_count: usize, @@ -648,11 +648,11 @@ impl Db for PostgresDb { Ok(result) } - async fn summarize_user_activity( + async fn get_user_activity_timeline( &self, - user_id: UserId, time_period: Range, - ) -> Result> { + user_id: UserId, + ) -> Result> { const COALESCE_THRESHOLD: Duration = Duration::from_secs(5); let query = " @@ -689,19 +689,19 @@ impl Db for PostgresDb { .bind(time_period.end) .fetch(&self.pool); - let mut durations: HashMap> = Default::default(); + let mut time_periods: HashMap> = Default::default(); while let Some(row) = rows.next().await { let (ended_at, duration_millis, project_id, extension, extension_count) = row?; let ended_at = ended_at.assume_utc(); let duration = Duration::from_millis(duration_millis as u64); let started_at = ended_at - duration; - let project_durations = durations.entry(project_id).or_default(); + let project_time_periods = time_periods.entry(project_id).or_default(); - if let Some(prev_duration) = project_durations.last_mut() { + if let Some(prev_duration) = project_time_periods.last_mut() { if started_at - prev_duration.end <= COALESCE_THRESHOLD { prev_duration.end = ended_at; } else { - project_durations.push(UserActivityDuration { + project_time_periods.push(UserActivityPeriod { project_id, start: started_at, end: ended_at, @@ -709,7 +709,7 @@ impl Db for PostgresDb { }); } } else { - project_durations.push(UserActivityDuration { + project_time_periods.push(UserActivityPeriod { project_id, start: started_at, end: ended_at, @@ -718,7 +718,7 @@ impl Db for PostgresDb { } if let Some((extension, extension_count)) = extension.zip(extension_count) { - project_durations + project_time_periods .last_mut() .unwrap() .extensions @@ -726,7 +726,7 @@ impl Db for PostgresDb { } } - let mut durations = durations.into_values().flatten().collect::>(); + let mut durations = time_periods.into_values().flatten().collect::>(); durations.sort_unstable_by_key(|duration| duration.start); Ok(durations) } @@ -1206,7 +1206,18 @@ impl Db for PostgresDb { macro_rules! id_type { ($name:ident) => { #[derive( - Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type, Serialize, + Clone, + Copy, + Debug, + Default, + PartialEq, + Eq, + PartialOrd, + Ord, + Hash, + sqlx::Type, + Serialize, + Deserialize, )] #[sqlx(transparent)] #[serde(transparent)] @@ -1263,7 +1274,7 @@ pub struct UserActivitySummary { } #[derive(Clone, Debug, PartialEq, Serialize)] -pub struct UserActivityDuration { +pub struct UserActivityPeriod { project_id: ProjectId, start: OffsetDateTime, end: OffsetDateTime, @@ -1549,24 +1560,24 @@ pub mod tests { // User 2 opens a project let t1 = t0 + Duration::from_secs(10); - db.record_project_activity(t0..t1, &[(user_2, project_2)]) + db.record_user_activity(t0..t1, &[(user_2, project_2)]) .await .unwrap(); let t2 = t1 + Duration::from_secs(10); - db.record_project_activity(t1..t2, &[(user_2, project_2)]) + db.record_user_activity(t1..t2, &[(user_2, project_2)]) .await .unwrap(); // User 1 joins the project let t3 = t2 + Duration::from_secs(10); - db.record_project_activity(t2..t3, &[(user_2, project_2), (user_1, project_2)]) + db.record_user_activity(t2..t3, &[(user_2, project_2), (user_1, project_2)]) .await .unwrap(); // User 1 opens another project let t4 = t3 + Duration::from_secs(10); - db.record_project_activity( + db.record_user_activity( t3..t4, &[ (user_2, project_2), @@ -1579,7 +1590,7 @@ pub mod tests { // User 3 joins that project let t5 = t4 + Duration::from_secs(10); - db.record_project_activity( + db.record_user_activity( t4..t5, &[ (user_2, project_2), @@ -1593,18 +1604,18 @@ pub mod tests { // User 2 leaves let t6 = t5 + Duration::from_secs(5); - db.record_project_activity(t5..t6, &[(user_1, project_1), (user_3, project_1)]) + db.record_user_activity(t5..t6, &[(user_1, project_1), (user_3, project_1)]) .await .unwrap(); let t7 = t6 + Duration::from_secs(60); let t8 = t7 + Duration::from_secs(10); - db.record_project_activity(t7..t8, &[(user_1, project_1)]) + db.record_user_activity(t7..t8, &[(user_1, project_1)]) .await .unwrap(); assert_eq!( - db.summarize_project_activity(t0..t6, 10).await.unwrap(), + db.get_top_users_activity_summary(t0..t6, 10).await.unwrap(), &[ UserActivitySummary { id: user_1, @@ -1627,15 +1638,15 @@ pub mod tests { ] ); assert_eq!( - db.summarize_user_activity(user_1, t3..t6).await.unwrap(), + db.get_user_activity_timeline(t3..t6, user_1).await.unwrap(), &[ - UserActivityDuration { + UserActivityPeriod { project_id: project_1, start: t3, end: t6, extensions: HashMap::from_iter([("rs".to_string(), 5), ("md".to_string(), 7)]), }, - UserActivityDuration { + UserActivityPeriod { project_id: project_2, start: t3, end: t5, @@ -1644,21 +1655,21 @@ pub mod tests { ] ); assert_eq!( - db.summarize_user_activity(user_1, t0..t8).await.unwrap(), + db.get_user_activity_timeline(t0..t8, user_1).await.unwrap(), &[ - UserActivityDuration { + UserActivityPeriod { project_id: project_2, start: t2, end: t5, extensions: Default::default(), }, - UserActivityDuration { + UserActivityPeriod { project_id: project_1, start: t3, end: t6, extensions: HashMap::from_iter([("rs".to_string(), 5), ("md".to_string(), 7)]), }, - UserActivityDuration { + UserActivityPeriod { project_id: project_1, start: t7, end: t8, @@ -2450,27 +2461,27 @@ pub mod tests { unimplemented!() } - async fn record_project_activity( + async fn record_user_activity( &self, - _period: Range, + _time_period: Range, _active_projects: &[(UserId, ProjectId)], ) -> Result<()> { unimplemented!() } - async fn summarize_project_activity( + async fn get_top_users_activity_summary( &self, - _period: Range, + _time_period: Range, _limit: usize, ) -> Result> { unimplemented!() } - async fn summarize_user_activity( + async fn get_user_activity_timeline( &self, - _user_id: UserId, _time_period: Range, - ) -> Result> { + _user_id: UserId, + ) -> Result> { unimplemented!() } diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 79c1e53a0b5f906cb961dc73a2348d1866d7fcd2..00252f4d6b0cecac76036c6c5f0e56d1f0e73d01 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -332,7 +332,7 @@ impl Server { let period_end = OffsetDateTime::now_utc(); this.app_state .db - .record_project_activity(period_start..period_end, &active_projects) + .record_user_activity(period_start..period_end, &active_projects) .await .trace_err(); period_start = period_end; From 14d737514978a0c18e22bfd776de713637a74e8a Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Tue, 28 Jun 2022 15:40:48 -0700 Subject: [PATCH 03/13] Add rest API for user activity timeline --- crates/collab/src/api.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/crates/collab/src/api.rs b/crates/collab/src/api.rs index 4fa4781160ed49dbb9044ac5b6fbf1a6a3185847..d20748609af38dca9410eb83e834c423d16cffb5 100644 --- a/crates/collab/src/api.rs +++ b/crates/collab/src/api.rs @@ -39,6 +39,10 @@ pub fn routes(rpc_server: &Arc, state: Arc) -> Router, + Query(params): Query, + Extension(app): Extension>, +) -> Result { + let summary = app + .db + .get_user_activity_timeline(params.start..params.end, UserId(user_id)) + .await?; + Ok(ErasedJson::pretty(summary)) +} + #[derive(Deserialize)] struct GetProjectMetadataParams { project_id: u64, From a52de770b12bd33fdf0be07d958539423644b211 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Wed, 29 Jun 2022 14:26:02 +0200 Subject: [PATCH 04/13] Increase coalesce threshold in `Db::get_user_activity_timeline` --- crates/collab/src/db.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index 5268f7f54f581723e09895890f1d3c02fba63e72..ef91bb03acbd66a179efab706682a7af6e6976ea 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -1,4 +1,4 @@ -use std::{ops::Range, time::Duration}; +use std::{cmp, ops::Range, time::Duration}; use crate::{Error, Result}; use anyhow::{anyhow, Context}; @@ -653,7 +653,7 @@ impl Db for PostgresDb { time_period: Range, user_id: UserId, ) -> Result> { - const COALESCE_THRESHOLD: Duration = Duration::from_secs(5); + const COALESCE_THRESHOLD: Duration = Duration::from_secs(30); let query = " SELECT @@ -698,8 +698,10 @@ impl Db for PostgresDb { let project_time_periods = time_periods.entry(project_id).or_default(); if let Some(prev_duration) = project_time_periods.last_mut() { - if started_at - prev_duration.end <= COALESCE_THRESHOLD { - prev_duration.end = ended_at; + if started_at <= prev_duration.end + COALESCE_THRESHOLD + && ended_at >= prev_duration.start + { + prev_duration.end = cmp::max(prev_duration.end, ended_at); } else { project_time_periods.push(UserActivityPeriod { project_id, @@ -1276,7 +1278,9 @@ pub struct UserActivitySummary { #[derive(Clone, Debug, PartialEq, Serialize)] pub struct UserActivityPeriod { project_id: ProjectId, + #[serde(with = "time::serde::iso8601")] start: OffsetDateTime, + #[serde(with = "time::serde::iso8601")] end: OffsetDateTime, extensions: HashMap, } From e3cfc7b3ce9125e6ce601962b10fc2e131635a83 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Wed, 29 Jun 2022 14:55:04 +0200 Subject: [PATCH 05/13] Register project activity for offline projects as well --- crates/collab/src/integration_tests.rs | 61 +++++++++++++-- crates/collab/src/rpc.rs | 59 ++++++++++++-- crates/collab/src/rpc/store.rs | 60 +++++++++++--- crates/project/src/project.rs | 104 ++++++++++++++++--------- crates/rpc/proto/zed.proto | 5 +- crates/rpc/src/proto.rs | 2 +- crates/rpc/src/rpc.rs | 2 +- 7 files changed, 227 insertions(+), 66 deletions(-) diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs index facef17b63b52192f668fa1aaee0aeb777e7a665..89b1f3878227500282b3a7680a5d2689db8709bd 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/integration_tests.rs @@ -484,14 +484,20 @@ async fn test_offline_projects( deterministic: Arc, cx_a: &mut TestAppContext, cx_b: &mut TestAppContext, + cx_c: &mut TestAppContext, ) { cx_a.foreground().forbid_parking(); let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; + let client_c = server.create_client(cx_c, "user_c").await; let user_a = UserId::from_proto(client_a.user_id().unwrap()); server - .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)]) + .make_contacts(vec![ + (&client_a, cx_a), + (&client_b, cx_b), + (&client_c, cx_c), + ]) .await; // Set up observers of the project and user stores. Any time either of @@ -585,7 +591,8 @@ async fn test_offline_projects( .await .unwrap(); - // When a project is offline, no information about it is sent to the server. + // When a project is offline, we still create it on the server but is invisible + // to other users. deterministic.run_until_parked(); assert!(server .store @@ -593,7 +600,10 @@ async fn test_offline_projects( .await .project_metadata_for_user(user_a) .is_empty()); - assert!(project.read_with(cx_a, |project, _| project.remote_id().is_none())); + project.read_with(cx_a, |project, _| { + assert!(project.remote_id().is_some()); + assert!(!project.is_online()); + }); assert!(client_b .user_store .read_with(cx_b, |store, _| { store.contacts()[0].projects.is_empty() })); @@ -667,7 +677,7 @@ async fn test_offline_projects( // Build another project using a directory which was previously part of // an online project. Restore the project's state from the host's database. - let project2 = cx_a.update(|cx| { + let project2_a = cx_a.update(|cx| { Project::local( false, client_a.client.clone(), @@ -678,21 +688,21 @@ async fn test_offline_projects( cx, ) }); - project2 + project2_a .update(cx_a, |p, cx| { p.find_or_create_local_worktree("/code/crate3", true, cx) }) .await .unwrap(); - project2 + project2_a .update(cx_a, |project, cx| project.restore_state(cx)) .await .unwrap(); // This project is now online, because its directory was previously online. - project2.read_with(cx_a, |project, _| assert!(project.is_online())); + project2_a.read_with(cx_a, |project, _| assert!(project.is_online())); deterministic.run_until_parked(); - let project2_id = project2.read_with(cx_a, |p, _| p.remote_id()).unwrap(); + let project2_id = project2_a.read_with(cx_a, |p, _| p.remote_id()).unwrap(); client_b.user_store.read_with(cx_b, |store, _| { assert_eq!( store.contacts()[0].projects, @@ -715,6 +725,41 @@ async fn test_offline_projects( ); }); + let project2_b = client_b.build_remote_project(&project2_a, cx_a, cx_b).await; + let project2_c = cx_c.foreground().spawn(Project::remote( + project2_id, + client_c.client.clone(), + client_c.user_store.clone(), + client_c.project_store.clone(), + client_c.language_registry.clone(), + FakeFs::new(cx_c.background()), + cx_c.to_async(), + )); + deterministic.run_until_parked(); + + // Taking a project offline unshares the project, rejects any pending join request and + // disconnects existing guests. + project2_a.update(cx_a, |project, cx| project.set_online(false, cx)); + deterministic.run_until_parked(); + project2_a.read_with(cx_a, |project, _| assert!(!project.is_shared())); + project2_b.read_with(cx_b, |project, _| assert!(project.is_read_only())); + project2_c.await.unwrap_err(); + + client_b.user_store.read_with(cx_b, |store, _| { + assert_eq!( + store.contacts()[0].projects, + &[ProjectMetadata { + id: project_id, + visible_worktree_root_names: vec![ + "crate1".into(), + "crate2".into(), + "crate3".into() + ], + guests: Default::default(), + },] + ); + }); + cx_a.update(|cx| { drop(subscriptions); drop(view); diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 00252f4d6b0cecac76036c6c5f0e56d1f0e73d01..1aa2e5c4af4105edf797e4067ae3bd9a9221830a 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -605,9 +605,11 @@ impl Server { .await .user_id_for_connection(request.sender_id)?; let project_id = self.app_state.db.register_project(user_id).await?; - self.store_mut() - .await - .register_project(request.sender_id, project_id)?; + self.store_mut().await.register_project( + request.sender_id, + project_id, + request.payload.online, + )?; response.send(proto::RegisterProjectResponse { project_id: project_id.to_proto(), @@ -925,12 +927,53 @@ impl Server { let guest_connection_ids = state .read_project(project_id, request.sender_id)? .guest_connection_ids(); - state.update_project(project_id, &request.payload.worktrees, request.sender_id)?; - broadcast(request.sender_id, guest_connection_ids, |connection_id| { - self.peer - .forward_send(request.sender_id, connection_id, request.payload.clone()) - }); + let unshared_project = state.update_project( + project_id, + &request.payload.worktrees, + request.payload.online, + request.sender_id, + )?; + + if let Some(unshared_project) = unshared_project { + broadcast( + request.sender_id, + unshared_project.guests.keys().copied(), + |conn_id| { + self.peer.send( + conn_id, + proto::UnregisterProject { + project_id: project_id.to_proto(), + }, + ) + }, + ); + for (_, receipts) in unshared_project.pending_join_requests { + for receipt in receipts { + self.peer.respond( + receipt, + proto::JoinProjectResponse { + variant: Some(proto::join_project_response::Variant::Decline( + proto::join_project_response::Decline { + reason: + proto::join_project_response::decline::Reason::Closed + as i32, + }, + )), + }, + )?; + } + } + } else { + broadcast(request.sender_id, guest_connection_ids, |connection_id| { + self.peer.forward_send( + request.sender_id, + connection_id, + request.payload.clone(), + ) + }); + } }; + self.update_user_contacts(user_id).await?; Ok(()) } diff --git a/crates/collab/src/rpc/store.rs b/crates/collab/src/rpc/store.rs index d1eb4a3be6d50a20c993e7b9cadacfd5de7e459d..2e69a97b27f260cb9730add613d957246a9d900c 100644 --- a/crates/collab/src/rpc/store.rs +++ b/crates/collab/src/rpc/store.rs @@ -32,6 +32,7 @@ struct ConnectionState { #[derive(Serialize)] pub struct Project { + pub online: bool, pub host_connection_id: ConnectionId, pub host: Collaborator, pub guests: HashMap, @@ -88,6 +89,11 @@ pub struct LeftProject { pub unshare: bool, } +pub struct UnsharedProject { + pub guests: HashMap, + pub pending_join_requests: HashMap>>, +} + #[derive(Copy, Clone)] pub struct Metrics { pub connections: usize, @@ -297,7 +303,7 @@ impl Store { let mut metadata = Vec::new(); for project_id in project_ids { if let Some(project) = self.projects.get(&project_id) { - if project.host.user_id == user_id { + if project.host.user_id == user_id && project.online { metadata.push(proto::ProjectMetadata { id: project_id.to_proto(), visible_worktree_root_names: project @@ -323,6 +329,7 @@ impl Store { &mut self, host_connection_id: ConnectionId, project_id: ProjectId, + online: bool, ) -> Result<()> { let connection = self .connections @@ -332,6 +339,7 @@ impl Store { self.projects.insert( project_id, Project { + online, host_connection_id, host: Collaborator { user_id: connection.user_id, @@ -353,8 +361,9 @@ impl Store { &mut self, project_id: ProjectId, worktrees: &[proto::WorktreeMetadata], + online: bool, connection_id: ConnectionId, - ) -> Result<()> { + ) -> Result> { let project = self .projects .get_mut(&project_id) @@ -375,7 +384,34 @@ impl Store { ); } } - Ok(()) + + if online != project.online { + project.online = online; + if project.online { + Ok(None) + } else { + for connection_id in project.guest_connection_ids() { + if let Some(connection) = self.connections.get_mut(&connection_id) { + connection.projects.remove(&project_id); + } + } + + project.active_replica_ids.clear(); + project.language_servers.clear(); + for worktree in project.worktrees.values_mut() { + worktree.diagnostic_summaries.clear(); + worktree.entries.clear(); + worktree.extension_counts.clear(); + } + + Ok(Some(UnsharedProject { + guests: mem::take(&mut project.guests), + pending_join_requests: mem::take(&mut project.join_requests), + })) + } + } else { + Ok(None) + } } else { Err(anyhow!("no such project"))? } @@ -481,13 +517,17 @@ impl Store { .projects .get_mut(&project_id) .ok_or_else(|| anyhow!("no such project"))?; - connection.requested_projects.insert(project_id); - project - .join_requests - .entry(requester_id) - .or_default() - .push(receipt); - Ok(()) + if project.online { + connection.requested_projects.insert(project_id); + project + .join_requests + .entry(requester_id) + .or_default() + .push(receipt); + Ok(()) + } else { + Err(anyhow!("no such project")) + } } pub fn deny_join_project_request( diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 8ce07a6abdf2a654008dc218826ea2b4a64e7896..4e3f78333091723cb8631a1aa1713ed238880df6 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -136,13 +136,14 @@ enum ProjectClientState { remote_id_rx: watch::Receiver>, online_tx: watch::Sender, online_rx: watch::Receiver, - _maintain_remote_id_task: Task>, + _maintain_remote_id: Task>, + _maintain_online_status: Task>, }, Remote { sharing_has_stopped: bool, remote_id: u64, replica_id: ReplicaId, - _detect_unshare_task: Task>, + _detect_unshare: Task>, }, } @@ -381,17 +382,13 @@ impl Project { cx: &mut MutableAppContext, ) -> ModelHandle { cx.add_model(|cx: &mut ModelContext| { - let (online_tx, online_rx) = watch::channel_with(online); let (remote_id_tx, remote_id_rx) = watch::channel(); - let _maintain_remote_id_task = cx.spawn_weak({ - let status_rx = client.clone().status(); - let online_rx = online_rx.clone(); + let _maintain_remote_id = cx.spawn_weak({ + let mut status_rx = client.clone().status(); move |this, mut cx| async move { - let mut stream = Stream::map(status_rx.clone(), drop) - .merge(Stream::map(online_rx.clone(), drop)); - while stream.recv().await.is_some() { + while let Some(status) = status_rx.recv().await { let this = this.upgrade(&cx)?; - if status_rx.borrow().is_connected() && *online_rx.borrow() { + if status.is_connected() { this.update(&mut cx, |this, cx| this.register(cx)) .await .log_err()?; @@ -405,6 +402,23 @@ impl Project { } }); + let (online_tx, online_rx) = watch::channel_with(online); + let _maintain_online_status = cx.spawn_weak({ + let mut online_rx = online_rx.clone(); + move |this, mut cx| async move { + while online_rx.recv().await.is_some() { + let this = this.upgrade(&cx)?; + this.update(&mut cx, |this, cx| { + if !this.is_online() { + this.unshared(cx); + } + this.metadata_changed(false, cx) + }); + } + None + } + }); + let handle = cx.weak_handle(); project_store.update(cx, |store, cx| store.add_project(handle, cx)); @@ -423,7 +437,8 @@ impl Project { remote_id_rx, online_tx, online_rx, - _maintain_remote_id_task, + _maintain_remote_id, + _maintain_online_status, }, opened_buffer: (Rc::new(RefCell::new(opened_buffer_tx)), opened_buffer_rx), client_subscriptions: Vec::new(), @@ -519,7 +534,7 @@ impl Project { sharing_has_stopped: false, remote_id, replica_id, - _detect_unshare_task: cx.spawn_weak(move |this, mut cx| { + _detect_unshare: cx.spawn_weak(move |this, mut cx| { async move { let mut status = client.status(); let is_connected = @@ -850,27 +865,36 @@ impl Project { } fn register(&mut self, cx: &mut ModelContext) -> Task> { - if let ProjectClientState::Local { remote_id_rx, .. } = &self.client_state { + if let ProjectClientState::Local { + remote_id_rx, + online_rx, + .. + } = &self.client_state + { if remote_id_rx.borrow().is_some() { return Task::ready(Ok(())); } - } - let response = self.client.request(proto::RegisterProject {}); - cx.spawn(|this, mut cx| async move { - let remote_id = response.await?.project_id; - this.update(&mut cx, |this, cx| { - if let ProjectClientState::Local { remote_id_tx, .. } = &mut this.client_state { - *remote_id_tx.borrow_mut() = Some(remote_id); - } + let response = self.client.request(proto::RegisterProject { + online: *online_rx.borrow(), + }); + cx.spawn(|this, mut cx| async move { + let remote_id = response.await?.project_id; + this.update(&mut cx, |this, cx| { + if let ProjectClientState::Local { remote_id_tx, .. } = &mut this.client_state { + *remote_id_tx.borrow_mut() = Some(remote_id); + } - this.metadata_changed(false, cx); - cx.emit(Event::RemoteIdChanged(Some(remote_id))); - this.client_subscriptions - .push(this.client.add_model_for_remote_entity(remote_id, cx)); - Ok(()) + this.metadata_changed(false, cx); + cx.emit(Event::RemoteIdChanged(Some(remote_id))); + this.client_subscriptions + .push(this.client.add_model_for_remote_entity(remote_id, cx)); + Ok(()) + }) }) - }) + } else { + Task::ready(Err(anyhow!("can't register a remote project"))) + } } pub fn remote_id(&self) -> Option { @@ -934,19 +958,25 @@ impl Project { .. } = &self.client_state { - if let (Some(project_id), true) = (*remote_id_rx.borrow(), *online_rx.borrow()) { + // Broadcast worktrees only if the project is public. + let worktrees = if *online_rx.borrow() { + self.worktrees + .iter() + .filter_map(|worktree| { + worktree + .upgrade(&cx) + .map(|worktree| worktree.read(cx).as_local().unwrap().metadata_proto()) + }) + .collect() + } else { + Default::default() + }; + if let Some(project_id) = *remote_id_rx.borrow() { self.client .send(proto::UpdateProject { project_id, - worktrees: self - .worktrees - .iter() - .filter_map(|worktree| { - worktree.upgrade(&cx).map(|worktree| { - worktree.read(cx).as_local().unwrap().metadata_proto() - }) - }) - .collect(), + worktrees, + online: *online_rx.borrow(), }) .log_err(); } diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index 69ccae1704ebb05b87dad60a5bd32c9f00432fa3..1cfbb6cea4448aedc7e40f14d806902a56c463ba 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -121,7 +121,9 @@ message Test { uint64 id = 1; } -message RegisterProject {} +message RegisterProject { + bool online = 1; +} message RegisterProjectResponse { uint64 project_id = 1; @@ -134,6 +136,7 @@ message UnregisterProject { message UpdateProject { uint64 project_id = 1; repeated WorktreeMetadata worktrees = 2; + bool online = 3; } message RegisterProjectActivity { diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index ecee3709863b4239dcd43b3c2e9c21eb4868880c..8b7c5e302bba2021e2a06473decb063d9bb8bfbd 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -130,9 +130,9 @@ messages!( (PrepareRename, Background), (PrepareRenameResponse, Background), (ProjectEntryResponse, Foreground), + (ProjectUnshared, Foreground), (RegisterProjectResponse, Foreground), (Ping, Foreground), - (ProjectUnshared, Foreground), (RegisterProject, Foreground), (RegisterProjectActivity, Foreground), (ReloadBuffers, Foreground), diff --git a/crates/rpc/src/rpc.rs b/crates/rpc/src/rpc.rs index d7d40e81e809040453aef53ea01c9d9554bb71dc..5fff19bb065b95b2da5eb3be25f579fcd41df1fa 100644 --- a/crates/rpc/src/rpc.rs +++ b/crates/rpc/src/rpc.rs @@ -6,4 +6,4 @@ pub use conn::Connection; pub use peer::*; mod macros; -pub const PROTOCOL_VERSION: u32 = 26; +pub const PROTOCOL_VERSION: u32 = 27; From 09f4262fd4b450d7f6b3b9f8284cc0481c065e55 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Wed, 29 Jun 2022 15:16:55 +0200 Subject: [PATCH 06/13] Don't share a project unless it's online and we're allowing a request --- crates/collab/src/rpc/store.rs | 4 ++++ crates/project/src/project.rs | 14 ++++++++++++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/crates/collab/src/rpc/store.rs b/crates/collab/src/rpc/store.rs index 2e69a97b27f260cb9730add613d957246a9d900c..7a327e4d35690ce1281a7ff3754cdddbd4ffb188 100644 --- a/crates/collab/src/rpc/store.rs +++ b/crates/collab/src/rpc/store.rs @@ -657,6 +657,10 @@ impl Store { scan_id: u64, ) -> Result<(Vec, bool, HashMap)> { let project = self.write_project(project_id, connection_id)?; + if !project.online { + return Err(anyhow!("project is not online")); + } + let connection_ids = project.connection_ids(); let mut worktree = project.worktrees.entry(worktree_id).or_default(); let metadata_changed = worktree_root_name != worktree.root_name; diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 4e3f78333091723cb8631a1aa1713ed238880df6..2658adaa6d176575682b3d9bdddfdc0422520573 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -1243,6 +1243,10 @@ impl Project { } fn share(&mut self, cx: &mut ModelContext) -> Task> { + if !self.is_online() { + return Task::ready(Err(anyhow!("can't share an offline project"))); + } + let project_id; if let ProjectClientState::Local { remote_id_rx, @@ -1358,11 +1362,17 @@ impl Project { cx: &mut ModelContext, ) { if let Some(project_id) = self.remote_id() { - let share = self.share(cx); + let share = if self.is_online() && allow { + Some(self.share(cx)) + } else { + None + }; let client = self.client.clone(); cx.foreground() .spawn(async move { - share.await?; + if let Some(share) = share { + share.await?; + } client.send(proto::RespondToJoinProjectRequest { requester_id, project_id, From f9e0fec3969e1ee41360952a129eef925fe2a821 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Wed, 29 Jun 2022 16:38:24 +0200 Subject: [PATCH 07/13] Maintain extension counts on local worktrees Co-Authored-By: Nathan Sobo --- crates/project/src/worktree.rs | 46 ++++++++++++++++++++++++++++++++-- 1 file changed, 44 insertions(+), 2 deletions(-) diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index 472dacc0ea59e77151b3cf50b16f8c1f58569fee..2184f4c1cdf30bee7c96a1bf00d78c4771ef96fc 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -105,6 +105,7 @@ pub struct LocalSnapshot { removed_entry_ids: HashMap, next_entry_id: Arc, snapshot: Snapshot, + extension_counts: HashMap, } impl Deref for LocalSnapshot { @@ -450,6 +451,7 @@ impl LocalWorktree { entries_by_id: Default::default(), scan_id: 0, }, + extension_counts: Default::default(), }; if let Some(metadata) = metadata { let entry = Entry::new( @@ -1438,7 +1440,7 @@ impl LocalSnapshot { self.reuse_entry_id(&mut entry); self.entries_by_path.insert_or_replace(entry.clone(), &()); let scan_id = self.scan_id; - self.entries_by_id.insert_or_replace( + let removed_entry = self.entries_by_id.insert_or_replace( PathEntry { id: entry.id, path: entry.path.clone(), @@ -1447,6 +1449,12 @@ impl LocalSnapshot { }, &(), ); + + if let Some(removed_entry) = removed_entry { + self.dec_extension_count(&removed_entry.path); + } + self.inc_extension_count(&entry.path); + entry } @@ -1482,6 +1490,7 @@ impl LocalSnapshot { for mut entry in entries { self.reuse_entry_id(&mut entry); + self.inc_extension_count(&entry.path); entries_by_id_edits.push(Edit::Insert(PathEntry { id: entry.id, path: entry.path.clone(), @@ -1492,7 +1501,29 @@ impl LocalSnapshot { } self.entries_by_path.edit(entries_by_path_edits, &()); - self.entries_by_id.edit(entries_by_id_edits, &()); + let removed_entries = self.entries_by_id.edit(entries_by_id_edits, &()); + + for removed_entry in removed_entries { + self.dec_extension_count(&removed_entry.path); + } + } + + fn inc_extension_count(&mut self, path: &Path) { + if let Some(extension) = path.extension() { + if let Some(count) = self.extension_counts.get_mut(extension) { + *count += 1; + } else { + self.extension_counts.insert(extension.into(), 1); + } + } + } + + fn dec_extension_count(&mut self, path: &Path) { + if let Some(extension) = path.extension() { + if let Some(count) = self.extension_counts.get_mut(extension) { + *count -= 1; + } + } } fn reuse_entry_id(&mut self, entry: &mut Entry) { @@ -1522,6 +1553,7 @@ impl LocalSnapshot { .or_insert(entry.id); *removed_entry_id = cmp::max(*removed_entry_id, entry.id); entries_by_id_edits.push(Edit::Remove(entry.id)); + self.dec_extension_count(&entry.path); } self.entries_by_id.edit(entries_by_id_edits, &()); @@ -2932,6 +2964,7 @@ mod tests { root_char_bag: Default::default(), scan_id: 0, }, + extension_counts: Default::default(), }; initial_snapshot.insert_entry( Entry::new( @@ -3211,6 +3244,15 @@ mod tests { .entry_for_path(ignore_parent_path.join(&*GITIGNORE)) .is_some()); } + + // Ensure extension counts are correct. + let mut expected_extension_counts = HashMap::default(); + for extension in self.entries(false).filter_map(|e| e.path.extension()) { + *expected_extension_counts + .entry(extension.into()) + .or_insert(0) += 1; + } + assert_eq!(self.extension_counts, expected_extension_counts); } fn to_vec(&self, include_ignored: bool) -> Vec<(&Path, u64, bool)> { From 639cd71a3b3d49e7850d0ca2c9ffcbf978b011f3 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Wed, 29 Jun 2022 16:58:19 +0200 Subject: [PATCH 08/13] Record worktree extensions every 5 minutes Co-Authored-By: Nathan Sobo --- crates/collab/src/db.rs | 8 +- crates/collab/src/rpc.rs | 30 +++++-- crates/collab/src/rpc/store.rs | 57 ++------------ crates/project/src/project.rs | 49 +++++++++++- crates/project/src/worktree.rs | 4 + crates/rpc/proto/zed.proto | 140 +++++++++++++++++---------------- crates/rpc/src/proto.rs | 2 + 7 files changed, 158 insertions(+), 132 deletions(-) diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index ef91bb03acbd66a179efab706682a7af6e6976ea..09e6245e4388348eb27ee630f22187d9d1b8c592 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -52,7 +52,7 @@ pub trait Db: Send + Sync { &self, project_id: ProjectId, worktree_id: u64, - extensions: HashMap, + extensions: HashMap, ) -> Result<()>; /// Get the file counts on the given project keyed by their worktree and extension. @@ -506,7 +506,7 @@ impl Db for PostgresDb { &self, project_id: ProjectId, worktree_id: u64, - extensions: HashMap, + extensions: HashMap, ) -> Result<()> { if extensions.is_empty() { return Ok(()); @@ -2255,7 +2255,7 @@ pub mod tests { background: Arc, pub users: Mutex>, pub projects: Mutex>, - pub worktree_extensions: Mutex>, + pub worktree_extensions: Mutex>, pub orgs: Mutex>, pub org_memberships: Mutex>, pub channels: Mutex>, @@ -2442,7 +2442,7 @@ pub mod tests { &self, project_id: ProjectId, worktree_id: u64, - extensions: HashMap, + extensions: HashMap, ) -> Result<()> { self.background.simulate_random_delay().await; if !self.projects.lock().contains_key(&project_id) { diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 1aa2e5c4af4105edf797e4067ae3bd9a9221830a..2a5aeb459efecd1b97d5c2d7d12498d6cd0bfcb9 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -164,6 +164,7 @@ impl Server { .add_message_handler(Server::update_project) .add_message_handler(Server::register_project_activity) .add_request_handler(Server::update_worktree) + .add_message_handler(Server::update_worktree_extensions) .add_message_handler(Server::start_language_server) .add_message_handler(Server::update_language_server) .add_message_handler(Server::update_diagnostic_summary) @@ -996,9 +997,9 @@ impl Server { ) -> Result<()> { let project_id = ProjectId::from_proto(request.payload.project_id); let worktree_id = request.payload.worktree_id; - let (connection_ids, metadata_changed, extension_counts) = { + let (connection_ids, metadata_changed) = { let mut store = self.store_mut().await; - let (connection_ids, metadata_changed, extension_counts) = store.update_worktree( + let (connection_ids, metadata_changed) = store.update_worktree( request.sender_id, project_id, worktree_id, @@ -1007,12 +1008,8 @@ impl Server { &request.payload.updated_entries, request.payload.scan_id, )?; - (connection_ids, metadata_changed, extension_counts.clone()) + (connection_ids, metadata_changed) }; - self.app_state - .db - .update_worktree_extensions(project_id, worktree_id, extension_counts) - .await?; broadcast(request.sender_id, connection_ids, |connection_id| { self.peer @@ -1029,6 +1026,25 @@ impl Server { Ok(()) } + async fn update_worktree_extensions( + self: Arc, + request: TypedEnvelope, + ) -> Result<()> { + let project_id = ProjectId::from_proto(request.payload.project_id); + let worktree_id = request.payload.worktree_id; + let extensions = request + .payload + .extensions + .into_iter() + .zip(request.payload.counts) + .collect(); + self.app_state + .db + .update_worktree_extensions(project_id, worktree_id, extensions) + .await?; + Ok(()) + } + async fn update_diagnostic_summary( self: Arc, request: TypedEnvelope, diff --git a/crates/collab/src/rpc/store.rs b/crates/collab/src/rpc/store.rs index 7a327e4d35690ce1281a7ff3754cdddbd4ffb188..2ae7036ccbb7b19565e9ea048f8485b234fe4d37 100644 --- a/crates/collab/src/rpc/store.rs +++ b/crates/collab/src/rpc/store.rs @@ -3,12 +3,7 @@ use anyhow::{anyhow, Result}; use collections::{btree_map, hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet}; use rpc::{proto, ConnectionId, Receipt}; use serde::Serialize; -use std::{ - mem, - path::{Path, PathBuf}, - str, - time::Duration, -}; +use std::{mem, path::PathBuf, str, time::Duration}; use time::OffsetDateTime; use tracing::instrument; @@ -59,8 +54,6 @@ pub struct Worktree { #[serde(skip)] pub entries: BTreeMap, #[serde(skip)] - pub extension_counts: HashMap, - #[serde(skip)] pub diagnostic_summaries: BTreeMap, pub scan_id: u64, } @@ -401,7 +394,6 @@ impl Store { for worktree in project.worktrees.values_mut() { worktree.diagnostic_summaries.clear(); worktree.entries.clear(); - worktree.extension_counts.clear(); } Ok(Some(UnsharedProject { @@ -632,7 +624,6 @@ impl Store { for worktree in project.worktrees.values_mut() { worktree.diagnostic_summaries.clear(); worktree.entries.clear(); - worktree.extension_counts.clear(); } } @@ -655,7 +646,7 @@ impl Store { removed_entries: &[u64], updated_entries: &[proto::Entry], scan_id: u64, - ) -> Result<(Vec, bool, HashMap)> { + ) -> Result<(Vec, bool)> { let project = self.write_project(project_id, connection_id)?; if !project.online { return Err(anyhow!("project is not online")); @@ -667,45 +658,15 @@ impl Store { worktree.root_name = worktree_root_name.to_string(); for entry_id in removed_entries { - if let Some(entry) = worktree.entries.remove(&entry_id) { - if !entry.is_ignored { - if let Some(extension) = extension_for_entry(&entry) { - if let Some(count) = worktree.extension_counts.get_mut(extension) { - *count = count.saturating_sub(1); - } - } - } - } + worktree.entries.remove(&entry_id); } for entry in updated_entries { - if let Some(old_entry) = worktree.entries.insert(entry.id, entry.clone()) { - if !old_entry.is_ignored { - if let Some(extension) = extension_for_entry(&old_entry) { - if let Some(count) = worktree.extension_counts.get_mut(extension) { - *count = count.saturating_sub(1); - } - } - } - } - - if !entry.is_ignored { - if let Some(extension) = extension_for_entry(&entry) { - if let Some(count) = worktree.extension_counts.get_mut(extension) { - *count += 1; - } else { - worktree.extension_counts.insert(extension.into(), 1); - } - } - } + worktree.entries.insert(entry.id, entry.clone()); } worktree.scan_id = scan_id; - Ok(( - connection_ids, - metadata_changed, - worktree.extension_counts.clone(), - )) + Ok((connection_ids, metadata_changed)) } pub fn project_connection_ids( @@ -894,11 +855,3 @@ impl Channel { self.connection_ids.iter().copied().collect() } } - -fn extension_for_entry(entry: &proto::Entry) -> Option<&str> { - str::from_utf8(&entry.path) - .ok() - .map(Path::new) - .and_then(|p| p.extension()) - .and_then(|e| e.to_str()) -} diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 2658adaa6d176575682b3d9bdddfdc0422520573..520d743f46aa4f6c63af60786d87befbfbb9cd11 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -52,7 +52,7 @@ use std::{ atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst}, Arc, }, - time::Instant, + time::{Duration, Instant}, }; use thiserror::Error; use util::{post_inc, ResultExt, TryFutureExt as _}; @@ -403,13 +403,22 @@ impl Project { }); let (online_tx, online_rx) = watch::channel_with(online); + let mut send_extension_counts = None; let _maintain_online_status = cx.spawn_weak({ let mut online_rx = online_rx.clone(); move |this, mut cx| async move { - while online_rx.recv().await.is_some() { + while let Some(online) = online_rx.recv().await { let this = this.upgrade(&cx)?; + if online { + send_extension_counts = Some( + this.update(&mut cx, |this, cx| this.send_extension_counts(cx)), + ); + } else { + send_extension_counts.take(); + } + this.update(&mut cx, |this, cx| { - if !this.is_online() { + if !online { this.unshared(cx); } this.metadata_changed(false, cx) @@ -463,6 +472,40 @@ impl Project { }) } + fn send_extension_counts(&self, cx: &mut ModelContext) -> Task> { + cx.spawn_weak(|this, cx| async move { + loop { + let this = this.upgrade(&cx)?; + this.read_with(&cx, |this, cx| { + if let Some(project_id) = this.remote_id() { + for worktree in this.visible_worktrees(cx) { + if let Some(worktree) = worktree.read(cx).as_local() { + let mut extensions = Vec::new(); + let mut counts = Vec::new(); + + for (extension, count) in worktree.extension_counts() { + extensions.push(extension.to_string_lossy().to_string()); + counts.push(*count as u32); + } + + this.client + .send(proto::UpdateWorktreeExtensions { + project_id, + worktree_id: worktree.id().to_proto(), + extensions, + counts, + }) + .log_err(); + } + } + } + }); + + cx.background().timer(Duration::from_secs(60 * 5)).await; + } + }) + } + pub async fn remote( remote_id: u64, client: Arc, diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index 2184f4c1cdf30bee7c96a1bf00d78c4771ef96fc..5900c9b079b65fcd035088d360bbe20c376addab 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -1327,6 +1327,10 @@ impl LocalSnapshot { &self.abs_path } + pub fn extension_counts(&self) -> &HashMap { + &self.extension_counts + } + #[cfg(test)] pub(crate) fn to_proto( &self, diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index 1cfbb6cea4448aedc7e40f14d806902a56c463ba..10810d7a6b4371d5b0a6131931b93dc9373de77c 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -38,72 +38,73 @@ message Envelope { UpdateProject update_project = 30; RegisterProjectActivity register_project_activity = 31; UpdateWorktree update_worktree = 32; - - CreateProjectEntry create_project_entry = 33; - RenameProjectEntry rename_project_entry = 34; - CopyProjectEntry copy_project_entry = 35; - DeleteProjectEntry delete_project_entry = 36; - ProjectEntryResponse project_entry_response = 37; - - UpdateDiagnosticSummary update_diagnostic_summary = 38; - StartLanguageServer start_language_server = 39; - UpdateLanguageServer update_language_server = 40; - - OpenBufferById open_buffer_by_id = 41; - OpenBufferByPath open_buffer_by_path = 42; - OpenBufferResponse open_buffer_response = 43; - UpdateBuffer update_buffer = 44; - UpdateBufferFile update_buffer_file = 45; - SaveBuffer save_buffer = 46; - BufferSaved buffer_saved = 47; - BufferReloaded buffer_reloaded = 48; - ReloadBuffers reload_buffers = 49; - ReloadBuffersResponse reload_buffers_response = 50; - FormatBuffers format_buffers = 51; - FormatBuffersResponse format_buffers_response = 52; - GetCompletions get_completions = 53; - GetCompletionsResponse get_completions_response = 54; - ApplyCompletionAdditionalEdits apply_completion_additional_edits = 55; - ApplyCompletionAdditionalEditsResponse apply_completion_additional_edits_response = 56; - GetCodeActions get_code_actions = 57; - GetCodeActionsResponse get_code_actions_response = 58; - GetHover get_hover = 59; - GetHoverResponse get_hover_response = 60; - ApplyCodeAction apply_code_action = 61; - ApplyCodeActionResponse apply_code_action_response = 62; - PrepareRename prepare_rename = 63; - PrepareRenameResponse prepare_rename_response = 64; - PerformRename perform_rename = 65; - PerformRenameResponse perform_rename_response = 66; - SearchProject search_project = 67; - SearchProjectResponse search_project_response = 68; - - GetChannels get_channels = 69; - GetChannelsResponse get_channels_response = 70; - JoinChannel join_channel = 71; - JoinChannelResponse join_channel_response = 72; - LeaveChannel leave_channel = 73; - SendChannelMessage send_channel_message = 74; - SendChannelMessageResponse send_channel_message_response = 75; - ChannelMessageSent channel_message_sent = 76; - GetChannelMessages get_channel_messages = 77; - GetChannelMessagesResponse get_channel_messages_response = 78; - - UpdateContacts update_contacts = 79; - UpdateInviteInfo update_invite_info = 80; - ShowContacts show_contacts = 81; - - GetUsers get_users = 82; - FuzzySearchUsers fuzzy_search_users = 83; - UsersResponse users_response = 84; - RequestContact request_contact = 85; - RespondToContactRequest respond_to_contact_request = 86; - RemoveContact remove_contact = 87; - - Follow follow = 88; - FollowResponse follow_response = 89; - UpdateFollowers update_followers = 90; - Unfollow unfollow = 91; + UpdateWorktreeExtensions update_worktree_extensions = 33; + + CreateProjectEntry create_project_entry = 34; + RenameProjectEntry rename_project_entry = 35; + CopyProjectEntry copy_project_entry = 36; + DeleteProjectEntry delete_project_entry = 37; + ProjectEntryResponse project_entry_response = 38; + + UpdateDiagnosticSummary update_diagnostic_summary = 39; + StartLanguageServer start_language_server = 40; + UpdateLanguageServer update_language_server = 41; + + OpenBufferById open_buffer_by_id = 42; + OpenBufferByPath open_buffer_by_path = 43; + OpenBufferResponse open_buffer_response = 44; + UpdateBuffer update_buffer = 45; + UpdateBufferFile update_buffer_file = 46; + SaveBuffer save_buffer = 47; + BufferSaved buffer_saved = 48; + BufferReloaded buffer_reloaded = 49; + ReloadBuffers reload_buffers = 50; + ReloadBuffersResponse reload_buffers_response = 51; + FormatBuffers format_buffers = 52; + FormatBuffersResponse format_buffers_response = 53; + GetCompletions get_completions = 54; + GetCompletionsResponse get_completions_response = 55; + ApplyCompletionAdditionalEdits apply_completion_additional_edits = 56; + ApplyCompletionAdditionalEditsResponse apply_completion_additional_edits_response = 57; + GetCodeActions get_code_actions = 58; + GetCodeActionsResponse get_code_actions_response = 59; + GetHover get_hover = 60; + GetHoverResponse get_hover_response = 61; + ApplyCodeAction apply_code_action = 62; + ApplyCodeActionResponse apply_code_action_response = 63; + PrepareRename prepare_rename = 64; + PrepareRenameResponse prepare_rename_response = 65; + PerformRename perform_rename = 66; + PerformRenameResponse perform_rename_response = 67; + SearchProject search_project = 68; + SearchProjectResponse search_project_response = 69; + + GetChannels get_channels = 70; + GetChannelsResponse get_channels_response = 71; + JoinChannel join_channel = 72; + JoinChannelResponse join_channel_response = 73; + LeaveChannel leave_channel = 74; + SendChannelMessage send_channel_message = 75; + SendChannelMessageResponse send_channel_message_response = 76; + ChannelMessageSent channel_message_sent = 77; + GetChannelMessages get_channel_messages = 78; + GetChannelMessagesResponse get_channel_messages_response = 79; + + UpdateContacts update_contacts = 80; + UpdateInviteInfo update_invite_info = 81; + ShowContacts show_contacts = 82; + + GetUsers get_users = 83; + FuzzySearchUsers fuzzy_search_users = 84; + UsersResponse users_response = 85; + RequestContact request_contact = 86; + RespondToContactRequest respond_to_contact_request = 87; + RemoveContact remove_contact = 88; + + Follow follow = 89; + FollowResponse follow_response = 90; + UpdateFollowers update_followers = 91; + Unfollow unfollow = 92; } } @@ -200,6 +201,13 @@ message UpdateWorktree { uint64 scan_id = 6; } +message UpdateWorktreeExtensions { + uint64 project_id = 1; + uint64 worktree_id = 2; + repeated string extensions = 3; + repeated uint32 counts = 4; +} + message CreateProjectEntry { uint64 project_id = 1; uint64 worktree_id = 2; diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index 8b7c5e302bba2021e2a06473decb063d9bb8bfbd..9429d9a6eb3561e55b5571cc46d091e7cbb145c5 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -162,6 +162,7 @@ messages!( (UpdateLanguageServer, Foreground), (UpdateProject, Foreground), (UpdateWorktree, Foreground), + (UpdateWorktreeExtensions, Background), ); request_messages!( @@ -254,6 +255,7 @@ entity_messages!( UpdateLanguageServer, UpdateProject, UpdateWorktree, + UpdateWorktreeExtensions, ); entity_messages!(channel_id, ChannelMessageSent); From d1cdacdf149b9569bd461042f950f5ee71f5fb03 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Wed, 29 Jun 2022 17:00:16 +0200 Subject: [PATCH 09/13] Skip ignored entries when recording worktree extensions Co-Authored-By: Nathan Sobo --- crates/project/src/worktree.rs | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index 5900c9b079b65fcd035088d360bbe20c376addab..4d2b8631f6773b401df547e7ea9cbef0e75c615b 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -1455,9 +1455,9 @@ impl LocalSnapshot { ); if let Some(removed_entry) = removed_entry { - self.dec_extension_count(&removed_entry.path); + self.dec_extension_count(&removed_entry.path, removed_entry.is_ignored); } - self.inc_extension_count(&entry.path); + self.inc_extension_count(&entry.path, entry.is_ignored); entry } @@ -1494,7 +1494,7 @@ impl LocalSnapshot { for mut entry in entries { self.reuse_entry_id(&mut entry); - self.inc_extension_count(&entry.path); + self.inc_extension_count(&entry.path, entry.is_ignored); entries_by_id_edits.push(Edit::Insert(PathEntry { id: entry.id, path: entry.path.clone(), @@ -1508,24 +1508,28 @@ impl LocalSnapshot { let removed_entries = self.entries_by_id.edit(entries_by_id_edits, &()); for removed_entry in removed_entries { - self.dec_extension_count(&removed_entry.path); + self.dec_extension_count(&removed_entry.path, removed_entry.is_ignored); } } - fn inc_extension_count(&mut self, path: &Path) { - if let Some(extension) = path.extension() { - if let Some(count) = self.extension_counts.get_mut(extension) { - *count += 1; - } else { - self.extension_counts.insert(extension.into(), 1); + fn inc_extension_count(&mut self, path: &Path, ignored: bool) { + if !ignored { + if let Some(extension) = path.extension() { + if let Some(count) = self.extension_counts.get_mut(extension) { + *count += 1; + } else { + self.extension_counts.insert(extension.into(), 1); + } } } } - fn dec_extension_count(&mut self, path: &Path) { - if let Some(extension) = path.extension() { - if let Some(count) = self.extension_counts.get_mut(extension) { - *count -= 1; + fn dec_extension_count(&mut self, path: &Path, ignored: bool) { + if !ignored { + if let Some(extension) = path.extension() { + if let Some(count) = self.extension_counts.get_mut(extension) { + *count -= 1; + } } } } @@ -1557,7 +1561,7 @@ impl LocalSnapshot { .or_insert(entry.id); *removed_entry_id = cmp::max(*removed_entry_id, entry.id); entries_by_id_edits.push(Edit::Remove(entry.id)); - self.dec_extension_count(&entry.path); + self.dec_extension_count(&entry.path, entry.is_ignored); } self.entries_by_id.edit(entries_by_id_edits, &()); From 7bae759a0212c0f97084ee3d78fce3c6ef0d23ff Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Wed, 29 Jun 2022 17:58:11 +0200 Subject: [PATCH 10/13] Send extension counts when metadata changes Co-Authored-By: Nathan Sobo --- crates/gpui/src/app.rs | 2 +- crates/project/src/lsp_command.rs | 2 +- crates/project/src/project.rs | 72 ++++++++++++------------------- crates/project/src/worktree.rs | 19 ++++++++ 4 files changed, 48 insertions(+), 47 deletions(-) diff --git a/crates/gpui/src/app.rs b/crates/gpui/src/app.rs index 5fa55448084368dc1f7b99fe119afaf645ff4f5d..20ec9c6db09cde4a6f8aae30bbc6c1c267c231ff 100644 --- a/crates/gpui/src/app.rs +++ b/crates/gpui/src/app.rs @@ -592,7 +592,7 @@ impl AsyncAppContext { self.0.borrow().foreground.spawn(f(self.clone())) } - pub fn read T>(&mut self, callback: F) -> T { + pub fn read T>(&self, callback: F) -> T { callback(self.0.borrow().as_ref()) } diff --git a/crates/project/src/lsp_command.rs b/crates/project/src/lsp_command.rs index ee2bf37aa1f3e5574e1fc2231e92b515b7ffa85e..3f4cd3450f680fbe83a617aa21817289b097e9c1 100644 --- a/crates/project/src/lsp_command.rs +++ b/crates/project/src/lsp_command.rs @@ -890,7 +890,7 @@ impl LspCommand for GetHover { message: Option, _: ModelHandle, buffer: ModelHandle, - mut cx: AsyncAppContext, + cx: AsyncAppContext, ) -> Result { Ok(message.and_then(|hover| { let range = hover.range.map(|range| { diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 520d743f46aa4f6c63af60786d87befbfbb9cd11..99a5c655333b32a61be79bd65d12a1ff703a84d9 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -52,7 +52,7 @@ use std::{ atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst}, Arc, }, - time::{Duration, Instant}, + time::Instant, }; use thiserror::Error; use util::{post_inc, ResultExt, TryFutureExt as _}; @@ -403,20 +403,11 @@ impl Project { }); let (online_tx, online_rx) = watch::channel_with(online); - let mut send_extension_counts = None; let _maintain_online_status = cx.spawn_weak({ let mut online_rx = online_rx.clone(); move |this, mut cx| async move { while let Some(online) = online_rx.recv().await { let this = this.upgrade(&cx)?; - if online { - send_extension_counts = Some( - this.update(&mut cx, |this, cx| this.send_extension_counts(cx)), - ); - } else { - send_extension_counts.take(); - } - this.update(&mut cx, |this, cx| { if !online { this.unshared(cx); @@ -472,40 +463,6 @@ impl Project { }) } - fn send_extension_counts(&self, cx: &mut ModelContext) -> Task> { - cx.spawn_weak(|this, cx| async move { - loop { - let this = this.upgrade(&cx)?; - this.read_with(&cx, |this, cx| { - if let Some(project_id) = this.remote_id() { - for worktree in this.visible_worktrees(cx) { - if let Some(worktree) = worktree.read(cx).as_local() { - let mut extensions = Vec::new(); - let mut counts = Vec::new(); - - for (extension, count) in worktree.extension_counts() { - extensions.push(extension.to_string_lossy().to_string()); - counts.push(*count as u32); - } - - this.client - .send(proto::UpdateWorktreeExtensions { - project_id, - worktree_id: worktree.id().to_proto(), - extensions, - counts, - }) - .log_err(); - } - } - } - }); - - cx.background().timer(Duration::from_secs(60 * 5)).await; - } - }) - } - pub async fn remote( remote_id: u64, client: Arc, @@ -1015,13 +972,38 @@ impl Project { Default::default() }; if let Some(project_id) = *remote_id_rx.borrow() { + let online = *online_rx.borrow(); self.client .send(proto::UpdateProject { project_id, worktrees, - online: *online_rx.borrow(), + online, }) .log_err(); + + if online { + let worktrees = self.visible_worktrees(cx).collect::>(); + let scans_complete = + futures::future::join_all(worktrees.iter().filter_map(|worktree| { + Some(worktree.read(cx).as_local()?.scan_complete()) + })); + + let worktrees = worktrees.into_iter().map(|handle| handle.downgrade()); + cx.spawn_weak(move |_, cx| async move { + scans_complete.await; + cx.read(|cx| { + for worktree in worktrees { + if let Some(worktree) = worktree + .upgrade(cx) + .and_then(|worktree| worktree.read(cx).as_local()) + { + worktree.send_extension_counts(project_id); + } + } + }) + }) + .detach(); + } } self.project_store.update(cx, |_, cx| cx.notify()); diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index 4d2b8631f6773b401df547e7ea9cbef0e75c615b..9ad45751c308da40c2f4778b758ff7dab7aade88 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -1037,6 +1037,25 @@ impl LocalWorktree { } } } + + pub fn send_extension_counts(&self, project_id: u64) { + let mut extensions = Vec::new(); + let mut counts = Vec::new(); + + for (extension, count) in self.extension_counts() { + extensions.push(extension.to_string_lossy().to_string()); + counts.push(*count as u32); + } + + self.client + .send(proto::UpdateWorktreeExtensions { + project_id, + worktree_id: self.id().to_proto(), + extensions, + counts, + }) + .log_err(); + } } impl RemoteWorktree { From 5eaa45363d703f6c12021de709aac6f06a22555d Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Wed, 29 Jun 2022 18:28:43 +0200 Subject: [PATCH 11/13] Order by total duration in `Db::get_top_users_activity_summary` Co-Authored-By: Max Brunsfeld --- crates/collab/src/db.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index 09e6245e4388348eb27ee630f22187d9d1b8c592..6c4b37870cb01d8d9a20a26447e45bce3d534fc1 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -618,7 +618,7 @@ impl Db for PostgresDb { WHERE user_durations.user_id = project_durations.user_id AND user_durations.user_id = users.id - ORDER BY user_id ASC, project_duration DESC + ORDER BY total_duration DESC, user_id ASC "; let mut rows = sqlx::query_as::<_, (UserId, String, ProjectId, i64)>(query) @@ -1625,8 +1625,8 @@ pub mod tests { id: user_1, github_login: "user_1".to_string(), project_activity: vec![ + (project_1, Duration::from_secs(25)), (project_2, Duration::from_secs(30)), - (project_1, Duration::from_secs(25)) ] }, UserActivitySummary { From b5d862abfe80ae771e49e0231ad2a1c9d03a1f18 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Wed, 29 Jun 2022 17:58:02 -0700 Subject: [PATCH 12/13] Only send one UpdateProject msg when changing project's online status --- crates/project/src/project.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 99a5c655333b32a61be79bd65d12a1ff703a84d9..e1a7129ab0668e7428aaa08d427f99a187da2e83 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -808,13 +808,11 @@ impl Project { &self.fs } - pub fn set_online(&mut self, online: bool, cx: &mut ModelContext) { + pub fn set_online(&mut self, online: bool, _: &mut ModelContext) { if let ProjectClientState::Local { online_tx, .. } = &mut self.client_state { let mut online_tx = online_tx.borrow_mut(); if *online_tx != online { *online_tx = online; - drop(online_tx); - self.metadata_changed(true, cx); } } } @@ -958,7 +956,7 @@ impl Project { .. } = &self.client_state { - // Broadcast worktrees only if the project is public. + // Broadcast worktrees only if the project is online. let worktrees = if *online_rx.borrow() { self.worktrees .iter() From 336d69fc61abec93e749c444b30013ff28d26a95 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Wed, 29 Jun 2022 17:58:18 -0700 Subject: [PATCH 13/13] Update contacts panel test to reflect new RPC message flow --- crates/contacts_panel/src/contacts_panel.rs | 108 +++++++++----------- 1 file changed, 48 insertions(+), 60 deletions(-) diff --git a/crates/contacts_panel/src/contacts_panel.rs b/crates/contacts_panel/src/contacts_panel.rs index 1384774cc04754e7debcda6337ac99a1f58be61d..3197e464311cd06c1c41e06f9d9c1766c6538e67 100644 --- a/crates/contacts_panel/src/contacts_panel.rs +++ b/crates/contacts_panel/src/contacts_panel.rs @@ -1260,6 +1260,13 @@ mod tests { .detach(); }); + let request = server.receive::().await.unwrap(); + server + .respond( + request.receipt(), + proto::RegisterProjectResponse { project_id: 200 }, + ) + .await; let get_users_request = server.receive::().await.unwrap(); server .respond( @@ -1337,6 +1344,19 @@ mod tests { ..Default::default() }); + assert_eq!( + server + .receive::() + .await + .unwrap() + .payload, + proto::UpdateProject { + project_id: 200, + online: false, + worktrees: vec![] + }, + ); + cx.foreground().run_until_parked(); assert_eq!( cx.read(|cx| render_to_strings(&panel, cx)), @@ -1380,36 +1400,6 @@ mod tests { ] ); - // The server responds, assigning the project a remote id. It still appears - // as loading, because the server hasn't yet sent out the updated contact - // state for the current user. - let request = server.receive::().await.unwrap(); - server - .respond( - request.receipt(), - proto::RegisterProjectResponse { project_id: 200 }, - ) - .await; - cx.foreground().run_until_parked(); - assert_eq!( - cx.read(|cx| render_to_strings(&panel, cx)), - &[ - "v Requests", - " incoming user_one", - " outgoing user_two", - "v Online", - " the_current_user", - " dir3", - " 🔒 private_dir (going online...)", - " user_four", - " dir2", - " user_three", - " dir1", - "v Offline", - " user_five", - ] - ); - // The server receives the project's metadata and updates the contact metadata // for the current user. Now the project appears as online. assert_eq!( @@ -1417,14 +1407,22 @@ mod tests { .receive::() .await .unwrap() - .payload - .worktrees, - &[proto::WorktreeMetadata { - id: worktree_id, - root_name: "private_dir".to_string(), - visible: true, - }], + .payload, + proto::UpdateProject { + project_id: 200, + online: true, + worktrees: vec![proto::WorktreeMetadata { + id: worktree_id, + root_name: "private_dir".to_string(), + visible: true, + }] + }, ); + server + .receive::() + .await + .unwrap(); + server.send(proto::UpdateContacts { contacts: vec![proto::Contact { user_id: current_user_id, @@ -1489,7 +1487,19 @@ mod tests { // The server receives the unregister request and updates the contact // metadata for the current user. The project is now offline. - let request = server.receive::().await.unwrap(); + assert_eq!( + server + .receive::() + .await + .unwrap() + .payload, + proto::UpdateProject { + project_id: 200, + online: false, + worktrees: vec![] + }, + ); + server.send(proto::UpdateContacts { contacts: vec![proto::Contact { user_id: current_user_id, @@ -1523,28 +1533,6 @@ mod tests { ] ); - // The server responds to the unregister request. - server.respond(request.receipt(), proto::Ack {}).await; - cx.foreground().run_until_parked(); - assert_eq!( - cx.read(|cx| render_to_strings(&panel, cx)), - &[ - "v Requests", - " incoming user_one", - " outgoing user_two", - "v Online", - " the_current_user", - " dir3", - " 🔒 private_dir", - " user_four", - " dir2", - " user_three", - " dir1", - "v Offline", - " user_five", - ] - ); - panel.update(cx, |panel, cx| { panel .filter_editor