Add basic randomized integration test for channel notes

Max Brunsfeld created

Change summary

crates/channel/src/channel_store.rs                           |  11 
crates/collab/src/db/queries/channels.rs                      |  14 
crates/collab/src/tests/random_channel_buffer_tests.rs        | 237 ++++
crates/collab/src/tests/random_project_collaboration_tests.rs |  26 
crates/collab/src/tests/randomized_test_helpers.rs            |  37 
crates/collab/src/tests/test_server.rs                        |   9 
6 files changed, 295 insertions(+), 39 deletions(-)

Detailed changes

crates/channel/src/channel_store.rs 🔗

@@ -3,7 +3,7 @@ use anyhow::{anyhow, Result};
 use client::{Client, Subscription, User, UserId, UserStore};
 use collections::{hash_map, HashMap, HashSet};
 use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
-use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
+use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
 use rpc::{proto, TypedEnvelope};
 use std::{mem, sync::Arc, time::Duration};
 use util::ResultExt;
@@ -152,6 +152,15 @@ impl ChannelStore {
         self.channels_by_id.get(&channel_id)
     }
 
+    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, cx: &AppContext) -> bool {
+        if let Some(buffer) = self.opened_buffers.get(&channel_id) {
+            if let OpenedChannelBuffer::Open(buffer) = buffer {
+                return buffer.upgrade(cx).is_some();
+            }
+        }
+        false
+    }
+
     pub fn open_channel_buffer(
         &mut self,
         channel_id: ChannelId,

crates/collab/src/db/queries/channels.rs 🔗

@@ -1,6 +1,20 @@
 use super::*;
 
 impl Database {
+    #[cfg(test)]
+    pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
+        self.transaction(move |tx| async move {
+            let mut channels = Vec::new();
+            let mut rows = channel::Entity::find().stream(&*tx).await?;
+            while let Some(row) = rows.next().await {
+                let row = row?;
+                channels.push((row.id, row.name));
+            }
+            Ok(channels)
+        })
+        .await
+    }
+
     pub async fn create_root_channel(
         &self,
         name: &str,

crates/collab/src/tests/random_channel_buffer_tests.rs 🔗

@@ -1,12 +1,16 @@
-use crate::tests::{run_randomized_test, RandomizedTest, TestClient, TestError, UserTestPlan};
+use super::{run_randomized_test, RandomizedTest, TestClient, TestError, TestServer, UserTestPlan};
 use anyhow::Result;
 use async_trait::async_trait;
 use gpui::{executor::Deterministic, TestAppContext};
-use rand::rngs::StdRng;
+use rand::prelude::*;
 use serde_derive::{Deserialize, Serialize};
-use std::{rc::Rc, sync::Arc};
+use std::{ops::Range, rc::Rc, sync::Arc};
+use text::Bias;
 
-#[gpui::test]
+#[gpui::test(
+    iterations = 100,
+    on_failure = "crate::tests::save_randomized_test_plan"
+)]
 async fn test_random_channel_buffers(
     cx: &mut TestAppContext,
     deterministic: Arc<Deterministic>,
@@ -19,20 +23,105 @@ struct RandomChannelBufferTest;
 
 #[derive(Clone, Serialize, Deserialize)]
 enum ChannelBufferOperation {
-    Join,
+    JoinChannelNotes {
+        channel_name: String,
+    },
+    LeaveChannelNotes {
+        channel_name: String,
+    },
+    EditChannelNotes {
+        channel_name: String,
+        edits: Vec<(Range<usize>, Arc<str>)>,
+    },
+    Noop,
 }
 
+const CHANNEL_COUNT: usize = 3;
+
 #[async_trait(?Send)]
 impl RandomizedTest for RandomChannelBufferTest {
     type Operation = ChannelBufferOperation;
 
+    async fn initialize(server: &mut TestServer, users: &[UserTestPlan]) {
+        let db = &server.app_state.db;
+        for ix in 0..CHANNEL_COUNT {
+            let id = db
+                .create_channel(
+                    &format!("channel-{ix}"),
+                    None,
+                    &format!("livekit-room-{ix}"),
+                    users[0].user_id,
+                )
+                .await
+                .unwrap();
+            for user in &users[1..] {
+                db.invite_channel_member(id, user.user_id, users[0].user_id, false)
+                    .await
+                    .unwrap();
+                db.respond_to_channel_invite(id, user.user_id, true)
+                    .await
+                    .unwrap();
+            }
+        }
+    }
+
     fn generate_operation(
         client: &TestClient,
         rng: &mut StdRng,
-        plan: &mut UserTestPlan,
+        _: &mut UserTestPlan,
         cx: &TestAppContext,
     ) -> ChannelBufferOperation {
-        ChannelBufferOperation::Join
+        let channel_store = client.channel_store().clone();
+        let channel_buffers = client.channel_buffers();
+
+        // When signed out, we can't do anything unless a channel buffer is
+        // already open.
+        if channel_buffers.is_empty()
+            && channel_store.read_with(cx, |store, _| store.channel_count() == 0)
+        {
+            return ChannelBufferOperation::Noop;
+        }
+
+        loop {
+            match rng.gen_range(0..100_u32) {
+                0..=29 => {
+                    let channel_name = client.channel_store().read_with(cx, |store, cx| {
+                        store.channels().find_map(|(_, channel)| {
+                            if store.has_open_channel_buffer(channel.id, cx) {
+                                None
+                            } else {
+                                Some(channel.name.clone())
+                            }
+                        })
+                    });
+                    if let Some(channel_name) = channel_name {
+                        break ChannelBufferOperation::JoinChannelNotes { channel_name };
+                    }
+                }
+
+                30..=40 => {
+                    if let Some(buffer) = channel_buffers.iter().choose(rng) {
+                        let channel_name = buffer.read_with(cx, |b, _| b.channel().name.clone());
+                        break ChannelBufferOperation::LeaveChannelNotes { channel_name };
+                    }
+                }
+
+                _ => {
+                    if let Some(buffer) = channel_buffers.iter().choose(rng) {
+                        break buffer.read_with(cx, |b, _| {
+                            let channel_name = b.channel().name.clone();
+                            let edits = b
+                                .buffer()
+                                .read_with(cx, |buffer, _| buffer.get_random_edits(rng, 3));
+                            ChannelBufferOperation::EditChannelNotes {
+                                channel_name,
+                                edits,
+                            }
+                        });
+                    }
+                }
+            }
+        }
     }
 
     async fn apply_operation(
@@ -40,10 +129,140 @@ impl RandomizedTest for RandomChannelBufferTest {
         operation: ChannelBufferOperation,
         cx: &mut TestAppContext,
     ) -> Result<(), TestError> {
+        match operation {
+            ChannelBufferOperation::JoinChannelNotes { channel_name } => {
+                let buffer = client.channel_store().update(cx, |store, cx| {
+                    let channel_id = store
+                        .channels()
+                        .find(|(_, c)| c.name == channel_name)
+                        .unwrap()
+                        .1
+                        .id;
+                    if store.has_open_channel_buffer(channel_id, cx) {
+                        Err(TestError::Inapplicable)
+                    } else {
+                        Ok(store.open_channel_buffer(channel_id, cx))
+                    }
+                })?;
+
+                log::info!(
+                    "{}: opening notes for channel {channel_name}",
+                    client.username
+                );
+                client.channel_buffers().insert(buffer.await?);
+            }
+
+            ChannelBufferOperation::LeaveChannelNotes { channel_name } => {
+                let buffer = cx.update(|cx| {
+                    let mut left_buffer = Err(TestError::Inapplicable);
+                    client.channel_buffers().retain(|buffer| {
+                        if buffer.read(cx).channel().name == channel_name {
+                            left_buffer = Ok(buffer.clone());
+                            false
+                        } else {
+                            true
+                        }
+                    });
+                    left_buffer
+                })?;
+
+                log::info!(
+                    "{}: closing notes for channel {channel_name}",
+                    client.username
+                );
+                cx.update(|_| drop(buffer));
+            }
+
+            ChannelBufferOperation::EditChannelNotes {
+                channel_name,
+                edits,
+            } => {
+                let channel_buffer = cx
+                    .read(|cx| {
+                        client
+                            .channel_buffers()
+                            .iter()
+                            .find(|buffer| buffer.read(cx).channel().name == channel_name)
+                            .cloned()
+                    })
+                    .ok_or_else(|| TestError::Inapplicable)?;
+
+                log::info!(
+                    "{}: editing notes for channel {channel_name} with {:?}",
+                    client.username,
+                    edits
+                );
+
+                channel_buffer.update(cx, |buffer, cx| {
+                    let buffer = buffer.buffer();
+                    buffer.update(cx, |buffer, cx| {
+                        let snapshot = buffer.snapshot();
+                        buffer.edit(
+                            edits.into_iter().map(|(range, text)| {
+                                let start = snapshot.clip_offset(range.start, Bias::Left);
+                                let end = snapshot.clip_offset(range.end, Bias::Right);
+                                (start..end, text)
+                            }),
+                            None,
+                            cx,
+                        );
+                    });
+                });
+            }
+
+            ChannelBufferOperation::Noop => Err(TestError::Inapplicable)?,
+        }
         Ok(())
     }
 
-    async fn on_client_added(client: &Rc<TestClient>) {}
+    async fn on_client_added(client: &Rc<TestClient>, cx: &mut TestAppContext) {
+        let channel_store = client.channel_store();
+        while channel_store.read_with(cx, |store, _| store.channel_count() == 0) {
+            channel_store.next_notification(cx).await;
+        }
+    }
 
-    fn on_clients_quiesced(clients: &[(Rc<TestClient>, TestAppContext)]) {}
+    async fn on_quiesce(server: &mut TestServer, clients: &mut [(Rc<TestClient>, TestAppContext)]) {
+        let channels = server.app_state.db.all_channels().await.unwrap();
+
+        for (channel_id, channel_name) in channels {
+            let mut collaborator_user_ids = server
+                .app_state
+                .db
+                .get_channel_buffer_collaborators(channel_id)
+                .await
+                .unwrap()
+                .into_iter()
+                .map(|id| id.to_proto())
+                .collect::<Vec<_>>();
+            collaborator_user_ids.sort();
+
+            for (client, client_cx) in clients.iter_mut() {
+                client_cx.update(|cx| {
+                    client
+                        .channel_buffers()
+                        .retain(|b| b.read(cx).is_connected());
+
+                    if let Some(channel_buffer) = client
+                        .channel_buffers()
+                        .iter()
+                        .find(|b| b.read(cx).channel().id == channel_id.to_proto())
+                    {
+                        let channel_buffer = channel_buffer.read(cx);
+                        let collaborators = channel_buffer.collaborators();
+                        let mut user_ids =
+                            collaborators.iter().map(|c| c.user_id).collect::<Vec<_>>();
+                        user_ids.sort();
+                        assert_eq!(
+                            user_ids,
+                            collaborator_user_ids,
+                            "client {} has different user ids for channel {} than the server",
+                            client.user_id().unwrap(),
+                            channel_name
+                        );
+                    }
+                });
+            }
+        }
+    }
 }

crates/collab/src/tests/random_project_collaboration_tests.rs 🔗

@@ -1,7 +1,5 @@
-use crate::{
-    db::UserId,
-    tests::{run_randomized_test, RandomizedTest, TestClient, TestError, UserTestPlan},
-};
+use super::{run_randomized_test, RandomizedTest, TestClient, TestError, TestServer, UserTestPlan};
+use crate::db::UserId;
 use anyhow::{anyhow, Result};
 use async_trait::async_trait;
 use call::ActiveCall;
@@ -145,6 +143,20 @@ struct ProjectCollaborationTest;
 impl RandomizedTest for ProjectCollaborationTest {
     type Operation = ClientOperation;
 
+    async fn initialize(server: &mut TestServer, users: &[UserTestPlan]) {
+        let db = &server.app_state.db;
+        for (ix, user_a) in users.iter().enumerate() {
+            for user_b in &users[ix + 1..] {
+                db.send_contact_request(user_a.user_id, user_b.user_id)
+                    .await
+                    .unwrap();
+                db.respond_to_contact_request(user_b.user_id, user_a.user_id, true)
+                    .await
+                    .unwrap();
+            }
+        }
+    }
+
     fn generate_operation(
         client: &TestClient,
         rng: &mut StdRng,
@@ -1005,7 +1017,7 @@ impl RandomizedTest for ProjectCollaborationTest {
         Ok(())
     }
 
-    async fn on_client_added(client: &Rc<TestClient>) {
+    async fn on_client_added(client: &Rc<TestClient>, _: &mut TestAppContext) {
         let mut language = Language::new(
             LanguageConfig {
                 name: "Rust".into(),
@@ -1119,8 +1131,8 @@ impl RandomizedTest for ProjectCollaborationTest {
         client.app_state.languages.add(Arc::new(language));
     }
 
-    fn on_clients_quiesced(clients: &[(Rc<TestClient>, TestAppContext)]) {
-        for (client, client_cx) in clients {
+    async fn on_quiesce(_: &mut TestServer, clients: &mut [(Rc<TestClient>, TestAppContext)]) {
+        for (client, client_cx) in clients.iter() {
             for guest_project in client.remote_projects().iter() {
                 guest_project.read_with(client_cx, |guest_project, cx| {
                         let host_project = clients.iter().find_map(|(client, cx)| {

crates/collab/src/tests/randomized_test_helpers.rs 🔗

@@ -1,5 +1,5 @@
 use crate::{
-    db::{self, Database, NewUserParams, UserId},
+    db::{self, NewUserParams, UserId},
     rpc::{CLEANUP_TIMEOUT, RECONNECT_TIMEOUT},
     tests::{TestClient, TestServer},
 };
@@ -107,15 +107,17 @@ pub trait RandomizedTest: 'static + Sized {
         cx: &TestAppContext,
     ) -> Self::Operation;
 
-    async fn on_client_added(client: &Rc<TestClient>);
-
-    fn on_clients_quiesced(client: &[(Rc<TestClient>, TestAppContext)]);
-
     async fn apply_operation(
         client: &TestClient,
         operation: Self::Operation,
         cx: &mut TestAppContext,
     ) -> Result<(), TestError>;
+
+    async fn initialize(server: &mut TestServer, users: &[UserTestPlan]);
+
+    async fn on_client_added(client: &Rc<TestClient>, cx: &mut TestAppContext);
+
+    async fn on_quiesce(server: &mut TestServer, client: &mut [(Rc<TestClient>, TestAppContext)]);
 }
 
 pub async fn run_randomized_test<T: RandomizedTest>(
@@ -125,7 +127,7 @@ pub async fn run_randomized_test<T: RandomizedTest>(
 ) {
     deterministic.forbid_parking();
     let mut server = TestServer::start(&deterministic).await;
-    let plan = TestPlan::<T>::new(server.app_state.db.clone(), rng).await;
+    let plan = TestPlan::<T>::new(&mut server, rng).await;
 
     LAST_PLAN.lock().replace({
         let plan = plan.clone();
@@ -162,7 +164,7 @@ pub async fn run_randomized_test<T: RandomizedTest>(
     deterministic.finish_waiting();
 
     deterministic.run_until_parked();
-    T::on_clients_quiesced(&clients);
+    T::on_quiesce(&mut server, &mut clients).await;
 
     for (client, mut cx) in clients {
         cx.update(|cx| {
@@ -190,7 +192,7 @@ pub fn save_randomized_test_plan() {
 }
 
 impl<T: RandomizedTest> TestPlan<T> {
-    pub async fn new(db: Arc<Database>, mut rng: StdRng) -> Arc<Mutex<Self>> {
+    pub async fn new(server: &mut TestServer, mut rng: StdRng) -> Arc<Mutex<Self>> {
         let allow_server_restarts = rng.gen_bool(0.7);
         let allow_client_reconnection = rng.gen_bool(0.7);
         let allow_client_disconnection = rng.gen_bool(0.1);
@@ -198,7 +200,9 @@ impl<T: RandomizedTest> TestPlan<T> {
         let mut users = Vec::new();
         for ix in 0..*MAX_PEERS {
             let username = format!("user-{}", ix + 1);
-            let user_id = db
+            let user_id = server
+                .app_state
+                .db
                 .create_user(
                     &format!("{username}@example.com"),
                     false,
@@ -222,16 +226,7 @@ impl<T: RandomizedTest> TestPlan<T> {
             });
         }
 
-        for (ix, user_a) in users.iter().enumerate() {
-            for user_b in &users[ix + 1..] {
-                db.send_contact_request(user_a.user_id, user_b.user_id)
-                    .await
-                    .unwrap();
-                db.respond_to_contact_request(user_b.user_id, user_a.user_id, true)
-                    .await
-                    .unwrap();
-            }
-        }
+        T::initialize(server, &users).await;
 
         let plan = Arc::new(Mutex::new(Self {
             replay: false,
@@ -619,7 +614,7 @@ impl<T: RandomizedTest> TestPlan<T> {
 
                 if quiesce && applied {
                     deterministic.run_until_parked();
-                    T::on_clients_quiesced(&clients);
+                    T::on_quiesce(server, clients).await;
                 }
 
                 return applied;
@@ -634,7 +629,7 @@ impl<T: RandomizedTest> TestPlan<T> {
         mut operation_rx: futures::channel::mpsc::UnboundedReceiver<usize>,
         mut cx: TestAppContext,
     ) {
-        T::on_client_added(&client).await;
+        T::on_client_added(&client, &mut cx).await;
 
         while let Some(batch_id) = operation_rx.next().await {
             let Some((operation, applied)) =

crates/collab/src/tests/test_server.rs 🔗

@@ -6,7 +6,7 @@ use crate::{
 };
 use anyhow::anyhow;
 use call::ActiveCall;
-use channel::ChannelStore;
+use channel::{channel_buffer::ChannelBuffer, ChannelStore};
 use client::{
     self, proto::PeerId, Client, Connection, Credentials, EstablishConnectionError, UserStore,
 };
@@ -51,6 +51,7 @@ struct TestClientState {
     local_projects: Vec<ModelHandle<Project>>,
     remote_projects: Vec<ModelHandle<Project>>,
     buffers: HashMap<ModelHandle<Project>, HashSet<ModelHandle<language::Buffer>>>,
+    channel_buffers: HashSet<ModelHandle<ChannelBuffer>>,
 }
 
 pub struct ContactsSummary {
@@ -468,6 +469,12 @@ impl TestClient {
         RefMut::map(self.state.borrow_mut(), |state| &mut state.buffers)
     }
 
+    pub fn channel_buffers<'a>(
+        &'a self,
+    ) -> impl DerefMut<Target = HashSet<ModelHandle<ChannelBuffer>>> + 'a {
+        RefMut::map(self.state.borrow_mut(), |state| &mut state.channel_buffers)
+    }
+
     pub fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
         self.app_state
             .user_store