Add an RPC handler for channel buffer acks

Mikayla and max created

co-authored-by: max <max@zed.dev>

Change summary

crates/channel/src/channel.rs                   |  2 
crates/channel/src/channel_buffer.rs            |  2 
crates/collab/src/rpc.rs                        | 25 ++++++++++++++++--
crates/collab/src/tests/channel_buffer_tests.rs | 21 +++++++++++----
crates/collab/src/tests/test_server.rs          | 16 +++++++++++
crates/collab_ui/src/collab_panel.rs            |  2 -
6 files changed, 54 insertions(+), 14 deletions(-)

Detailed changes

crates/channel/src/channel.rs 🔗

@@ -2,7 +2,7 @@ mod channel_buffer;
 mod channel_chat;
 mod channel_store;
 
-pub use channel_buffer::{ChannelBuffer, ChannelBufferEvent};
+pub use channel_buffer::{ChannelBuffer, ChannelBufferEvent, ACKNOWLEDGE_DEBOUNCE_INTERVAL};
 pub use channel_chat::{ChannelChat, ChannelChatEvent, ChannelMessage, ChannelMessageId};
 pub use channel_store::{
     Channel, ChannelData, ChannelEvent, ChannelId, ChannelMembership, ChannelPath, ChannelStore,

crates/channel/src/channel_buffer.rs 🔗

@@ -11,7 +11,7 @@ use rpc::{
 use std::{sync::Arc, time::Duration};
 use util::ResultExt;
 
-const ACKNOWLEDGE_DEBOUNCE_INTERVAL: Duration = Duration::from_millis(250);
+pub const ACKNOWLEDGE_DEBOUNCE_INTERVAL: Duration = Duration::from_millis(250);
 
 pub(crate) fn init(client: &Arc<Client>) {
     client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer);

crates/collab/src/rpc.rs 🔗

@@ -3,8 +3,8 @@ mod connection_pool;
 use crate::{
     auth,
     db::{
-        self, ChannelId, ChannelsForUser, Database, MessageId, ProjectId, RoomId, ServerId, User,
-        UserId,
+        self, BufferId, ChannelId, ChannelsForUser, Database, MessageId, ProjectId, RoomId,
+        ServerId, User, UserId,
     },
     executor::Executor,
     AppState, Result,
@@ -275,7 +275,8 @@ impl Server {
             .add_message_handler(update_followers)
             .add_message_handler(update_diff_base)
             .add_request_handler(get_private_user_info)
-            .add_message_handler(acknowledge_channel_message);
+            .add_message_handler(acknowledge_channel_message)
+            .add_message_handler(acknowledge_buffer_version);
 
         Arc::new(server)
     }
@@ -2912,6 +2913,24 @@ async fn acknowledge_channel_message(
     Ok(())
 }
 
+async fn acknowledge_buffer_version(
+    request: proto::AckBufferOperation,
+    session: Session,
+) -> Result<()> {
+    let buffer_id = BufferId::from_proto(request.buffer_id);
+    session
+        .db()
+        .await
+        .observe_buffer_version(
+            buffer_id,
+            session.user_id,
+            request.epoch as i32,
+            &request.version,
+        )
+        .await?;
+    Ok(())
+}
+
 async fn join_channel_chat(
     request: proto::JoinChannelChat,
     response: Response<proto::JoinChannelChat>,

crates/collab/src/tests/channel_buffer_tests.rs 🔗

@@ -3,7 +3,7 @@ use crate::{
     tests::TestServer,
 };
 use call::ActiveCall;
-use channel::Channel;
+use channel::{Channel, ACKNOWLEDGE_DEBOUNCE_INTERVAL};
 use client::ParticipantIndex;
 use client::{Collaborator, UserId};
 use collab_ui::channel_view::ChannelView;
@@ -800,7 +800,6 @@ async fn test_channel_buffer_changes(
             .has_channel_buffer_changed(channel_id)
             .unwrap()
     });
-
     assert!(has_buffer_changed);
 
     // Opening the buffer should clear the changed flag.
@@ -810,7 +809,6 @@ async fn test_channel_buffer_changes(
         .update(|cx| ChannelView::open(channel_id, workspace_b.clone(), cx))
         .await
         .unwrap();
-
     deterministic.run_until_parked();
 
     let has_buffer_changed = cx_b.read(|cx| {
@@ -820,10 +818,9 @@ async fn test_channel_buffer_changes(
             .has_channel_buffer_changed(channel_id)
             .unwrap()
     });
-
     assert!(!has_buffer_changed);
 
-    // Editing the channel while the buffer is open shuold not show that the buffer has changed.
+    // Editing the channel while the buffer is open should not show that the buffer has changed.
     channel_buffer_a.update(cx_a, |buffer, cx| {
         buffer.buffer().update(cx, |buffer, cx| {
             buffer.edit([(0..0, "2")], None, cx);
@@ -838,7 +835,20 @@ async fn test_channel_buffer_changes(
             .has_channel_buffer_changed(channel_id)
             .unwrap()
     });
+    assert!(!has_buffer_changed);
 
+    deterministic.advance_clock(ACKNOWLEDGE_DEBOUNCE_INTERVAL);
+
+    // Test that the server is tracking things correctly, and we retain our 'not changed'
+    // state across a disconnect
+    server.simulate_long_connection_interruption(client_b.peer_id().unwrap(), &deterministic);
+    let has_buffer_changed = cx_b.read(|cx| {
+        client_b
+            .channel_store()
+            .read(cx)
+            .has_channel_buffer_changed(channel_id)
+            .unwrap()
+    });
     assert!(!has_buffer_changed);
 
     // Closing the buffer should re-enable change tracking
@@ -866,7 +876,6 @@ async fn test_channel_buffer_changes(
             .has_channel_buffer_changed(channel_id)
             .unwrap()
     });
-
     assert!(has_buffer_changed);
 }
 

crates/collab/src/tests/test_server.rs 🔗

@@ -1,7 +1,7 @@
 use crate::{
     db::{tests::TestDb, NewUserParams, UserId},
     executor::Executor,
-    rpc::{Server, CLEANUP_TIMEOUT},
+    rpc::{Server, CLEANUP_TIMEOUT, RECONNECT_TIMEOUT},
     AppState,
 };
 use anyhow::anyhow;
@@ -17,6 +17,7 @@ use gpui::{executor::Deterministic, ModelHandle, Task, TestAppContext, WindowHan
 use language::LanguageRegistry;
 use parking_lot::Mutex;
 use project::{Project, WorktreeId};
+use rpc::RECEIVE_TIMEOUT;
 use settings::SettingsStore;
 use std::{
     cell::{Ref, RefCell, RefMut},
@@ -255,6 +256,19 @@ impl TestServer {
             .store(true, SeqCst);
     }
 
+    pub fn simulate_long_connection_interruption(
+        &self,
+        peer_id: PeerId,
+        deterministic: &Arc<Deterministic>,
+    ) {
+        self.forbid_connections();
+        self.disconnect_client(peer_id);
+        deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
+        self.allow_connections();
+        deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
+        deterministic.run_until_parked();
+    }
+
     pub fn forbid_connections(&self) {
         self.forbid_connections.store(true, SeqCst);
     }

crates/collab_ui/src/collab_panel.rs 🔗

@@ -2760,11 +2760,9 @@ impl CollabPanel {
                                 .read(cx)
                                 .channel_id()?;
 
-                            dbg!(call_channel, channel.id);
                             Some(call_channel == channel.id)
                         })
                         .unwrap_or(false);
-                        dbg!(is_active);
                         if is_active {
                             self.open_channel_notes(
                                 &OpenChannelNotes {