diff --git a/crates/call/src/call.rs b/crates/call/src/call.rs
index 803fbb906adc53ac03cb1826a1f139931a83f8e1..c63b2e0f5bbd67109e25bbd46ea9962570ccf4e5 100644
--- a/crates/call/src/call.rs
+++ b/crates/call/src/call.rs
@@ -94,12 +94,18 @@ impl ActiveCall {
     async fn handle_call_canceled(
         this: ModelHandle<Self>,
-        _: TypedEnvelope<proto::CallCanceled>,
+        envelope: TypedEnvelope<proto::CallCanceled>,
         _: Arc<Client>,
         mut cx: AsyncAppContext,
     ) -> Result<()> {
         this.update(&mut cx, |this, _| {
-            *this.incoming_call.0.borrow_mut() = None;
+            let mut incoming_call = this.incoming_call.0.borrow_mut();
+            if incoming_call
+                .as_ref()
+                .map_or(false, |call| call.room_id == envelope.payload.room_id)
+            {
+                incoming_call.take();
+            }
         });
         Ok(())
     }
diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs
index 824ec49054c8f1721a834cc0de46e2426636dc7f..8deb4341180afa02c39dc096f78b95714d93784a 100644
--- a/crates/call/src/room.rs
+++ b/crates/call/src/room.rs
@@ -15,7 +15,7 @@ use project::Project;
 use std::{mem, sync::Arc, time::Duration};
 use util::{post_inc, ResultExt};
 
-pub const RECONNECTION_TIMEOUT: Duration = client::RECEIVE_TIMEOUT;
+pub const RECONNECT_TIMEOUT: Duration = client::RECEIVE_TIMEOUT;
 
 #[derive(Clone, Debug, PartialEq, Eq)]
 pub enum Event {
@@ -50,7 +50,7 @@ pub struct Room {
     user_store: ModelHandle<UserStore>,
     subscriptions: Vec<client::Subscription>,
     pending_room_update: Option<Task<()>>,
-    _maintain_connection: Task<Option<()>>,
+    maintain_connection: Option<Task<Option<()>>>,
 }
 
 impl Entity for Room {
@@ -121,7 +121,7 @@ impl Room {
             None
         };
 
-        let _maintain_connection =
+        let maintain_connection =
             cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx));
 
         Self {
@@ -138,7 +138,7 @@ impl Room {
             pending_room_update: None,
             client,
             user_store,
-            _maintain_connection,
+            maintain_connection: Some(maintain_connection),
         }
     }
 
@@ -235,6 +235,8 @@ impl Room {
         self.participant_user_ids.clear();
         self.subscriptions.clear();
         self.live_kit.take();
+        self.pending_room_update.take();
+        self.maintain_connection.take();
         self.client.send(proto::LeaveRoom {})?;
         Ok(())
     }
@@ -262,45 +264,52 @@ impl Room {
         });
 
         // Wait for client to re-establish a connection to the server.
-        let mut reconnection_timeout = cx.background().timer(RECONNECTION_TIMEOUT).fuse();
-        let client_reconnection = async {
-            loop {
-                if let Some(status) = client_status.next().await {
-                    if status.is_connected() {
-                        return true;
+        {
+            let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
+            let client_reconnection = async {
+                let mut remaining_attempts = 3;
+                while remaining_attempts > 0 {
+                    if let Some(status) = client_status.next().await {
+                        if status.is_connected() {
+                            let rejoin_room = async {
+                                let response =
+                                    client.request(proto::JoinRoom { id: room_id }).await?;
+                                let room_proto =
+                                    response.room.ok_or_else(|| anyhow!("invalid room"))?;
+                                this.upgrade(&cx)
+                                    .ok_or_else(|| anyhow!("room was dropped"))?
+                                    .update(&mut cx, |this, cx| {
+                                        this.status = RoomStatus::Online;
+                                        this.apply_room_update(room_proto, cx)
+                                    })?;
+                                anyhow::Ok(())
+                            };
+
+                            if rejoin_room.await.is_ok() {
+                                return true;
+                            } else {
+                                remaining_attempts -= 1;
+                            }
+                        }
+                    } else {
+                        return false;
+                    }
-                } else {
-                    return false;
-                }
                 }
+                false
             }
-        }
-        .fuse();
-        futures::pin_mut!(client_reconnection);
-
-        futures::select_biased! {
-            reconnected = client_reconnection => {
-                if reconnected {
-                    // Client managed to reconnect to the server. Now attempt to join the room.
-                    let rejoin_room = async {
-                        let response = client.request(proto::JoinRoom { id: room_id }).await?;
-                        let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
-                        this.upgrade(&cx)
-                            .ok_or_else(|| anyhow!("room was dropped"))?
-                            .update(&mut cx, |this, cx| {
-                                this.status = RoomStatus::Online;
-                                this.apply_room_update(room_proto, cx)
-                            })?;
-                        anyhow::Ok(())
-                    };
-
-                    // If we successfully joined the room, go back around the loop
-                    // waiting for future connection status changes.
-                    if rejoin_room.await.log_err().is_some() {
+            .fuse();
+            futures::pin_mut!(client_reconnection);
+
+            futures::select_biased! {
+                reconnected = client_reconnection => {
+                    if reconnected {
+                        // If we successfully joined the room, go back around the loop
+                        // waiting for future connection status changes.
                         continue;
                     }
                 }
+                _ = reconnection_timeout => {}
             }
-            _ = reconnection_timeout => {}
         }
 
         // The client failed to re-establish a connection to the server
diff --git a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql
index 0d4bcac5ddfba2c8836294e382a84782f4f5a55e..c0cc5b3457e5b279a6f211ec7e9f2b9d30823868 100644
--- a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql
+++ b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql
@@ -129,6 +129,7 @@ CREATE TABLE "room_participants" (
     "calling_connection_epoch" TEXT NOT NULL
 );
 CREATE UNIQUE INDEX "index_room_participants_on_user_id" ON "room_participants" ("user_id");
+CREATE INDEX "index_room_participants_on_room_id" ON "room_participants" ("room_id");
 CREATE INDEX "index_room_participants_on_answering_connection_epoch" ON "room_participants" ("answering_connection_epoch");
 CREATE INDEX "index_room_participants_on_calling_connection_epoch" ON "room_participants" ("calling_connection_epoch");
 CREATE INDEX "index_room_participants_on_answering_connection_id" ON "room_participants" ("answering_connection_id");
diff --git a/crates/collab/migrations/20221213125710_index_room_participants_on_room_id.sql b/crates/collab/migrations/20221213125710_index_room_participants_on_room_id.sql
new file mode 100644
index 0000000000000000000000000000000000000000..f40ca81906f41e8c30530ce349f892bb69111657
--- /dev/null
+++ b/crates/collab/migrations/20221213125710_index_room_participants_on_room_id.sql
@@ -0,0 +1 @@
+CREATE INDEX "index_room_participants_on_room_id" ON "room_participants" ("room_id");
diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs
index 4a920841e8138e027770d3146ad92dc2c8ea4b53..999c87f9ee38f74597f92148ccba40c548a27ce1 100644
--- a/crates/collab/src/db.rs
+++ b/crates/collab/src/db.rs
@@ -21,6 +21,7 @@ use dashmap::DashMap;
 use futures::StreamExt;
 use hyper::StatusCode;
 use rpc::{proto, ConnectionId};
+use sea_orm::Condition;
 pub use sea_orm::ConnectOptions;
 use sea_orm::{
     entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
@@ -47,7 +48,7 @@ pub struct Database {
     background: Option<std::sync::Arc<gpui::executor::Background>>,
     #[cfg(test)]
    runtime: Option<tokio::runtime::Runtime>,
-    epoch: Uuid,
+    epoch: parking_lot::RwLock<Uuid>,
 }
 
 impl Database {
@@ -60,10 +61,20 @@ impl Database {
             background: None,
             #[cfg(test)]
             runtime: None,
-            epoch: Uuid::new_v4(),
+            epoch: parking_lot::RwLock::new(Uuid::new_v4()),
         })
     }
 
+    #[cfg(test)]
+    pub fn reset(&self) {
+        self.rooms.clear();
+        *self.epoch.write() = Uuid::new_v4();
+    }
+
+    fn epoch(&self) -> Uuid {
+        *self.epoch.read()
+    }
+
     pub async fn migrate(
         &self,
         migrations_path: &Path,
@@ -105,37 +116,85 @@ impl Database {
         Ok(new_migrations)
     }
 
-    pub async 
fn clear_stale_data(&self) -> Result<()> { + pub async fn delete_stale_projects(&self) -> Result<()> { self.transaction(|tx| async move { project_collaborator::Entity::delete_many() - .filter(project_collaborator::Column::ConnectionEpoch.ne(self.epoch)) - .exec(&*tx) - .await?; - room_participant::Entity::delete_many() - .filter( - room_participant::Column::AnsweringConnectionEpoch - .ne(self.epoch) - .or(room_participant::Column::CallingConnectionEpoch.ne(self.epoch)), - ) + .filter(project_collaborator::Column::ConnectionEpoch.ne(self.epoch())) .exec(&*tx) .await?; project::Entity::delete_many() - .filter(project::Column::HostConnectionEpoch.ne(self.epoch)) + .filter(project::Column::HostConnectionEpoch.ne(self.epoch())) .exec(&*tx) .await?; - room::Entity::delete_many() - .filter( - room::Column::Id.not_in_subquery( - Query::select() - .column(room_participant::Column::RoomId) - .from(room_participant::Entity) - .distinct() - .to_owned(), - ), - ) + Ok(()) + }) + .await + } + + pub async fn outdated_room_ids(&self) -> Result> { + self.transaction(|tx| async move { + #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] + enum QueryAs { + RoomId, + } + + Ok(room_participant::Entity::find() + .select_only() + .column(room_participant::Column::RoomId) + .distinct() + .filter(room_participant::Column::AnsweringConnectionEpoch.ne(self.epoch())) + .into_values::<_, QueryAs>() + .all(&*tx) + .await?) + }) + .await + } + + pub async fn refresh_room(&self, room_id: RoomId) -> Result> { + self.room_transaction(|tx| async move { + let stale_participant_filter = Condition::all() + .add(room_participant::Column::RoomId.eq(room_id)) + .add(room_participant::Column::AnsweringConnectionId.is_not_null()) + .add(room_participant::Column::AnsweringConnectionEpoch.ne(self.epoch())); + + let stale_participant_user_ids = room_participant::Entity::find() + .filter(stale_participant_filter.clone()) + .all(&*tx) + .await? + .into_iter() + .map(|participant| participant.user_id) + .collect::>(); + + // Delete participants who failed to reconnect. + room_participant::Entity::delete_many() + .filter(stale_participant_filter) .exec(&*tx) .await?; - Ok(()) + + let room = self.get_room(room_id, &tx).await?; + let mut canceled_calls_to_user_ids = Vec::new(); + // Delete the room if it becomes empty and cancel pending calls. 
+ if room.participants.is_empty() { + canceled_calls_to_user_ids.extend( + room.pending_participants + .iter() + .map(|pending_participant| UserId::from_proto(pending_participant.user_id)), + ); + room_participant::Entity::delete_many() + .filter(room_participant::Column::RoomId.eq(room_id)) + .exec(&*tx) + .await?; + room::Entity::delete_by_id(room_id).exec(&*tx).await?; + } + + Ok(( + room_id, + RefreshedRoom { + room, + stale_participant_user_ids, + canceled_calls_to_user_ids, + }, + )) }) .await } @@ -1033,11 +1092,11 @@ impl Database { room_id: ActiveValue::set(room_id), user_id: ActiveValue::set(user_id), answering_connection_id: ActiveValue::set(Some(connection_id.0 as i32)), - answering_connection_epoch: ActiveValue::set(Some(self.epoch)), + answering_connection_epoch: ActiveValue::set(Some(self.epoch())), answering_connection_lost: ActiveValue::set(false), calling_user_id: ActiveValue::set(user_id), calling_connection_id: ActiveValue::set(connection_id.0 as i32), - calling_connection_epoch: ActiveValue::set(self.epoch), + calling_connection_epoch: ActiveValue::set(self.epoch()), ..Default::default() } .insert(&*tx) @@ -1064,7 +1123,7 @@ impl Database { answering_connection_lost: ActiveValue::set(false), calling_user_id: ActiveValue::set(calling_user_id), calling_connection_id: ActiveValue::set(calling_connection_id.0 as i32), - calling_connection_epoch: ActiveValue::set(self.epoch), + calling_connection_epoch: ActiveValue::set(self.epoch()), initial_project_id: ActiveValue::set(initial_project_id), ..Default::default() } @@ -1174,18 +1233,22 @@ impl Database { self.room_transaction(|tx| async move { let result = room_participant::Entity::update_many() .filter( - room_participant::Column::RoomId - .eq(room_id) - .and(room_participant::Column::UserId.eq(user_id)) - .and( - room_participant::Column::AnsweringConnectionId - .is_null() - .or(room_participant::Column::AnsweringConnectionLost.eq(true)), + Condition::all() + .add(room_participant::Column::RoomId.eq(room_id)) + .add(room_participant::Column::UserId.eq(user_id)) + .add( + Condition::any() + .add(room_participant::Column::AnsweringConnectionId.is_null()) + .add(room_participant::Column::AnsweringConnectionLost.eq(true)) + .add( + room_participant::Column::AnsweringConnectionEpoch + .ne(self.epoch()), + ), ), ) .set(room_participant::ActiveModel { answering_connection_id: ActiveValue::set(Some(connection_id.0 as i32)), - answering_connection_epoch: ActiveValue::set(Some(self.epoch)), + answering_connection_epoch: ActiveValue::set(Some(self.epoch())), answering_connection_lost: ActiveValue::set(false), ..Default::default() }) @@ -1591,7 +1654,7 @@ impl Database { room_id: ActiveValue::set(participant.room_id), host_user_id: ActiveValue::set(participant.user_id), host_connection_id: ActiveValue::set(connection_id.0 as i32), - host_connection_epoch: ActiveValue::set(self.epoch), + host_connection_epoch: ActiveValue::set(self.epoch()), ..Default::default() } .insert(&*tx) @@ -1616,7 +1679,7 @@ impl Database { project_collaborator::ActiveModel { project_id: ActiveValue::set(project.id), connection_id: ActiveValue::set(connection_id.0 as i32), - connection_epoch: ActiveValue::set(self.epoch), + connection_epoch: ActiveValue::set(self.epoch()), user_id: ActiveValue::set(participant.user_id), replica_id: ActiveValue::set(ReplicaId(0)), is_host: ActiveValue::set(true), @@ -1930,7 +1993,7 @@ impl Database { let new_collaborator = project_collaborator::ActiveModel { project_id: ActiveValue::set(project_id), connection_id: 
ActiveValue::set(connection_id.0 as i32), - connection_epoch: ActiveValue::set(self.epoch), + connection_epoch: ActiveValue::set(self.epoch()), user_id: ActiveValue::set(participant.user_id), replica_id: ActiveValue::set(replica_id), is_host: ActiveValue::set(false), @@ -2553,6 +2616,12 @@ pub struct LeftRoom { pub canceled_calls_to_user_ids: Vec, } +pub struct RefreshedRoom { + pub room: proto::Room, + pub stale_participant_user_ids: Vec, + pub canceled_calls_to_user_ids: Vec, +} + pub struct Project { pub collaborators: Vec, pub worktrees: BTreeMap, diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs index aca5f77fe98bde707e042f7756ebebcbf3070b73..8150b1c0af53563d068603f73434f120560b536c 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/integration_tests.rs @@ -4,7 +4,6 @@ use crate::{ rpc::{Server, RECONNECT_TIMEOUT}, AppState, }; -use ::rpc::Peer; use anyhow::anyhow; use call::{room, ActiveCall, ParticipantLocation, Room}; use client::{ @@ -19,10 +18,8 @@ use editor::{ use fs::{FakeFs, Fs as _, HomeDir, LineEnding}; use futures::{channel::oneshot, StreamExt as _}; use gpui::{ - executor::{self, Deterministic}, - geometry::vector::vec2f, - test::EmptyView, - ModelHandle, Task, TestAppContext, ViewHandle, + executor::Deterministic, geometry::vector::vec2f, test::EmptyView, ModelHandle, Task, + TestAppContext, ViewHandle, }; use language::{ range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language, @@ -37,7 +34,7 @@ use serde_json::json; use settings::{Formatter, Settings}; use std::{ cell::{Cell, RefCell}, - env, mem, + env, future, mem, ops::Deref, path::{Path, PathBuf}, rc::Rc, @@ -67,7 +64,7 @@ async fn test_basic_calls( cx_c: &mut TestAppContext, ) { deterministic.forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; @@ -266,7 +263,7 @@ async fn test_room_uniqueness( cx_c: &mut TestAppContext, ) { deterministic.forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let _client_a2 = server.create_client(cx_a2, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; @@ -365,13 +362,13 @@ async fn test_room_uniqueness( } #[gpui::test(iterations = 10)] -async fn test_disconnecting_from_room( +async fn test_client_disconnecting_from_room( deterministic: Arc, cx_a: &mut TestAppContext, cx_b: &mut TestAppContext, ) { deterministic.forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -516,6 +513,303 @@ async fn test_disconnecting_from_room( ); } +#[gpui::test(iterations = 10)] +async fn test_server_restarts( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, + cx_c: &mut TestAppContext, + cx_d: &mut TestAppContext, +) { + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; + let client_a = server.create_client(cx_a, "user_a").await; + let client_b = server.create_client(cx_b, "user_b").await; + let client_c = server.create_client(cx_c, "user_c").await; + 
let client_d = server.create_client(cx_d, "user_d").await; + server + .make_contacts(&mut [ + (&client_a, cx_a), + (&client_b, cx_b), + (&client_c, cx_c), + (&client_d, cx_d), + ]) + .await; + + let active_call_a = cx_a.read(ActiveCall::global); + let active_call_b = cx_b.read(ActiveCall::global); + let active_call_c = cx_c.read(ActiveCall::global); + let active_call_d = cx_d.read(ActiveCall::global); + + // User A calls users B, C, and D. + active_call_a + .update(cx_a, |call, cx| { + call.invite(client_b.user_id().unwrap(), None, cx) + }) + .await + .unwrap(); + active_call_a + .update(cx_a, |call, cx| { + call.invite(client_c.user_id().unwrap(), None, cx) + }) + .await + .unwrap(); + active_call_a + .update(cx_a, |call, cx| { + call.invite(client_d.user_id().unwrap(), None, cx) + }) + .await + .unwrap(); + let room_a = active_call_a.read_with(cx_a, |call, _| call.room().unwrap().clone()); + + // User B receives the call and joins the room. + let mut incoming_call_b = active_call_b.read_with(cx_b, |call, _| call.incoming()); + assert!(incoming_call_b.next().await.unwrap().is_some()); + active_call_b + .update(cx_b, |call, cx| call.accept_incoming(cx)) + .await + .unwrap(); + let room_b = active_call_b.read_with(cx_b, |call, _| call.room().unwrap().clone()); + + // User C receives the call and joins the room. + let mut incoming_call_c = active_call_c.read_with(cx_c, |call, _| call.incoming()); + assert!(incoming_call_c.next().await.unwrap().is_some()); + active_call_c + .update(cx_c, |call, cx| call.accept_incoming(cx)) + .await + .unwrap(); + let room_c = active_call_c.read_with(cx_c, |call, _| call.room().unwrap().clone()); + + // User D receives the call but doesn't join the room yet. + let mut incoming_call_d = active_call_d.read_with(cx_d, |call, _| call.incoming()); + assert!(incoming_call_d.next().await.unwrap().is_some()); + + deterministic.run_until_parked(); + assert_eq!( + room_participants(&room_a, cx_a), + RoomParticipants { + remote: vec!["user_b".to_string(), "user_c".to_string()], + pending: vec!["user_d".to_string()] + } + ); + assert_eq!( + room_participants(&room_b, cx_b), + RoomParticipants { + remote: vec!["user_a".to_string(), "user_c".to_string()], + pending: vec!["user_d".to_string()] + } + ); + assert_eq!( + room_participants(&room_c, cx_c), + RoomParticipants { + remote: vec!["user_a".to_string(), "user_b".to_string()], + pending: vec!["user_d".to_string()] + } + ); + + // The server is torn down. + server.teardown(); + + // Users A and B reconnect to the call. User C has troubles reconnecting, so it leaves the room. + client_c.override_establish_connection(|_, cx| cx.spawn(|_| future::pending())); + deterministic.advance_clock(RECEIVE_TIMEOUT); + assert_eq!( + room_participants(&room_a, cx_a), + RoomParticipants { + remote: vec!["user_b".to_string(), "user_c".to_string()], + pending: vec!["user_d".to_string()] + } + ); + assert_eq!( + room_participants(&room_b, cx_b), + RoomParticipants { + remote: vec!["user_a".to_string(), "user_c".to_string()], + pending: vec!["user_d".to_string()] + } + ); + assert_eq!( + room_participants(&room_c, cx_c), + RoomParticipants { + remote: vec![], + pending: vec![] + } + ); + + // User D is notified again of the incoming call and accepts it. 
+ assert!(incoming_call_d.next().await.unwrap().is_some()); + active_call_d + .update(cx_d, |call, cx| call.accept_incoming(cx)) + .await + .unwrap(); + deterministic.run_until_parked(); + let room_d = active_call_d.read_with(cx_d, |call, _| call.room().unwrap().clone()); + assert_eq!( + room_participants(&room_a, cx_a), + RoomParticipants { + remote: vec![ + "user_b".to_string(), + "user_c".to_string(), + "user_d".to_string(), + ], + pending: vec![] + } + ); + assert_eq!( + room_participants(&room_b, cx_b), + RoomParticipants { + remote: vec![ + "user_a".to_string(), + "user_c".to_string(), + "user_d".to_string(), + ], + pending: vec![] + } + ); + assert_eq!( + room_participants(&room_c, cx_c), + RoomParticipants { + remote: vec![], + pending: vec![] + } + ); + assert_eq!( + room_participants(&room_d, cx_d), + RoomParticipants { + remote: vec![ + "user_a".to_string(), + "user_b".to_string(), + "user_c".to_string(), + ], + pending: vec![] + } + ); + + // The server finishes restarting, cleaning up stale connections. + server.start().await.unwrap(); + deterministic.advance_clock(RECONNECT_TIMEOUT); + assert_eq!( + room_participants(&room_a, cx_a), + RoomParticipants { + remote: vec!["user_b".to_string(), "user_d".to_string()], + pending: vec![] + } + ); + assert_eq!( + room_participants(&room_b, cx_b), + RoomParticipants { + remote: vec!["user_a".to_string(), "user_d".to_string()], + pending: vec![] + } + ); + assert_eq!( + room_participants(&room_c, cx_c), + RoomParticipants { + remote: vec![], + pending: vec![] + } + ); + assert_eq!( + room_participants(&room_d, cx_d), + RoomParticipants { + remote: vec!["user_a".to_string(), "user_b".to_string()], + pending: vec![] + } + ); + + // User D hangs up. + active_call_d + .update(cx_d, |call, cx| call.hang_up(cx)) + .unwrap(); + deterministic.run_until_parked(); + assert_eq!( + room_participants(&room_a, cx_a), + RoomParticipants { + remote: vec!["user_b".to_string()], + pending: vec![] + } + ); + assert_eq!( + room_participants(&room_b, cx_b), + RoomParticipants { + remote: vec!["user_a".to_string()], + pending: vec![] + } + ); + assert_eq!( + room_participants(&room_c, cx_c), + RoomParticipants { + remote: vec![], + pending: vec![] + } + ); + assert_eq!( + room_participants(&room_d, cx_d), + RoomParticipants { + remote: vec![], + pending: vec![] + } + ); + + // User B calls user D again. + active_call_b + .update(cx_b, |call, cx| { + call.invite(client_d.user_id().unwrap(), None, cx) + }) + .await + .unwrap(); + + // User D receives the call but doesn't join the room yet. + let mut incoming_call_d = active_call_d.read_with(cx_d, |call, _| call.incoming()); + assert!(incoming_call_d.next().await.unwrap().is_some()); + deterministic.run_until_parked(); + assert_eq!( + room_participants(&room_a, cx_a), + RoomParticipants { + remote: vec!["user_b".to_string()], + pending: vec!["user_d".to_string()] + } + ); + assert_eq!( + room_participants(&room_b, cx_b), + RoomParticipants { + remote: vec!["user_a".to_string()], + pending: vec!["user_d".to_string()] + } + ); + + // The server is torn down. + server.teardown(); + + // Users A and B have troubles reconnecting, so they leave the room. 
+ client_a.override_establish_connection(|_, cx| cx.spawn(|_| future::pending())); + client_b.override_establish_connection(|_, cx| cx.spawn(|_| future::pending())); + client_c.override_establish_connection(|_, cx| cx.spawn(|_| future::pending())); + deterministic.advance_clock(RECEIVE_TIMEOUT); + assert_eq!( + room_participants(&room_a, cx_a), + RoomParticipants { + remote: vec![], + pending: vec![] + } + ); + assert_eq!( + room_participants(&room_b, cx_b), + RoomParticipants { + remote: vec![], + pending: vec![] + } + ); + + // User D is notified again of the incoming call but doesn't accept it. + assert!(incoming_call_d.next().await.unwrap().is_some()); + + // The server finishes restarting, cleaning up stale connections and canceling the + // call to user D because the room has become empty. + server.start().await.unwrap(); + deterministic.advance_clock(RECONNECT_TIMEOUT); + assert!(incoming_call_d.next().await.unwrap().is_none()); +} + #[gpui::test(iterations = 10)] async fn test_calls_on_multiple_connections( deterministic: Arc, @@ -524,7 +818,7 @@ async fn test_calls_on_multiple_connections( cx_b2: &mut TestAppContext, ) { deterministic.forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b1 = server.create_client(cx_b1, "user_b").await; let client_b2 = server.create_client(cx_b2, "user_b").await; @@ -676,7 +970,7 @@ async fn test_share_project( ) { deterministic.forbid_parking(); let (_, window_b) = cx_b.add_window(|_| EmptyView); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; let client_c = server.create_client(cx_c, "user_c").await; @@ -813,7 +1107,7 @@ async fn test_unshare_project( cx_c: &mut TestAppContext, ) { deterministic.forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; let client_c = server.create_client(cx_c, "user_c").await; @@ -896,7 +1190,7 @@ async fn test_host_disconnect( ) { cx_b.update(editor::init); deterministic.forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; let client_c = server.create_client(cx_c, "user_c").await; @@ -1014,7 +1308,7 @@ async fn test_active_call_events( cx_b: &mut TestAppContext, ) { deterministic.forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; client_a.fs.insert_tree("/a", json!({})).await; @@ -1103,7 +1397,7 @@ async fn test_room_location( cx_b: &mut TestAppContext, ) { deterministic.forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; client_a.fs.insert_tree("/a", 
json!({})).await; @@ -1269,7 +1563,7 @@ async fn test_propagate_saves_and_fs_changes( cx_c: &mut TestAppContext, ) { deterministic.forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; let client_c = server.create_client(cx_c, "user_c").await; @@ -1439,12 +1733,12 @@ async fn test_propagate_saves_and_fs_changes( #[gpui::test(iterations = 10)] async fn test_git_diff_base_change( - executor: Arc, + deterministic: Arc, cx_a: &mut TestAppContext, cx_b: &mut TestAppContext, ) { - executor.forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -1513,7 +1807,7 @@ async fn test_git_diff_base_change( .unwrap(); // Wait for it to catch up to the new diff - executor.run_until_parked(); + deterministic.run_until_parked(); // Smoke test diffing buffer_local_a.read_with(cx_a, |buffer, _| { @@ -1533,7 +1827,7 @@ async fn test_git_diff_base_change( .unwrap(); // Wait remote buffer to catch up to the new diff - executor.run_until_parked(); + deterministic.run_until_parked(); // Smoke test diffing buffer_remote_a.read_with(cx_b, |buffer, _| { @@ -1556,7 +1850,7 @@ async fn test_git_diff_base_change( .await; // Wait for buffer_local_a to receive it - executor.run_until_parked(); + deterministic.run_until_parked(); // Smoke test new diffing buffer_local_a.read_with(cx_a, |buffer, _| { @@ -1611,7 +1905,7 @@ async fn test_git_diff_base_change( .unwrap(); // Wait for it to catch up to the new diff - executor.run_until_parked(); + deterministic.run_until_parked(); // Smoke test diffing buffer_local_b.read_with(cx_a, |buffer, _| { @@ -1631,7 +1925,7 @@ async fn test_git_diff_base_change( .unwrap(); // Wait remote buffer to catch up to the new diff - executor.run_until_parked(); + deterministic.run_until_parked(); // Smoke test diffing buffer_remote_b.read_with(cx_b, |buffer, _| { @@ -1654,7 +1948,7 @@ async fn test_git_diff_base_change( .await; // Wait for buffer_local_b to receive it - executor.run_until_parked(); + deterministic.run_until_parked(); // Smoke test new diffing buffer_local_b.read_with(cx_a, |buffer, _| { @@ -1691,12 +1985,12 @@ async fn test_git_diff_base_change( #[gpui::test(iterations = 10)] async fn test_fs_operations( - executor: Arc, + deterministic: Arc, cx_a: &mut TestAppContext, cx_b: &mut TestAppContext, ) { - executor.forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -1960,9 +2254,13 @@ async fn test_fs_operations( } #[gpui::test(iterations = 10)] -async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { - cx_a.foreground().forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; +async fn test_buffer_conflict_after_save( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; let client_a = 
server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -2014,9 +2312,13 @@ async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut T } #[gpui::test(iterations = 10)] -async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { - cx_a.foreground().forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; +async fn test_buffer_reloading( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -2071,11 +2373,12 @@ async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppCont #[gpui::test(iterations = 10)] async fn test_editing_while_guest_opens_buffer( + deterministic: Arc, cx_a: &mut TestAppContext, cx_b: &mut TestAppContext, ) { - cx_a.foreground().forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -2118,11 +2421,12 @@ async fn test_editing_while_guest_opens_buffer( #[gpui::test(iterations = 10)] async fn test_leaving_worktree_while_opening_buffer( + deterministic: Arc, cx_a: &mut TestAppContext, cx_b: &mut TestAppContext, ) { - cx_a.foreground().forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -2167,7 +2471,7 @@ async fn test_canceling_buffer_opening( ) { deterministic.forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -2218,7 +2522,7 @@ async fn test_leaving_project( cx_c: &mut TestAppContext, ) { deterministic.forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; let client_c = server.create_client(cx_c, "user_c").await; @@ -2329,7 +2633,7 @@ async fn test_leaving_project( // Simulate connection loss for client C and ensure client A observes client C leaving the project. 
client_c.wait_for_current_user(cx_c).await; server.disconnect_client(client_c.peer_id().unwrap()); - cx_a.foreground().advance_clock(RECEIVE_TIMEOUT); + deterministic.advance_clock(RECEIVE_TIMEOUT); deterministic.run_until_parked(); project_a.read_with(cx_a, |project, _| { assert_eq!(project.collaborators().len(), 0); @@ -2350,7 +2654,7 @@ async fn test_collaborating_with_diagnostics( cx_c: &mut TestAppContext, ) { deterministic.forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; let client_c = server.create_client(cx_c, "user_c").await; @@ -2610,9 +2914,13 @@ async fn test_collaborating_with_diagnostics( } #[gpui::test(iterations = 10)] -async fn test_collaborating_with_completion(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { - cx_a.foreground().forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; +async fn test_collaborating_with_completion( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -2784,9 +3092,13 @@ async fn test_collaborating_with_completion(cx_a: &mut TestAppContext, cx_b: &mu } #[gpui::test(iterations = 10)] -async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { - cx_a.foreground().forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; +async fn test_reloading_buffer_manually( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -2876,10 +3188,14 @@ async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut Te } #[gpui::test(iterations = 10)] -async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { +async fn test_formatting_buffer( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { use project::FormatTrigger; - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -2978,9 +3294,13 @@ async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppCon } #[gpui::test(iterations = 10)] -async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { - cx_a.foreground().forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; +async fn test_definition( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -3122,9 +3442,13 @@ async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { } #[gpui::test(iterations = 10)] -async fn test_references(cx_a: &mut TestAppContext, cx_b: 
&mut TestAppContext) { - cx_a.foreground().forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; +async fn test_references( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -3223,9 +3547,13 @@ async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { } #[gpui::test(iterations = 10)] -async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { - cx_a.foreground().forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; +async fn test_project_search( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -3302,9 +3630,13 @@ async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContex } #[gpui::test(iterations = 10)] -async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { - cx_a.foreground().forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; +async fn test_document_highlights( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -3404,9 +3736,13 @@ async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppC } #[gpui::test(iterations = 10)] -async fn test_lsp_hover(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { - cx_a.foreground().forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; +async fn test_lsp_hover( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -3507,9 +3843,13 @@ async fn test_lsp_hover(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { } #[gpui::test(iterations = 10)] -async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { - cx_a.foreground().forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; +async fn test_project_symbols( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -3612,12 +3952,13 @@ async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppConte #[gpui::test(iterations = 10)] async fn test_open_buffer_while_getting_definition_pointing_to_it( + deterministic: Arc, cx_a: &mut TestAppContext, cx_b: &mut TestAppContext, mut rng: StdRng, ) { - cx_a.foreground().forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + deterministic.forbid_parking(); + let mut server = 
TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -3688,12 +4029,13 @@ async fn test_open_buffer_while_getting_definition_pointing_to_it( #[gpui::test(iterations = 10)] async fn test_collaborating_with_code_actions( + deterministic: Arc, cx_a: &mut TestAppContext, cx_b: &mut TestAppContext, ) { - cx_a.foreground().forbid_parking(); + deterministic.forbid_parking(); cx_b.update(editor::init); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -3908,10 +4250,14 @@ async fn test_collaborating_with_code_actions( } #[gpui::test(iterations = 10)] -async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { - cx_a.foreground().forbid_parking(); +async fn test_collaborating_with_renames( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { + deterministic.forbid_parking(); cx_b.update(editor::init); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -4110,7 +4456,7 @@ async fn test_language_server_statuses( deterministic.forbid_parking(); cx_b.update(editor::init); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -4222,8 +4568,8 @@ async fn test_contacts( cx_c: &mut TestAppContext, cx_d: &mut TestAppContext, ) { - cx_a.foreground().forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; let client_c = server.create_client(cx_c, "user_c").await; @@ -4610,7 +4956,7 @@ async fn test_contacts( #[gpui::test(iterations = 10)] async fn test_contact_requests( - executor: Arc, + deterministic: Arc, cx_a: &mut TestAppContext, cx_a2: &mut TestAppContext, cx_b: &mut TestAppContext, @@ -4618,10 +4964,10 @@ async fn test_contact_requests( cx_c: &mut TestAppContext, cx_c2: &mut TestAppContext, ) { - cx_a.foreground().forbid_parking(); + deterministic.forbid_parking(); // Connect to a server as 3 clients. - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_a2 = server.create_client(cx_a2, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; @@ -4648,7 +4994,7 @@ async fn test_contact_requests( }) .await .unwrap(); - executor.run_until_parked(); + deterministic.run_until_parked(); // All users see the pending request appear in all their clients. 
assert_eq!( @@ -4680,7 +5026,7 @@ async fn test_contact_requests( disconnect_and_reconnect(&client_a, cx_a).await; disconnect_and_reconnect(&client_b, cx_b).await; disconnect_and_reconnect(&client_c, cx_c).await; - executor.run_until_parked(); + deterministic.run_until_parked(); assert_eq!( client_a.summarize_contacts(cx_a).outgoing_requests, &["user_b"] @@ -4703,7 +5049,7 @@ async fn test_contact_requests( .await .unwrap(); - executor.run_until_parked(); + deterministic.run_until_parked(); // User B sees user A as their contact now in all client, and the incoming request from them is removed. let contacts_b = client_b.summarize_contacts(cx_b); @@ -4725,7 +5071,7 @@ async fn test_contact_requests( disconnect_and_reconnect(&client_a, cx_a).await; disconnect_and_reconnect(&client_b, cx_b).await; disconnect_and_reconnect(&client_c, cx_c).await; - executor.run_until_parked(); + deterministic.run_until_parked(); assert_eq!(client_a.summarize_contacts(cx_a).current, &["user_b"]); assert_eq!(client_b.summarize_contacts(cx_b).current, &["user_a"]); assert_eq!( @@ -4747,7 +5093,7 @@ async fn test_contact_requests( .await .unwrap(); - executor.run_until_parked(); + deterministic.run_until_parked(); // User B doesn't see user C as their contact, and the incoming request from them is removed. let contacts_b = client_b.summarize_contacts(cx_b); @@ -4769,7 +5115,7 @@ async fn test_contact_requests( disconnect_and_reconnect(&client_a, cx_a).await; disconnect_and_reconnect(&client_b, cx_b).await; disconnect_and_reconnect(&client_c, cx_c).await; - executor.run_until_parked(); + deterministic.run_until_parked(); assert_eq!(client_a.summarize_contacts(cx_a).current, &["user_b"]); assert_eq!(client_b.summarize_contacts(cx_b).current, &["user_a"]); assert!(client_b @@ -4798,11 +5144,11 @@ async fn test_following( cx_a: &mut TestAppContext, cx_b: &mut TestAppContext, ) { - cx_a.foreground().forbid_parking(); + deterministic.forbid_parking(); cx_a.update(editor::init); cx_b.update(editor::init); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -5079,7 +5425,7 @@ async fn test_following_tab_order( cx_a.update(editor::init); cx_b.update(editor::init); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -5194,12 +5540,16 @@ async fn test_following_tab_order( } #[gpui::test(iterations = 10)] -async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { - cx_a.foreground().forbid_parking(); +async fn test_peers_following_each_other( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { + deterministic.forbid_parking(); cx_a.update(editor::init); cx_b.update(editor::init); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -5371,13 +5721,17 @@ async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut T } #[gpui::test(iterations = 10)] -async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { - 
cx_a.foreground().forbid_parking(); +async fn test_auto_unfollowing( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { + deterministic.forbid_parking(); cx_a.update(editor::init); cx_b.update(editor::init); // 2 clients connect to a server. - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -5551,7 +5905,7 @@ async fn test_peers_simultaneously_following_each_other( cx_a.update(editor::init); cx_b.update(editor::init); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -5621,7 +5975,7 @@ async fn test_random_collaboration( .map(|i| i.parse().expect("invalid `OPERATIONS` variable")) .unwrap_or(10); - let mut server = TestServer::start(cx.background()).await; + let mut server = TestServer::start(&deterministic).await; let db = server.app_state.db.clone(); let mut available_guests = Vec::new(); @@ -5709,7 +6063,6 @@ async fn test_random_collaboration( let user_connection_ids = server .connection_pool .lock() - .await .user_connection_ids(removed_guest_id) .collect::>(); assert_eq!(user_connection_ids.len(), 1); @@ -5730,7 +6083,7 @@ async fn test_random_collaboration( } for user_id in &user_ids { let contacts = server.app_state.db.get_contacts(*user_id).await.unwrap(); - let pool = server.connection_pool.lock().await; + let pool = server.connection_pool.lock(); for contact in contacts { if let db::Contact::Accepted { user_id, .. 
} = contact { if pool.is_user_online(user_id) { @@ -5759,7 +6112,6 @@ async fn test_random_collaboration( let user_connection_ids = server .connection_pool .lock() - .await .user_connection_ids(user_id) .collect::>(); assert_eq!(user_connection_ids.len(), 1); @@ -5768,6 +6120,13 @@ async fn test_random_collaboration( deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT); operations += 1; } + 30..=34 => { + log::info!("Simulating server restart"); + server.teardown(); + deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT); + server.start().await.unwrap(); + deterministic.advance_clock(RECONNECT_TIMEOUT); + } _ if !op_start_signals.is_empty() => { while operations < max_operations && rng.lock().gen_bool(0.7) { op_start_signals @@ -5933,7 +6292,6 @@ async fn test_random_collaboration( } struct TestServer { - peer: Arc, app_state: Arc, server: Arc, connection_killers: Arc>>>, @@ -5943,29 +6301,33 @@ struct TestServer { } impl TestServer { - async fn start(background: Arc) -> Self { + async fn start(deterministic: &Arc) -> Self { static NEXT_LIVE_KIT_SERVER_ID: AtomicUsize = AtomicUsize::new(0); let use_postgres = env::var("USE_POSTGRES").ok(); let use_postgres = use_postgres.as_deref(); let test_db = if use_postgres == Some("true") || use_postgres == Some("1") { - TestDb::postgres(background.clone()) + TestDb::postgres(deterministic.build_background()) } else { - TestDb::sqlite(background.clone()) + TestDb::sqlite(deterministic.build_background()) }; let live_kit_server_id = NEXT_LIVE_KIT_SERVER_ID.fetch_add(1, SeqCst); let live_kit_server = live_kit_client::TestServer::create( format!("http://livekit.{}.test", live_kit_server_id), format!("devkey-{}", live_kit_server_id), format!("secret-{}", live_kit_server_id), - background.clone(), + deterministic.build_background(), ) .unwrap(); let app_state = Self::build_app_state(&test_db, &live_kit_server).await; - let peer = Peer::new(); - let server = Server::new(app_state.clone()); + let server = Server::new( + app_state.clone(), + Executor::Deterministic(deterministic.build_background()), + ); + server.start().await.unwrap(); + // Advance clock to ensure the server's cleanup task is finished. 
+ deterministic.advance_clock(RECONNECT_TIMEOUT); Self { - peer, app_state, server, connection_killers: Default::default(), @@ -5975,6 +6337,11 @@ impl TestServer { } } + fn teardown(&self) { + self.server.teardown(); + self.app_state.db.reset(); + } + async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient { cx.update(|cx| { cx.set_global(HomeDir(Path::new("/tmp/").to_path_buf())); @@ -6192,7 +6559,6 @@ impl Deref for TestServer { impl Drop for TestServer { fn drop(&mut self) { - self.peer.reset(); self.server.teardown(); self.test_live_kit_server.teardown().unwrap(); } @@ -6896,16 +7262,19 @@ struct RoomParticipants { } fn room_participants(room: &ModelHandle, cx: &mut TestAppContext) -> RoomParticipants { - room.read_with(cx, |room, _| RoomParticipants { - remote: room + room.read_with(cx, |room, _| { + let mut remote = room .remote_participants() .iter() .map(|(_, participant)| participant.user.github_login.clone()) - .collect(), - pending: room + .collect::>(); + let mut pending = room .pending_participants() .iter() .map(|user| user.github_login.clone()) - .collect(), + .collect::>(); + remote.sort(); + pending.sort(); + RoomParticipants { remote, pending } }) } diff --git a/crates/collab/src/lib.rs b/crates/collab/src/lib.rs index b9d43cd2eef9542ca65c02ed4681c465a7731175..7e0f23f5d4096cbf33aaf8ea6275381b96f9208a 100644 --- a/crates/collab/src/lib.rs +++ b/crates/collab/src/lib.rs @@ -2,7 +2,7 @@ pub mod api; pub mod auth; pub mod db; pub mod env; -mod executor; +pub mod executor; #[cfg(test)] mod integration_tests; pub mod rpc; diff --git a/crates/collab/src/main.rs b/crates/collab/src/main.rs index a288e0f3ce83fe8c7a0656f108f15c6088021d68..710910fe033bdca14993e60a986fde578c9bb59a 100644 --- a/crates/collab/src/main.rs +++ b/crates/collab/src/main.rs @@ -1,6 +1,6 @@ use anyhow::anyhow; use axum::{routing::get, Router}; -use collab::{db, env, AppState, Config, MigrateConfig, Result}; +use collab::{db, env, executor::Executor, AppState, Config, MigrateConfig, Result}; use db::Database; use std::{ env::args, @@ -52,12 +52,12 @@ async fn main() -> Result<()> { init_tracing(&config); let state = AppState::new(config).await?; - state.db.clear_stale_data().await?; let listener = TcpListener::bind(&format!("0.0.0.0:{}", state.config.http_port)) .expect("failed to bind TCP listener"); - let rpc_server = collab::rpc::Server::new(state.clone()); + let rpc_server = collab::rpc::Server::new(state.clone(), Executor::Production); + rpc_server.start().await?; let app = collab::api::routes(rpc_server.clone(), state.clone()) .merge(collab::rpc::routes(rpc_server.clone())) diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index a799837ad4fe68f9002f232e7acf69dcf15637bc..89aec5bb05fb188ccc4601abc9b65a56eebc855e 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -53,7 +53,7 @@ use std::{ }, time::Duration, }; -use tokio::sync::{watch, Mutex, MutexGuard}; +use tokio::sync::watch; use tower::ServiceBuilder; use tracing::{info_span, instrument, Instrument}; @@ -90,14 +90,14 @@ impl Response { struct Session { user_id: UserId, connection_id: ConnectionId, - db: Arc>, + db: Arc>, peer: Arc, - connection_pool: Arc>, + connection_pool: Arc>, live_kit_client: Option>, } impl Session { - async fn db(&self) -> MutexGuard { + async fn db(&self) -> tokio::sync::MutexGuard { #[cfg(test)] tokio::task::yield_now().await; let guard = self.db.lock().await; @@ -109,9 +109,7 @@ impl Session { async fn connection_pool(&self) -> ConnectionPoolGuard<'_> { 
#[cfg(test)] tokio::task::yield_now().await; - let guard = self.connection_pool.lock().await; - #[cfg(test)] - tokio::task::yield_now().await; + let guard = self.connection_pool.lock(); ConnectionPoolGuard { guard, _not_send: PhantomData, @@ -140,14 +138,15 @@ impl Deref for DbHandle { pub struct Server { peer: Arc, - pub(crate) connection_pool: Arc>, + pub(crate) connection_pool: Arc>, app_state: Arc, + executor: Executor, handlers: HashMap, teardown: watch::Sender<()>, } pub(crate) struct ConnectionPoolGuard<'a> { - guard: MutexGuard<'a, ConnectionPool>, + guard: parking_lot::MutexGuard<'a, ConnectionPool>, _not_send: PhantomData>, } @@ -168,10 +167,11 @@ where } impl Server { - pub fn new(app_state: Arc) -> Arc { + pub fn new(app_state: Arc, executor: Executor) -> Arc { let mut server = Self { peer: Peer::new(), app_state, + executor, connection_pool: Default::default(), handlers: Default::default(), teardown: watch::channel(()).0, @@ -237,7 +237,96 @@ impl Server { Arc::new(server) } + pub async fn start(&self) -> Result<()> { + self.app_state.db.delete_stale_projects().await?; + let db = self.app_state.db.clone(); + let peer = self.peer.clone(); + let timeout = self.executor.sleep(RECONNECT_TIMEOUT); + let pool = self.connection_pool.clone(); + let live_kit_client = self.app_state.live_kit_client.clone(); + self.executor.spawn_detached(async move { + timeout.await; + if let Some(room_ids) = db.outdated_room_ids().await.trace_err() { + for room_id in room_ids { + let mut contacts_to_update = HashSet::default(); + let mut canceled_calls_to_user_ids = Vec::new(); + let mut live_kit_room = String::new(); + let mut delete_live_kit_room = false; + + if let Ok(mut refreshed_room) = db.refresh_room(room_id).await { + room_updated(&refreshed_room.room, &peer); + contacts_to_update + .extend(refreshed_room.stale_participant_user_ids.iter().copied()); + contacts_to_update + .extend(refreshed_room.canceled_calls_to_user_ids.iter().copied()); + canceled_calls_to_user_ids = + mem::take(&mut refreshed_room.canceled_calls_to_user_ids); + live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room); + delete_live_kit_room = refreshed_room.room.participants.is_empty(); + } + + { + let pool = pool.lock(); + for canceled_user_id in canceled_calls_to_user_ids { + for connection_id in pool.user_connection_ids(canceled_user_id) { + peer.send( + connection_id, + proto::CallCanceled { + room_id: room_id.to_proto(), + }, + ) + .trace_err(); + } + } + } + + for user_id in contacts_to_update { + let busy = db.is_user_busy(user_id).await.trace_err(); + let contacts = db.get_contacts(user_id).await.trace_err(); + if let Some((busy, contacts)) = busy.zip(contacts) { + let pool = pool.lock(); + let updated_contact = contact_for_user(user_id, false, busy, &pool); + for contact in contacts { + if let db::Contact::Accepted { + user_id: contact_user_id, + .. 
+                                } = contact
+                                {
+                                    for contact_conn_id in pool.user_connection_ids(contact_user_id)
+                                    {
+                                        peer.send(
+                                            contact_conn_id,
+                                            proto::UpdateContacts {
+                                                contacts: vec![updated_contact.clone()],
+                                                remove_contacts: Default::default(),
+                                                incoming_requests: Default::default(),
+                                                remove_incoming_requests: Default::default(),
+                                                outgoing_requests: Default::default(),
+                                                remove_outgoing_requests: Default::default(),
+                                            },
+                                        )
+                                        .trace_err();
+                                    }
+                                }
+                            }
+                        }
+                    }
+
+                    if let Some(live_kit) = live_kit_client.as_ref() {
+                        if delete_live_kit_room {
+                            live_kit.delete_room(live_kit_room).await.trace_err();
+                        }
+                    }
+                }
+            }
+        });
+        Ok(())
+    }
+
+    #[cfg(test)]
     pub fn teardown(&self) {
+        self.peer.reset();
+        self.connection_pool.lock().reset();
         let _ = self.teardown.send(());
     }
 
@@ -339,7 +428,7 @@ impl Server {
         let user_id = user.id;
         let login = user.github_login;
         let span = info_span!("handle connection", %user_id, %login, %address);
-        let teardown = self.teardown.subscribe();
+        let mut teardown = self.teardown.subscribe();
         async move {
             let (connection_id, handle_io, mut incoming_rx) = this
                 .peer
@@ -367,7 +456,7 @@ impl Server {
             ).await?;
 
             {
-                let mut pool = this.connection_pool.lock().await;
+                let mut pool = this.connection_pool.lock();
                 pool.add_connection(connection_id, user_id, user.admin);
 
                 this.peer.send(connection_id, build_initial_contacts_update(contacts, &pool))?;
@@ -386,7 +475,7 @@ impl Server {
             let session = Session {
                 user_id,
                 connection_id,
-                db: Arc::new(Mutex::new(DbHandle(this.app_state.db.clone()))),
+                db: Arc::new(tokio::sync::Mutex::new(DbHandle(this.app_state.db.clone()))),
                 peer: this.peer.clone(),
                 connection_pool: this.connection_pool.clone(),
                 live_kit_client: this.app_state.live_kit_client.clone()
@@ -409,6 +498,7 @@ impl Server {
                 let next_message = incoming_rx.next().fuse();
                 futures::pin_mut!(next_message);
                 futures::select_biased! {
+                    _ = teardown.changed().fuse() => return Ok(()),
                     result = handle_io => {
                         if let Err(error) = result {
                             tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
@@ -460,7 +550,7 @@
     ) -> Result<()> {
         if let Some(user) = self.app_state.db.get_user_by_id(inviter_id).await? {
             if let Some(code) = &user.invite_code {
-                let pool = self.connection_pool.lock().await;
+                let pool = self.connection_pool.lock();
                 let invitee_contact = contact_for_user(invitee_id, true, false, &pool);
                 for connection_id in pool.user_connection_ids(inviter_id) {
                     self.peer.send(
@@ -486,7 +576,7 @@ impl Server {
     pub async fn invite_count_updated(self: &Arc<Self>, user_id: UserId) -> Result<()> {
         if let Some(user) = self.app_state.db.get_user_by_id(user_id).await? {
             if let Some(invite_code) = &user.invite_code {
-                let pool = self.connection_pool.lock().await;
+                let pool = self.connection_pool.lock();
                 for connection_id in pool.user_connection_ids(user_id) {
                     self.peer.send(
                         connection_id,
@@ -507,7 +597,7 @@ impl Server {
     pub async fn snapshot<'a>(self: &'a Arc<Self>) -> ServerSnapshot<'a> {
         ServerSnapshot {
             connection_pool: ConnectionPoolGuard {
-                guard: self.connection_pool.lock().await,
+                guard: self.connection_pool.lock(),
                 _not_send: PhantomData,
             },
             peer: &self.peer,
@@ -628,7 +718,6 @@ pub async fn handle_metrics(Extension(server): Extension<Arc<Server>>) -> Result
     let connections = server
         .connection_pool
         .lock()
-        .await
         .connections()
         .filter(|connection| !connection.admin)
         .count();
@@ -681,7 +770,7 @@ async fn sign_out(
     {
         let db = session.db().await;
         if let Some(room) = db.decline_call(None, session.user_id).await.trace_err() {
-            room_updated(&room, &session);
+            room_updated(&room, &session.peer);
         }
     }
     update_user_contacts(session.user_id, &session).await?;
@@ -749,17 +838,14 @@ async fn join_room(
     response: Response<proto::JoinRoom>,
     session: Session,
 ) -> Result<()> {
+    let room_id = RoomId::from_proto(request.id);
     let room = {
         let room = session
             .db()
             .await
-            .join_room(
-                RoomId::from_proto(request.id),
-                session.user_id,
-                session.connection_id,
-            )
+            .join_room(room_id, session.user_id, session.connection_id)
             .await?;
-        room_updated(&room, &session);
+        room_updated(&room, &session.peer);
         room.clone()
     };
 
@@ -770,7 +856,12 @@ async fn join_room(
     {
         session
             .peer
-            .send(connection_id, proto::CallCanceled {})
+            .send(
+                connection_id,
+                proto::CallCanceled {
+                    room_id: room_id.to_proto(),
+                },
+            )
             .trace_err();
     }
 
@@ -834,7 +925,7 @@ async fn call(
                 initial_project_id,
             )
             .await?;
-        room_updated(&room, &session);
+        room_updated(&room, &session.peer);
         mem::take(incoming_call)
     };
     update_user_contacts(called_user_id, &session).await?;
@@ -864,7 +955,7 @@ async fn call(
             .await
             .call_failed(room_id, called_user_id)
             .await?;
-        room_updated(&room, &session);
+        room_updated(&room, &session.peer);
     }
 
     update_user_contacts(called_user_id, &session).await?;
@@ -884,7 +975,7 @@ async fn cancel_call(
             .await
             .cancel_call(Some(room_id), session.connection_id, called_user_id)
             .await?;
-        room_updated(&room, &session);
+        room_updated(&room, &session.peer);
     }
 
     for connection_id in session
@@ -894,7 +985,12 @@ async fn cancel_call(
     {
         session
             .peer
-            .send(connection_id, proto::CallCanceled {})
+            .send(
+                connection_id,
+                proto::CallCanceled {
+                    room_id: room_id.to_proto(),
+                },
+            )
             .trace_err();
     }
     response.send(proto::Ack {})?;
@@ -911,7 +1007,7 @@ async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<()> {
             .await
             .decline_call(Some(room_id), session.user_id)
             .await?;
-        room_updated(&room, &session);
+        room_updated(&room, &session.peer);
     }
 
     for connection_id in session
@@ -921,7 +1017,12 @@ async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<()> {
     {
         session
             .peer
-            .send(connection_id, proto::CallCanceled {})
+            .send(
+                connection_id,
+                proto::CallCanceled {
+                    room_id: room_id.to_proto(),
+                },
+            )
             .trace_err();
     }
     update_user_contacts(session.user_id, &session).await?;
@@ -942,7 +1043,7 @@ async fn update_participant_location(
         .await
         .update_room_participant_location(room_id, session.connection_id, location)
         .await?;
-    room_updated(&room, &session);
+    room_updated(&room, &session.peer);
     response.send(proto::Ack {})?;
     Ok(())
 }
@@ -964,7 +1065,7 @@ async fn share_project(
     response.send(proto::ShareProjectResponse {
         project_id: project_id.to_proto(),
     })?;
-    room_updated(&room, &session);
+    room_updated(&room, &session.peer);
 
     Ok(())
 }
@@ -983,7 +1084,7 @@ async fn unshare_project(message: proto::UnshareProject, session: Session) -> Result<()> {
         guest_connection_ids.iter().copied(),
         |conn_id| session.peer.send(conn_id, message.clone()),
     );
-    room_updated(&room, &session);
+    room_updated(&room, &session.peer);
 
     Ok(())
 }
@@ -1142,7 +1243,7 @@ async fn update_project(
                 .forward_send(session.connection_id, connection_id, request.clone())
         },
     );
-    room_updated(&room, &session);
+    room_updated(&room, &session.peer);
     response.send(proto::Ack {})?;
 
     Ok(())
@@ -1789,17 +1890,15 @@ fn contact_for_user(
     }
 }
 
-fn room_updated(room: &proto::Room, session: &Session) {
+fn room_updated(room: &proto::Room, peer: &Peer) {
     for participant in &room.participants {
-        session
-            .peer
-            .send(
-                ConnectionId(participant.peer_id),
-                proto::RoomUpdated {
-                    room: Some(room.clone()),
-                },
-            )
-            .trace_err();
+        peer.send(
+            ConnectionId(participant.peer_id),
+            proto::RoomUpdated {
+                room: Some(room.clone()),
+            },
+        )
+        .trace_err();
     }
 }
 
@@ -1840,6 +1939,7 @@ async fn update_user_contacts(user_id: UserId, session: &Session) -> Result<()>
 
 async fn leave_room_for_session(session: &Session) -> Result<()> {
     let mut contacts_to_update = HashSet::default();
+    let room_id;
     let canceled_calls_to_user_ids;
     let live_kit_room;
     let delete_live_kit_room;
@@ -1851,7 +1951,8 @@ async fn leave_room_for_session(session: &Session) -> Result<()> {
             project_left(project, session);
         }
 
-        room_updated(&left_room.room, &session);
+        room_updated(&left_room.room, &session.peer);
+        room_id = RoomId::from_proto(left_room.room.id);
         canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids);
         live_kit_room = mem::take(&mut left_room.room.live_kit_room);
         delete_live_kit_room = left_room.room.participants.is_empty();
@@ -1863,7 +1964,12 @@ async fn leave_room_for_session(session: &Session) -> Result<()> {
             for connection_id in pool.user_connection_ids(canceled_user_id) {
                 session
                     .peer
-                    .send(connection_id, proto::CallCanceled {})
+                    .send(
+                        connection_id,
+                        proto::CallCanceled {
+                            room_id: room_id.to_proto(),
+                        },
+                    )
                     .trace_err();
             }
             contacts_to_update.insert(canceled_user_id);
diff --git a/crates/collab/src/rpc/connection_pool.rs b/crates/collab/src/rpc/connection_pool.rs
index ac7632f7da2ae6d4d6beb95aeb298d8e409f8d80..30c4e144ed89ee01898d7d755bc024eef3883db3 100644
--- a/crates/collab/src/rpc/connection_pool.rs
+++ b/crates/collab/src/rpc/connection_pool.rs
@@ -23,6 +23,11 @@ pub struct Connection {
 }
 
 impl ConnectionPool {
+    pub fn reset(&mut self) {
+        self.connections.clear();
+        self.connected_users.clear();
+    }
+
     #[instrument(skip(self))]
     pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId, admin: bool) {
         self.connections
diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto
index f7c7bfd6bcc7b5c881fa7b5c8c533419db016d6a..cf58adfe0b04f764af98b65419eaaca8cfaa546e 100644
--- a/crates/rpc/proto/zed.proto
+++ b/crates/rpc/proto/zed.proto
@@ -212,7 +212,9 @@ message IncomingCall {
     optional ParticipantProject initial_project = 4;
 }
 
-message CallCanceled {}
+message CallCanceled {
+    uint64 room_id = 1;
+}
 
 message CancelCall {
     uint64 room_id = 1;
diff --git a/crates/rpc/src/rpc.rs b/crates/rpc/src/rpc.rs
index 5ca5711d9ca8c43cd5f1979ee76ea11e61053bec..1d4a4496d0af0f1bb9cff6f584c7561ac6fa99e2 100644
--- a/crates/rpc/src/rpc.rs
+++ b/crates/rpc/src/rpc.rs
@@ -6,4 +6,4 @@ pub use conn::Connection;
 pub use peer::*;
 
 mod macros;
-pub const PROTOCOL_VERSION: u32 = 40;
+pub const PROTOCOL_VERSION: u32 = 41;