From ec5ff20b4ca31561c49590f5f61cc65ee5551588 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Tue, 5 Sep 2023 11:34:24 -0700 Subject: [PATCH] Implement clearing stale channel buffer participants on server restart Co-authored-by: Mikayla --- crates/channel/src/channel_store.rs | 4 + crates/collab/src/db.rs | 1 + crates/collab/src/db/queries/buffers.rs | 26 ++++- crates/collab/src/db/queries/rooms.rs | 2 +- crates/collab/src/db/queries/servers.rs | 1 + crates/collab/src/rpc.rs | 9 +- .../collab/src/tests/channel_buffer_tests.rs | 96 ++++++++++++++++++- 7 files changed, 133 insertions(+), 6 deletions(-) diff --git a/crates/channel/src/channel_store.rs b/crates/channel/src/channel_store.rs index ec1652581d603e3be510489c3fbc159735a10849..3d2f61d61f88523654812afc4e8f109f5d6a27f6 100644 --- a/crates/channel/src/channel_store.rs +++ b/crates/channel/src/channel_store.rs @@ -500,6 +500,10 @@ impl ChannelStore { } } + if buffer_versions.is_empty() { + return Task::ready(Ok(())); + } + let response = self.client.request(proto::RejoinChannelBuffers { buffers: buffer_versions, }); diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index 823990eaf8f2677a2f7a2b67a960d5ba61ff9208..b5d968ddf3b5e0b3302d03e8ad37c73df687d724 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -435,6 +435,7 @@ pub struct ChannelsForUser { pub channels_with_admin_privileges: HashSet, } +#[derive(Debug)] pub struct RejoinedChannelBuffer { pub buffer: proto::RejoinedChannelBuffer, pub old_connection_id: ConnectionId, diff --git a/crates/collab/src/db/queries/buffers.rs b/crates/collab/src/db/queries/buffers.rs index 813255b80e2d55665964fba4e36249a0fec89fbe..8236eb9c3b072a3b082daa1089ea2e00247198b1 100644 --- a/crates/collab/src/db/queries/buffers.rs +++ b/crates/collab/src/db/queries/buffers.rs @@ -118,6 +118,7 @@ impl Database { // connection, then the client's buffer can be syncronized with // the server's buffer. if buffer.epoch as u64 != client_buffer.epoch { + log::info!("can't rejoin buffer, epoch has changed"); continue; } @@ -128,6 +129,7 @@ impl Database { c.user_id == user_id && (c.connection_lost || c.connection_server_id != server_id) }) else { + log::info!("can't rejoin buffer, no previous collaborator found"); continue; }; let old_connection_id = self_collaborator.connection(); @@ -196,16 +198,36 @@ impl Database { .await } - pub async fn refresh_channel_buffer( + pub async fn clear_stale_channel_buffer_collaborators( &self, channel_id: ChannelId, server_id: ServerId, ) -> Result { self.transaction(|tx| async move { + let collaborators = channel_buffer_collaborator::Entity::find() + .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)) + .all(&*tx) + .await?; + let mut connection_ids = Vec::new(); let mut removed_collaborators = Vec::new(); + let mut collaborator_ids_to_remove = Vec::new(); + for collaborator in &collaborators { + if !collaborator.connection_lost && collaborator.connection_server_id == server_id { + connection_ids.push(collaborator.connection()); + } else { + removed_collaborators.push(proto::RemoveChannelBufferCollaborator { + channel_id: channel_id.to_proto(), + peer_id: Some(collaborator.connection().into()), + }); + collaborator_ids_to_remove.push(collaborator.id); + } + } - // TODO + channel_buffer_collaborator::Entity::delete_many() + .filter(channel_buffer_collaborator::Column::Id.is_in(collaborator_ids_to_remove)) + .exec(&*tx) + .await?; Ok(RefreshedChannelBuffer { connection_ids, diff --git a/crates/collab/src/db/queries/rooms.rs b/crates/collab/src/db/queries/rooms.rs index 435e729fed38c2b5fcd4775e0de92ced74ee0b13..e348b50beeedb47c609d455ad759f59adea01adf 100644 --- a/crates/collab/src/db/queries/rooms.rs +++ b/crates/collab/src/db/queries/rooms.rs @@ -1,7 +1,7 @@ use super::*; impl Database { - pub async fn refresh_room( + pub async fn clear_stale_room_participants( &self, room_id: RoomId, new_server_id: ServerId, diff --git a/crates/collab/src/db/queries/servers.rs b/crates/collab/src/db/queries/servers.rs index 2b1d0d2c0c98147769ad5d137665712eb5f51ebd..e5ceee88873e0e89ecf8a29beef587b00c9baaf9 100644 --- a/crates/collab/src/db/queries/servers.rs +++ b/crates/collab/src/db/queries/servers.rs @@ -55,6 +55,7 @@ impl Database { .into_values::<_, QueryChannelIds>() .all(&*tx) .await?; + Ok((room_ids, channel_ids)) }) .await diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 95307ba72529b105789be415b6e850408172db97..e454fcbb9e7a4f2202602ca1ef7947ea6d6b6c9b 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -285,11 +285,15 @@ impl Server { .trace_err() { tracing::info!(stale_room_count = room_ids.len(), "retrieved stale rooms"); + tracing::info!( + stale_channel_buffer_count = channel_ids.len(), + "retrieved stale channel buffers" + ); for channel_id in channel_ids { if let Some(refreshed_channel_buffer) = app_state .db - .refresh_channel_buffer(channel_id, server_id) + .clear_stale_channel_buffer_collaborators(channel_id, server_id) .await .trace_err() { @@ -309,7 +313,7 @@ impl Server { if let Some(mut refreshed_room) = app_state .db - .refresh_room(room_id, server_id) + .clear_stale_room_participants(room_id, server_id) .await .trace_err() { @@ -873,6 +877,7 @@ async fn connection_lost( futures::select_biased! { _ = executor.sleep(RECONNECT_TIMEOUT).fuse() => { + log::info!("connection lost, removing all resources for user:{}, connection:{:?}", session.user_id, session.connection_id); leave_room_for_session(&session).await.trace_err(); leave_channel_buffers_for_session(&session) .await diff --git a/crates/collab/src/tests/channel_buffer_tests.rs b/crates/collab/src/tests/channel_buffer_tests.rs index 236771c2a536656d51095e3f6cf612615835955a..fe286895b4ada34c697a6587b0243c130d0f328e 100644 --- a/crates/collab/src/tests/channel_buffer_tests.rs +++ b/crates/collab/src/tests/channel_buffer_tests.rs @@ -1,4 +1,7 @@ -use crate::{rpc::RECONNECT_TIMEOUT, tests::TestServer}; +use crate::{ + rpc::{CLEANUP_TIMEOUT, RECONNECT_TIMEOUT}, + tests::TestServer, +}; use call::ActiveCall; use channel::Channel; use client::UserId; @@ -472,6 +475,97 @@ async fn test_rejoin_channel_buffer( }); } +#[gpui::test] +async fn test_channel_buffers_and_server_restarts( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, + cx_c: &mut TestAppContext, +) { + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; + let client_a = server.create_client(cx_a, "user_a").await; + let client_b = server.create_client(cx_b, "user_b").await; + let client_c = server.create_client(cx_c, "user_c").await; + + let channel_id = server + .make_channel( + "the-channel", + (&client_a, cx_a), + &mut [(&client_b, cx_b), (&client_c, cx_c)], + ) + .await; + + let channel_buffer_a = client_a + .channel_store() + .update(cx_a, |store, cx| store.open_channel_buffer(channel_id, cx)) + .await + .unwrap(); + let channel_buffer_b = client_b + .channel_store() + .update(cx_b, |store, cx| store.open_channel_buffer(channel_id, cx)) + .await + .unwrap(); + let _channel_buffer_c = client_c + .channel_store() + .update(cx_c, |store, cx| store.open_channel_buffer(channel_id, cx)) + .await + .unwrap(); + + channel_buffer_a.update(cx_a, |buffer, cx| { + buffer.buffer().update(cx, |buffer, cx| { + buffer.edit([(0..0, "1")], None, cx); + }) + }); + deterministic.run_until_parked(); + + // Client C can't reconnect. + client_c.override_establish_connection(|_, cx| cx.spawn(|_| future::pending())); + + // Server stops. + server.reset().await; + deterministic.advance_clock(RECEIVE_TIMEOUT); + + // While the server is down, both clients make an edit. + channel_buffer_a.update(cx_a, |buffer, cx| { + buffer.buffer().update(cx, |buffer, cx| { + buffer.edit([(1..1, "2")], None, cx); + }) + }); + channel_buffer_b.update(cx_b, |buffer, cx| { + buffer.buffer().update(cx, |buffer, cx| { + buffer.edit([(0..0, "0")], None, cx); + }) + }); + + // Server restarts. + server.start().await.unwrap(); + deterministic.advance_clock(CLEANUP_TIMEOUT); + + // Clients reconnects. Clients A and B see each other's edits, and see + // that client C has disconnected. + channel_buffer_a.read_with(cx_a, |buffer, cx| { + assert_eq!(buffer.buffer().read(cx).text(), "012"); + }); + channel_buffer_b.read_with(cx_b, |buffer, cx| { + assert_eq!(buffer.buffer().read(cx).text(), "012"); + }); + + channel_buffer_a.read_with(cx_a, |buffer_a, _| { + channel_buffer_b.read_with(cx_b, |buffer_b, _| { + assert_eq!( + buffer_a + .collaborators() + .iter() + .map(|c| c.user_id) + .collect::>(), + vec![client_a.user_id().unwrap(), client_b.user_id().unwrap()] + ); + assert_eq!(buffer_a.collaborators(), buffer_b.collaborators()); + }); + }); +} + #[track_caller] fn assert_collaborators(collaborators: &[proto::Collaborator], ids: &[Option]) { assert_eq!(