Move randomized integration test into its own file

Max Brunsfeld created

Change summary

crates/collab/src/lib.rs                                |    5 
crates/collab/src/tests.rs                              |  466 +++
crates/collab/src/tests/integration_tests.rs            | 1360 ----------
crates/collab/src/tests/randomized_integration_tests.rs |  919 +++++++
4 files changed, 1,402 insertions(+), 1,348 deletions(-)

Detailed changes

crates/collab/src/lib.rs 🔗

@@ -3,10 +3,11 @@ pub mod auth;
 pub mod db;
 pub mod env;
 pub mod executor;
-#[cfg(test)]
-mod integration_tests;
 pub mod rpc;
 
+#[cfg(test)]
+mod tests;
+
 use axum::{http::StatusCode, response::IntoResponse};
 use db::Database;
 use serde::Deserialize;

crates/collab/src/tests.rs 🔗

@@ -0,0 +1,466 @@
+use crate::{
+    db::{NewUserParams, TestDb, UserId},
+    executor::Executor,
+    rpc::{Server, CLEANUP_TIMEOUT},
+    AppState,
+};
+use anyhow::anyhow;
+use call::ActiveCall;
+use client::{
+    self, proto::PeerId, test::FakeHttpClient, Client, Connection, Credentials,
+    EstablishConnectionError, UserStore,
+};
+use collections::{HashMap, HashSet};
+use fs::{FakeFs, HomeDir};
+use futures::{channel::oneshot, StreamExt as _};
+use gpui::{
+    executor::Deterministic, test::EmptyView, ModelHandle, Task, TestAppContext, ViewHandle,
+};
+use language::LanguageRegistry;
+use parking_lot::Mutex;
+use project::{Project, WorktreeId};
+use settings::Settings;
+use std::{
+    env,
+    ops::Deref,
+    path::{Path, PathBuf},
+    sync::{
+        atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
+        Arc,
+    },
+};
+use theme::ThemeRegistry;
+use workspace::Workspace;
+
+mod integration_tests;
+mod randomized_integration_tests;
+
+struct TestServer {
+    app_state: Arc<AppState>,
+    server: Arc<Server>,
+    connection_killers: Arc<Mutex<HashMap<PeerId, Arc<AtomicBool>>>>,
+    forbid_connections: Arc<AtomicBool>,
+    _test_db: TestDb,
+    test_live_kit_server: Arc<live_kit_client::TestServer>,
+}
+
+impl TestServer {
+    async fn start(deterministic: &Arc<Deterministic>) -> Self {
+        static NEXT_LIVE_KIT_SERVER_ID: AtomicUsize = AtomicUsize::new(0);
+
+        let use_postgres = env::var("USE_POSTGRES").ok();
+        let use_postgres = use_postgres.as_deref();
+        let test_db = if use_postgres == Some("true") || use_postgres == Some("1") {
+            TestDb::postgres(deterministic.build_background())
+        } else {
+            TestDb::sqlite(deterministic.build_background())
+        };
+        let live_kit_server_id = NEXT_LIVE_KIT_SERVER_ID.fetch_add(1, SeqCst);
+        let live_kit_server = live_kit_client::TestServer::create(
+            format!("http://livekit.{}.test", live_kit_server_id),
+            format!("devkey-{}", live_kit_server_id),
+            format!("secret-{}", live_kit_server_id),
+            deterministic.build_background(),
+        )
+        .unwrap();
+        let app_state = Self::build_app_state(&test_db, &live_kit_server).await;
+        let epoch = app_state
+            .db
+            .create_server(&app_state.config.zed_environment)
+            .await
+            .unwrap();
+        let server = Server::new(
+            epoch,
+            app_state.clone(),
+            Executor::Deterministic(deterministic.build_background()),
+        );
+        server.start().await.unwrap();
+        // Advance clock to ensure the server's cleanup task is finished.
+        deterministic.advance_clock(CLEANUP_TIMEOUT);
+        Self {
+            app_state,
+            server,
+            connection_killers: Default::default(),
+            forbid_connections: Default::default(),
+            _test_db: test_db,
+            test_live_kit_server: live_kit_server,
+        }
+    }
+
+    async fn reset(&self) {
+        self.app_state.db.reset();
+        let epoch = self
+            .app_state
+            .db
+            .create_server(&self.app_state.config.zed_environment)
+            .await
+            .unwrap();
+        self.server.reset(epoch);
+    }
+
+    async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
+        cx.update(|cx| {
+            cx.set_global(HomeDir(Path::new("/tmp/").to_path_buf()));
+
+            let mut settings = Settings::test(cx);
+            settings.projects_online_by_default = false;
+            cx.set_global(settings);
+        });
+
+        let http = FakeHttpClient::with_404_response();
+        let user_id = if let Ok(Some(user)) = self
+            .app_state
+            .db
+            .get_user_by_github_account(name, None)
+            .await
+        {
+            user.id
+        } else {
+            self.app_state
+                .db
+                .create_user(
+                    &format!("{name}@example.com"),
+                    false,
+                    NewUserParams {
+                        github_login: name.into(),
+                        github_user_id: 0,
+                        invite_count: 0,
+                    },
+                )
+                .await
+                .expect("creating user failed")
+                .user_id
+        };
+        let client_name = name.to_string();
+        let mut client = cx.read(|cx| Client::new(http.clone(), cx));
+        let server = self.server.clone();
+        let db = self.app_state.db.clone();
+        let connection_killers = self.connection_killers.clone();
+        let forbid_connections = self.forbid_connections.clone();
+
+        Arc::get_mut(&mut client)
+            .unwrap()
+            .set_id(user_id.0 as usize)
+            .override_authenticate(move |cx| {
+                cx.spawn(|_| async move {
+                    let access_token = "the-token".to_string();
+                    Ok(Credentials {
+                        user_id: user_id.0 as u64,
+                        access_token,
+                    })
+                })
+            })
+            .override_establish_connection(move |credentials, cx| {
+                assert_eq!(credentials.user_id, user_id.0 as u64);
+                assert_eq!(credentials.access_token, "the-token");
+
+                let server = server.clone();
+                let db = db.clone();
+                let connection_killers = connection_killers.clone();
+                let forbid_connections = forbid_connections.clone();
+                let client_name = client_name.clone();
+                cx.spawn(move |cx| async move {
+                    if forbid_connections.load(SeqCst) {
+                        Err(EstablishConnectionError::other(anyhow!(
+                            "server is forbidding connections"
+                        )))
+                    } else {
+                        let (client_conn, server_conn, killed) =
+                            Connection::in_memory(cx.background());
+                        let (connection_id_tx, connection_id_rx) = oneshot::channel();
+                        let user = db
+                            .get_user_by_id(user_id)
+                            .await
+                            .expect("retrieving user failed")
+                            .unwrap();
+                        cx.background()
+                            .spawn(server.handle_connection(
+                                server_conn,
+                                client_name,
+                                user,
+                                Some(connection_id_tx),
+                                Executor::Deterministic(cx.background()),
+                            ))
+                            .detach();
+                        let connection_id = connection_id_rx.await.unwrap();
+                        connection_killers
+                            .lock()
+                            .insert(connection_id.into(), killed);
+                        Ok(client_conn)
+                    }
+                })
+            });
+
+        let fs = FakeFs::new(cx.background());
+        let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
+        let app_state = Arc::new(workspace::AppState {
+            client: client.clone(),
+            user_store: user_store.clone(),
+            languages: Arc::new(LanguageRegistry::new(Task::ready(()))),
+            themes: ThemeRegistry::new((), cx.font_cache()),
+            fs: fs.clone(),
+            build_window_options: Default::default,
+            initialize_workspace: |_, _, _| unimplemented!(),
+            dock_default_item_factory: |_, _| unimplemented!(),
+        });
+
+        Project::init(&client);
+        cx.update(|cx| {
+            workspace::init(app_state.clone(), cx);
+            call::init(client.clone(), user_store.clone(), cx);
+        });
+
+        client
+            .authenticate_and_connect(false, &cx.to_async())
+            .await
+            .unwrap();
+
+        let client = TestClient {
+            client,
+            username: name.to_string(),
+            local_projects: Default::default(),
+            remote_projects: Default::default(),
+            next_root_dir_id: 0,
+            user_store,
+            fs,
+            language_registry: Arc::new(LanguageRegistry::test()),
+            buffers: Default::default(),
+        };
+        client.wait_for_current_user(cx).await;
+        client
+    }
+
+    fn disconnect_client(&self, peer_id: PeerId) {
+        self.connection_killers
+            .lock()
+            .remove(&peer_id)
+            .unwrap()
+            .store(true, SeqCst);
+    }
+
+    fn forbid_connections(&self) {
+        self.forbid_connections.store(true, SeqCst);
+    }
+
+    fn allow_connections(&self) {
+        self.forbid_connections.store(false, SeqCst);
+    }
+
+    async fn make_contacts(&self, clients: &mut [(&TestClient, &mut TestAppContext)]) {
+        for ix in 1..clients.len() {
+            let (left, right) = clients.split_at_mut(ix);
+            let (client_a, cx_a) = left.last_mut().unwrap();
+            for (client_b, cx_b) in right {
+                client_a
+                    .user_store
+                    .update(*cx_a, |store, cx| {
+                        store.request_contact(client_b.user_id().unwrap(), cx)
+                    })
+                    .await
+                    .unwrap();
+                cx_a.foreground().run_until_parked();
+                client_b
+                    .user_store
+                    .update(*cx_b, |store, cx| {
+                        store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
+                    })
+                    .await
+                    .unwrap();
+            }
+        }
+    }
+
+    async fn create_room(&self, clients: &mut [(&TestClient, &mut TestAppContext)]) {
+        self.make_contacts(clients).await;
+
+        let (left, right) = clients.split_at_mut(1);
+        let (_client_a, cx_a) = &mut left[0];
+        let active_call_a = cx_a.read(ActiveCall::global);
+
+        for (client_b, cx_b) in right {
+            let user_id_b = client_b.current_user_id(*cx_b).to_proto();
+            active_call_a
+                .update(*cx_a, |call, cx| call.invite(user_id_b, None, cx))
+                .await
+                .unwrap();
+
+            cx_b.foreground().run_until_parked();
+            let active_call_b = cx_b.read(ActiveCall::global);
+            active_call_b
+                .update(*cx_b, |call, cx| call.accept_incoming(cx))
+                .await
+                .unwrap();
+        }
+    }
+
+    async fn build_app_state(
+        test_db: &TestDb,
+        fake_server: &live_kit_client::TestServer,
+    ) -> Arc<AppState> {
+        Arc::new(AppState {
+            db: test_db.db().clone(),
+            live_kit_client: Some(Arc::new(fake_server.create_api_client())),
+            config: Default::default(),
+        })
+    }
+}
+
+impl Deref for TestServer {
+    type Target = Server;
+
+    fn deref(&self) -> &Self::Target {
+        &self.server
+    }
+}
+
+impl Drop for TestServer {
+    fn drop(&mut self) {
+        self.server.teardown();
+        self.test_live_kit_server.teardown().unwrap();
+    }
+}
+
+struct TestClient {
+    client: Arc<Client>,
+    username: String,
+    local_projects: Vec<ModelHandle<Project>>,
+    remote_projects: Vec<ModelHandle<Project>>,
+    next_root_dir_id: usize,
+    pub user_store: ModelHandle<UserStore>,
+    language_registry: Arc<LanguageRegistry>,
+    fs: Arc<FakeFs>,
+    buffers: HashMap<ModelHandle<Project>, HashSet<ModelHandle<language::Buffer>>>,
+}
+
+impl Deref for TestClient {
+    type Target = Arc<Client>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.client
+    }
+}
+
+struct ContactsSummary {
+    pub current: Vec<String>,
+    pub outgoing_requests: Vec<String>,
+    pub incoming_requests: Vec<String>,
+}
+
+impl TestClient {
+    pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
+        UserId::from_proto(
+            self.user_store
+                .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
+        )
+    }
+
+    async fn wait_for_current_user(&self, cx: &TestAppContext) {
+        let mut authed_user = self
+            .user_store
+            .read_with(cx, |user_store, _| user_store.watch_current_user());
+        while authed_user.next().await.unwrap().is_none() {}
+    }
+
+    async fn clear_contacts(&self, cx: &mut TestAppContext) {
+        self.user_store
+            .update(cx, |store, _| store.clear_contacts())
+            .await;
+    }
+
+    fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
+        self.user_store.read_with(cx, |store, _| ContactsSummary {
+            current: store
+                .contacts()
+                .iter()
+                .map(|contact| contact.user.github_login.clone())
+                .collect(),
+            outgoing_requests: store
+                .outgoing_contact_requests()
+                .iter()
+                .map(|user| user.github_login.clone())
+                .collect(),
+            incoming_requests: store
+                .incoming_contact_requests()
+                .iter()
+                .map(|user| user.github_login.clone())
+                .collect(),
+        })
+    }
+
+    async fn build_local_project(
+        &self,
+        root_path: impl AsRef<Path>,
+        cx: &mut TestAppContext,
+    ) -> (ModelHandle<Project>, WorktreeId) {
+        let project = cx.update(|cx| {
+            Project::local(
+                self.client.clone(),
+                self.user_store.clone(),
+                self.language_registry.clone(),
+                self.fs.clone(),
+                cx,
+            )
+        });
+        let (worktree, _) = project
+            .update(cx, |p, cx| {
+                p.find_or_create_local_worktree(root_path, true, cx)
+            })
+            .await
+            .unwrap();
+        worktree
+            .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
+            .await;
+        (project, worktree.read_with(cx, |tree, _| tree.id()))
+    }
+
+    async fn build_remote_project(
+        &self,
+        host_project_id: u64,
+        guest_cx: &mut TestAppContext,
+    ) -> ModelHandle<Project> {
+        let active_call = guest_cx.read(ActiveCall::global);
+        let room = active_call.read_with(guest_cx, |call, _| call.room().unwrap().clone());
+        room.update(guest_cx, |room, cx| {
+            room.join_project(
+                host_project_id,
+                self.language_registry.clone(),
+                self.fs.clone(),
+                cx,
+            )
+        })
+        .await
+        .unwrap()
+    }
+
+    fn build_workspace(
+        &self,
+        project: &ModelHandle<Project>,
+        cx: &mut TestAppContext,
+    ) -> ViewHandle<Workspace> {
+        let (_, root_view) = cx.add_window(|_| EmptyView);
+        cx.add_view(&root_view, |cx| {
+            Workspace::new(
+                Default::default(),
+                0,
+                project.clone(),
+                |_, _| unimplemented!(),
+                cx,
+            )
+        })
+    }
+
+    fn create_new_root_dir(&mut self) -> PathBuf {
+        format!(
+            "/{}-root-{}",
+            self.username,
+            util::post_inc(&mut self.next_root_dir_id)
+        )
+        .into()
+    }
+}
+
+impl Drop for TestClient {
+    fn drop(&mut self) {
+        self.client.teardown();
+    }
+}

crates/collab/src/integration_tests.rs → crates/collab/src/tests/integration_tests.rs 🔗

@@ -1,51 +1,37 @@
 use crate::{
-    db::{self, NewUserParams, TestDb, UserId},
-    executor::Executor,
-    rpc::{Server, CLEANUP_TIMEOUT, RECONNECT_TIMEOUT},
-    AppState,
+    rpc::{CLEANUP_TIMEOUT, RECONNECT_TIMEOUT},
+    tests::{TestClient, TestServer},
 };
-use anyhow::anyhow;
 use call::{room, ActiveCall, ParticipantLocation, Room};
-use client::{
-    self, proto::PeerId, test::FakeHttpClient, Client, Connection, Credentials,
-    EstablishConnectionError, User, UserStore, RECEIVE_TIMEOUT,
-};
-use collections::{BTreeMap, HashMap, HashSet};
+use client::{User, RECEIVE_TIMEOUT};
+use collections::HashSet;
 use editor::{
-    self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, ExcerptRange, MultiBuffer,
-    Redo, Rename, ToOffset, ToggleCodeActions, Undo,
+    ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, ExcerptRange, MultiBuffer, Redo,
+    Rename, ToOffset, ToggleCodeActions, Undo,
 };
-use fs::{FakeFs, Fs as _, HomeDir, LineEnding, RemoveOptions};
-use futures::{channel::oneshot, StreamExt as _};
+use fs::{FakeFs, Fs as _, LineEnding, RemoveOptions};
+use futures::StreamExt as _;
 use gpui::{
-    executor::Deterministic, geometry::vector::vec2f, test::EmptyView, ModelHandle, Task,
-    TestAppContext, ViewHandle,
+    executor::Deterministic, geometry::vector::vec2f, test::EmptyView, ModelHandle, TestAppContext,
+    ViewHandle,
 };
 use language::{
-    range_to_lsp, tree_sitter_rust, Anchor, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
-    LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, PointUtf16, Rope,
+    tree_sitter_rust, Anchor, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
+    LanguageConfig, OffsetRangeExt, Point, Rope,
 };
 use live_kit_client::MacOSDisplay;
-use lsp::{self, FakeLanguageServer};
-use parking_lot::Mutex;
-use project::{search::SearchQuery, DiagnosticSummary, Project, ProjectPath, WorktreeId};
+use project::{search::SearchQuery, DiagnosticSummary, Project, ProjectPath};
 use rand::prelude::*;
 use serde_json::json;
 use settings::{Formatter, Settings};
 use std::{
     cell::{Cell, RefCell},
     env, future, mem,
-    ops::Deref,
     path::{Path, PathBuf},
     rc::Rc,
-    sync::{
-        atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
-        Arc,
-    },
+    sync::Arc,
 };
-use theme::ThemeRegistry;
 use unindent::Unindent as _;
-use util::post_inc;
 use workspace::{item::Item, shared_screen::SharedScreen, SplitDirection, ToggleFollow, Workspace};
 
 #[ctor::ctor]
@@ -6384,1324 +6370,6 @@ async fn test_peers_simultaneously_following_each_other(
     });
 }
 
-#[gpui::test(iterations = 100)]
-async fn test_random_collaboration(
-    cx: &mut TestAppContext,
-    deterministic: Arc<Deterministic>,
-    rng: StdRng,
-) {
-    deterministic.forbid_parking();
-    let rng = Arc::new(Mutex::new(rng));
-
-    let max_peers = env::var("MAX_PEERS")
-        .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
-        .unwrap_or(5);
-
-    let max_operations = env::var("OPERATIONS")
-        .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
-        .unwrap_or(10);
-
-    let mut server = TestServer::start(&deterministic).await;
-    let db = server.app_state.db.clone();
-
-    let mut available_guests = Vec::new();
-    for ix in 0..max_peers {
-        let username = format!("guest-{}", ix + 1);
-        let user_id = db
-            .create_user(
-                &format!("{username}@example.com"),
-                false,
-                NewUserParams {
-                    github_login: username.clone(),
-                    github_user_id: (ix + 1) as i32,
-                    invite_count: 0,
-                },
-            )
-            .await
-            .unwrap()
-            .user_id;
-        available_guests.push((user_id, username));
-    }
-
-    for (ix, (user_id_a, _)) in available_guests.iter().enumerate() {
-        for (user_id_b, _) in &available_guests[ix + 1..] {
-            server
-                .app_state
-                .db
-                .send_contact_request(*user_id_a, *user_id_b)
-                .await
-                .unwrap();
-            server
-                .app_state
-                .db
-                .respond_to_contact_request(*user_id_b, *user_id_a, true)
-                .await
-                .unwrap();
-        }
-    }
-
-    let mut clients = Vec::new();
-    let mut user_ids = Vec::new();
-    let mut op_start_signals = Vec::new();
-    let mut next_entity_id = 100000;
-
-    let mut operations = 0;
-    while operations < max_operations {
-        let distribution = rng.lock().gen_range(0..100);
-        match distribution {
-            0..=19 if !available_guests.is_empty() => {
-                let guest_ix = rng.lock().gen_range(0..available_guests.len());
-                let (_, guest_username) = available_guests.remove(guest_ix);
-                log::info!("Adding new connection for {}", guest_username);
-                next_entity_id += 100000;
-                let mut guest_cx = TestAppContext::new(
-                    cx.foreground_platform(),
-                    cx.platform(),
-                    deterministic.build_foreground(next_entity_id),
-                    deterministic.build_background(),
-                    cx.font_cache(),
-                    cx.leak_detector(),
-                    next_entity_id,
-                    cx.function_name.clone(),
-                );
-
-                let op_start_signal = futures::channel::mpsc::unbounded();
-                let guest = server.create_client(&mut guest_cx, &guest_username).await;
-                user_ids.push(guest.current_user_id(&guest_cx));
-                op_start_signals.push(op_start_signal.0);
-                clients.push(guest_cx.foreground().spawn(guest.simulate(
-                    guest_username.clone(),
-                    op_start_signal.1,
-                    rng.clone(),
-                    guest_cx,
-                )));
-
-                log::info!("Added connection for {}", guest_username);
-                operations += 1;
-            }
-            20..=24 if clients.len() > 1 => {
-                let guest_ix = rng.lock().gen_range(1..clients.len());
-                log::info!(
-                    "Simulating full disconnection of guest {}",
-                    user_ids[guest_ix]
-                );
-                let removed_guest_id = user_ids.remove(guest_ix);
-                let user_connection_ids = server
-                    .connection_pool
-                    .lock()
-                    .user_connection_ids(removed_guest_id)
-                    .collect::<Vec<_>>();
-                assert_eq!(user_connection_ids.len(), 1);
-                let removed_peer_id = user_connection_ids[0].into();
-                let guest = clients.remove(guest_ix);
-                op_start_signals.remove(guest_ix);
-                server.forbid_connections();
-                server.disconnect_client(removed_peer_id);
-                deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
-                deterministic.start_waiting();
-                log::info!("Waiting for guest {} to exit...", removed_guest_id);
-                let (guest, mut guest_cx) = guest.await;
-                deterministic.finish_waiting();
-                server.allow_connections();
-
-                for project in &guest.remote_projects {
-                    project.read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
-                }
-                for user_id in &user_ids {
-                    let contacts = server.app_state.db.get_contacts(*user_id).await.unwrap();
-                    let pool = server.connection_pool.lock();
-                    for contact in contacts {
-                        if let db::Contact::Accepted { user_id, .. } = contact {
-                            if pool.is_user_online(user_id) {
-                                assert_ne!(
-                                    user_id, removed_guest_id,
-                                    "removed guest is still a contact of another peer"
-                                );
-                            }
-                        }
-                    }
-                }
-
-                log::info!("{} removed", guest.username);
-                available_guests.push((removed_guest_id, guest.username.clone()));
-                guest_cx.update(|cx| {
-                    cx.clear_globals();
-                    drop(guest);
-                });
-
-                operations += 1;
-            }
-            25..=29 if clients.len() > 1 => {
-                let guest_ix = rng.lock().gen_range(1..clients.len());
-                let user_id = user_ids[guest_ix];
-                log::info!("Simulating temporary disconnection of guest {}", user_id);
-                let user_connection_ids = server
-                    .connection_pool
-                    .lock()
-                    .user_connection_ids(user_id)
-                    .collect::<Vec<_>>();
-                assert_eq!(user_connection_ids.len(), 1);
-                let peer_id = user_connection_ids[0].into();
-                server.disconnect_client(peer_id);
-                deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
-                operations += 1;
-            }
-            30..=34 => {
-                log::info!("Simulating server restart");
-                server.reset().await;
-                deterministic.advance_clock(RECEIVE_TIMEOUT);
-                server.start().await.unwrap();
-                deterministic.advance_clock(CLEANUP_TIMEOUT);
-                let environment = &server.app_state.config.zed_environment;
-                let stale_room_ids = server
-                    .app_state
-                    .db
-                    .stale_room_ids(environment, server.id())
-                    .await
-                    .unwrap();
-                assert_eq!(stale_room_ids, vec![]);
-            }
-            _ if !op_start_signals.is_empty() => {
-                while operations < max_operations && rng.lock().gen_bool(0.7) {
-                    op_start_signals
-                        .choose(&mut *rng.lock())
-                        .unwrap()
-                        .unbounded_send(())
-                        .unwrap();
-                    operations += 1;
-                }
-
-                if rng.lock().gen_bool(0.8) {
-                    deterministic.run_until_parked();
-                }
-            }
-            _ => {}
-        }
-    }
-
-    drop(op_start_signals);
-    deterministic.start_waiting();
-    let clients = futures::future::join_all(clients).await;
-    deterministic.finish_waiting();
-    deterministic.run_until_parked();
-
-    for (guest_client, guest_cx) in &clients {
-        for guest_project in &guest_client.remote_projects {
-            guest_project.read_with(guest_cx, |guest_project, cx| {
-                let host_project = clients.iter().find_map(|(client, cx)| {
-                    let project = client.local_projects.iter().find(|host_project| {
-                        host_project.read_with(cx, |host_project, _| {
-                            host_project.remote_id() == guest_project.remote_id()
-                        })
-                    })?;
-                    Some((project, cx))
-                });
-
-                if !guest_project.is_read_only() {
-                    if let Some((host_project, host_cx)) = host_project {
-                        let host_worktree_snapshots =
-                            host_project.read_with(host_cx, |host_project, cx| {
-                                host_project
-                                    .worktrees(cx)
-                                    .map(|worktree| {
-                                        let worktree = worktree.read(cx);
-                                        (worktree.id(), worktree.snapshot())
-                                    })
-                                    .collect::<BTreeMap<_, _>>()
-                            });
-                        let guest_worktree_snapshots = guest_project
-                            .worktrees(cx)
-                            .map(|worktree| {
-                                let worktree = worktree.read(cx);
-                                (worktree.id(), worktree.snapshot())
-                            })
-                            .collect::<BTreeMap<_, _>>();
-
-                        assert_eq!(
-                            guest_worktree_snapshots.keys().collect::<Vec<_>>(),
-                            host_worktree_snapshots.keys().collect::<Vec<_>>(),
-                            "{} has different worktrees than the host",
-                            guest_client.username
-                        );
-
-                        for (id, host_snapshot) in &host_worktree_snapshots {
-                            let guest_snapshot = &guest_worktree_snapshots[id];
-                            assert_eq!(
-                                guest_snapshot.root_name(),
-                                host_snapshot.root_name(),
-                                "{} has different root name than the host for worktree {}",
-                                guest_client.username,
-                                id
-                            );
-                            assert_eq!(
-                                guest_snapshot.abs_path(),
-                                host_snapshot.abs_path(),
-                                "{} has different abs path than the host for worktree {}",
-                                guest_client.username,
-                                id
-                            );
-                            assert_eq!(
-                                guest_snapshot.entries(false).collect::<Vec<_>>(),
-                                host_snapshot.entries(false).collect::<Vec<_>>(),
-                                "{} has different snapshot than the host for worktree {}",
-                                guest_client.username,
-                                id
-                            );
-                            assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
-                        }
-                    }
-                }
-
-                guest_project.check_invariants(cx);
-            });
-        }
-
-        for (guest_project, guest_buffers) in &guest_client.buffers {
-            let project_id = if guest_project.read_with(guest_cx, |project, _| {
-                project.is_local() || project.is_read_only()
-            }) {
-                continue;
-            } else {
-                guest_project
-                    .read_with(guest_cx, |project, _| project.remote_id())
-                    .unwrap()
-            };
-
-            let host_project = clients.iter().find_map(|(client, cx)| {
-                let project = client.local_projects.iter().find(|host_project| {
-                    host_project.read_with(cx, |host_project, _| {
-                        host_project.remote_id() == Some(project_id)
-                    })
-                })?;
-                Some((project, cx))
-            });
-
-            let (host_project, host_cx) = if let Some((host_project, host_cx)) = host_project {
-                (host_project, host_cx)
-            } else {
-                continue;
-            };
-
-            for guest_buffer in guest_buffers {
-                let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id());
-                let host_buffer = host_project.read_with(host_cx, |project, cx| {
-                    project.buffer_for_id(buffer_id, cx).unwrap_or_else(|| {
-                        panic!(
-                            "host does not have buffer for guest:{}, peer:{:?}, id:{}",
-                            guest_client.username,
-                            guest_client.peer_id(),
-                            buffer_id
-                        )
-                    })
-                });
-                let path = host_buffer
-                    .read_with(host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
-
-                assert_eq!(
-                    guest_buffer.read_with(guest_cx, |buffer, _| buffer.deferred_ops_len()),
-                    0,
-                    "{}, buffer {}, path {:?} has deferred operations",
-                    guest_client.username,
-                    buffer_id,
-                    path,
-                );
-                assert_eq!(
-                    guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()),
-                    host_buffer.read_with(host_cx, |buffer, _| buffer.text()),
-                    "{}, buffer {}, path {:?}, differs from the host's buffer",
-                    guest_client.username,
-                    buffer_id,
-                    path
-                );
-            }
-        }
-    }
-
-    for (client, mut cx) in clients {
-        cx.update(|cx| {
-            cx.clear_globals();
-            drop(client);
-        });
-    }
-}
-
-struct TestServer {
-    app_state: Arc<AppState>,
-    server: Arc<Server>,
-    connection_killers: Arc<Mutex<HashMap<PeerId, Arc<AtomicBool>>>>,
-    forbid_connections: Arc<AtomicBool>,
-    _test_db: TestDb,
-    test_live_kit_server: Arc<live_kit_client::TestServer>,
-}
-
-impl TestServer {
-    async fn start(deterministic: &Arc<Deterministic>) -> Self {
-        static NEXT_LIVE_KIT_SERVER_ID: AtomicUsize = AtomicUsize::new(0);
-
-        let use_postgres = env::var("USE_POSTGRES").ok();
-        let use_postgres = use_postgres.as_deref();
-        let test_db = if use_postgres == Some("true") || use_postgres == Some("1") {
-            TestDb::postgres(deterministic.build_background())
-        } else {
-            TestDb::sqlite(deterministic.build_background())
-        };
-        let live_kit_server_id = NEXT_LIVE_KIT_SERVER_ID.fetch_add(1, SeqCst);
-        let live_kit_server = live_kit_client::TestServer::create(
-            format!("http://livekit.{}.test", live_kit_server_id),
-            format!("devkey-{}", live_kit_server_id),
-            format!("secret-{}", live_kit_server_id),
-            deterministic.build_background(),
-        )
-        .unwrap();
-        let app_state = Self::build_app_state(&test_db, &live_kit_server).await;
-        let epoch = app_state
-            .db
-            .create_server(&app_state.config.zed_environment)
-            .await
-            .unwrap();
-        let server = Server::new(
-            epoch,
-            app_state.clone(),
-            Executor::Deterministic(deterministic.build_background()),
-        );
-        server.start().await.unwrap();
-        // Advance clock to ensure the server's cleanup task is finished.
-        deterministic.advance_clock(CLEANUP_TIMEOUT);
-        Self {
-            app_state,
-            server,
-            connection_killers: Default::default(),
-            forbid_connections: Default::default(),
-            _test_db: test_db,
-            test_live_kit_server: live_kit_server,
-        }
-    }
-
-    async fn reset(&self) {
-        self.app_state.db.reset();
-        let epoch = self
-            .app_state
-            .db
-            .create_server(&self.app_state.config.zed_environment)
-            .await
-            .unwrap();
-        self.server.reset(epoch);
-    }
-
-    async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
-        cx.update(|cx| {
-            cx.set_global(HomeDir(Path::new("/tmp/").to_path_buf()));
-
-            let mut settings = Settings::test(cx);
-            settings.projects_online_by_default = false;
-            cx.set_global(settings);
-        });
-
-        let http = FakeHttpClient::with_404_response();
-        let user_id = if let Ok(Some(user)) = self
-            .app_state
-            .db
-            .get_user_by_github_account(name, None)
-            .await
-        {
-            user.id
-        } else {
-            self.app_state
-                .db
-                .create_user(
-                    &format!("{name}@example.com"),
-                    false,
-                    NewUserParams {
-                        github_login: name.into(),
-                        github_user_id: 0,
-                        invite_count: 0,
-                    },
-                )
-                .await
-                .expect("creating user failed")
-                .user_id
-        };
-        let client_name = name.to_string();
-        let mut client = cx.read(|cx| Client::new(http.clone(), cx));
-        let server = self.server.clone();
-        let db = self.app_state.db.clone();
-        let connection_killers = self.connection_killers.clone();
-        let forbid_connections = self.forbid_connections.clone();
-
-        Arc::get_mut(&mut client)
-            .unwrap()
-            .set_id(user_id.0 as usize)
-            .override_authenticate(move |cx| {
-                cx.spawn(|_| async move {
-                    let access_token = "the-token".to_string();
-                    Ok(Credentials {
-                        user_id: user_id.0 as u64,
-                        access_token,
-                    })
-                })
-            })
-            .override_establish_connection(move |credentials, cx| {
-                assert_eq!(credentials.user_id, user_id.0 as u64);
-                assert_eq!(credentials.access_token, "the-token");
-
-                let server = server.clone();
-                let db = db.clone();
-                let connection_killers = connection_killers.clone();
-                let forbid_connections = forbid_connections.clone();
-                let client_name = client_name.clone();
-                cx.spawn(move |cx| async move {
-                    if forbid_connections.load(SeqCst) {
-                        Err(EstablishConnectionError::other(anyhow!(
-                            "server is forbidding connections"
-                        )))
-                    } else {
-                        let (client_conn, server_conn, killed) =
-                            Connection::in_memory(cx.background());
-                        let (connection_id_tx, connection_id_rx) = oneshot::channel();
-                        let user = db
-                            .get_user_by_id(user_id)
-                            .await
-                            .expect("retrieving user failed")
-                            .unwrap();
-                        cx.background()
-                            .spawn(server.handle_connection(
-                                server_conn,
-                                client_name,
-                                user,
-                                Some(connection_id_tx),
-                                Executor::Deterministic(cx.background()),
-                            ))
-                            .detach();
-                        let connection_id = connection_id_rx.await.unwrap();
-                        connection_killers
-                            .lock()
-                            .insert(connection_id.into(), killed);
-                        Ok(client_conn)
-                    }
-                })
-            });
-
-        let fs = FakeFs::new(cx.background());
-        let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
-        let app_state = Arc::new(workspace::AppState {
-            client: client.clone(),
-            user_store: user_store.clone(),
-            languages: Arc::new(LanguageRegistry::new(Task::ready(()))),
-            themes: ThemeRegistry::new((), cx.font_cache()),
-            fs: fs.clone(),
-            build_window_options: Default::default,
-            initialize_workspace: |_, _, _| unimplemented!(),
-            dock_default_item_factory: |_, _| unimplemented!(),
-        });
-
-        Project::init(&client);
-        cx.update(|cx| {
-            workspace::init(app_state.clone(), cx);
-            call::init(client.clone(), user_store.clone(), cx);
-        });
-
-        client
-            .authenticate_and_connect(false, &cx.to_async())
-            .await
-            .unwrap();
-
-        let client = TestClient {
-            client,
-            username: name.to_string(),
-            local_projects: Default::default(),
-            remote_projects: Default::default(),
-            next_root_dir_id: 0,
-            user_store,
-            fs,
-            language_registry: Arc::new(LanguageRegistry::test()),
-            buffers: Default::default(),
-        };
-        client.wait_for_current_user(cx).await;
-        client
-    }
-
-    fn disconnect_client(&self, peer_id: PeerId) {
-        self.connection_killers
-            .lock()
-            .remove(&peer_id)
-            .unwrap()
-            .store(true, SeqCst);
-    }
-
-    fn forbid_connections(&self) {
-        self.forbid_connections.store(true, SeqCst);
-    }
-
-    fn allow_connections(&self) {
-        self.forbid_connections.store(false, SeqCst);
-    }
-
-    async fn make_contacts(&self, clients: &mut [(&TestClient, &mut TestAppContext)]) {
-        for ix in 1..clients.len() {
-            let (left, right) = clients.split_at_mut(ix);
-            let (client_a, cx_a) = left.last_mut().unwrap();
-            for (client_b, cx_b) in right {
-                client_a
-                    .user_store
-                    .update(*cx_a, |store, cx| {
-                        store.request_contact(client_b.user_id().unwrap(), cx)
-                    })
-                    .await
-                    .unwrap();
-                cx_a.foreground().run_until_parked();
-                client_b
-                    .user_store
-                    .update(*cx_b, |store, cx| {
-                        store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
-                    })
-                    .await
-                    .unwrap();
-            }
-        }
-    }
-
-    async fn create_room(&self, clients: &mut [(&TestClient, &mut TestAppContext)]) {
-        self.make_contacts(clients).await;
-
-        let (left, right) = clients.split_at_mut(1);
-        let (_client_a, cx_a) = &mut left[0];
-        let active_call_a = cx_a.read(ActiveCall::global);
-
-        for (client_b, cx_b) in right {
-            let user_id_b = client_b.current_user_id(*cx_b).to_proto();
-            active_call_a
-                .update(*cx_a, |call, cx| call.invite(user_id_b, None, cx))
-                .await
-                .unwrap();
-
-            cx_b.foreground().run_until_parked();
-            let active_call_b = cx_b.read(ActiveCall::global);
-            active_call_b
-                .update(*cx_b, |call, cx| call.accept_incoming(cx))
-                .await
-                .unwrap();
-        }
-    }
-
-    async fn build_app_state(
-        test_db: &TestDb,
-        fake_server: &live_kit_client::TestServer,
-    ) -> Arc<AppState> {
-        Arc::new(AppState {
-            db: test_db.db().clone(),
-            live_kit_client: Some(Arc::new(fake_server.create_api_client())),
-            config: Default::default(),
-        })
-    }
-}
-
-impl Deref for TestServer {
-    type Target = Server;
-
-    fn deref(&self) -> &Self::Target {
-        &self.server
-    }
-}
-
-impl Drop for TestServer {
-    fn drop(&mut self) {
-        self.server.teardown();
-        self.test_live_kit_server.teardown().unwrap();
-    }
-}
-
-struct TestClient {
-    client: Arc<Client>,
-    username: String,
-    local_projects: Vec<ModelHandle<Project>>,
-    remote_projects: Vec<ModelHandle<Project>>,
-    next_root_dir_id: usize,
-    pub user_store: ModelHandle<UserStore>,
-    language_registry: Arc<LanguageRegistry>,
-    fs: Arc<FakeFs>,
-    buffers: HashMap<ModelHandle<Project>, HashSet<ModelHandle<language::Buffer>>>,
-}
-
-impl Deref for TestClient {
-    type Target = Arc<Client>;
-
-    fn deref(&self) -> &Self::Target {
-        &self.client
-    }
-}
-
-struct ContactsSummary {
-    pub current: Vec<String>,
-    pub outgoing_requests: Vec<String>,
-    pub incoming_requests: Vec<String>,
-}
-
-impl TestClient {
-    pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
-        UserId::from_proto(
-            self.user_store
-                .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
-        )
-    }
-
-    async fn wait_for_current_user(&self, cx: &TestAppContext) {
-        let mut authed_user = self
-            .user_store
-            .read_with(cx, |user_store, _| user_store.watch_current_user());
-        while authed_user.next().await.unwrap().is_none() {}
-    }
-
-    async fn clear_contacts(&self, cx: &mut TestAppContext) {
-        self.user_store
-            .update(cx, |store, _| store.clear_contacts())
-            .await;
-    }
-
-    fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
-        self.user_store.read_with(cx, |store, _| ContactsSummary {
-            current: store
-                .contacts()
-                .iter()
-                .map(|contact| contact.user.github_login.clone())
-                .collect(),
-            outgoing_requests: store
-                .outgoing_contact_requests()
-                .iter()
-                .map(|user| user.github_login.clone())
-                .collect(),
-            incoming_requests: store
-                .incoming_contact_requests()
-                .iter()
-                .map(|user| user.github_login.clone())
-                .collect(),
-        })
-    }
-
-    async fn build_local_project(
-        &self,
-        root_path: impl AsRef<Path>,
-        cx: &mut TestAppContext,
-    ) -> (ModelHandle<Project>, WorktreeId) {
-        let project = cx.update(|cx| {
-            Project::local(
-                self.client.clone(),
-                self.user_store.clone(),
-                self.language_registry.clone(),
-                self.fs.clone(),
-                cx,
-            )
-        });
-        let (worktree, _) = project
-            .update(cx, |p, cx| {
-                p.find_or_create_local_worktree(root_path, true, cx)
-            })
-            .await
-            .unwrap();
-        worktree
-            .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
-            .await;
-        (project, worktree.read_with(cx, |tree, _| tree.id()))
-    }
-
-    async fn build_remote_project(
-        &self,
-        host_project_id: u64,
-        guest_cx: &mut TestAppContext,
-    ) -> ModelHandle<Project> {
-        let active_call = guest_cx.read(ActiveCall::global);
-        let room = active_call.read_with(guest_cx, |call, _| call.room().unwrap().clone());
-        room.update(guest_cx, |room, cx| {
-            room.join_project(
-                host_project_id,
-                self.language_registry.clone(),
-                self.fs.clone(),
-                cx,
-            )
-        })
-        .await
-        .unwrap()
-    }
-
-    fn build_workspace(
-        &self,
-        project: &ModelHandle<Project>,
-        cx: &mut TestAppContext,
-    ) -> ViewHandle<Workspace> {
-        let (_, root_view) = cx.add_window(|_| EmptyView);
-        cx.add_view(&root_view, |cx| {
-            Workspace::new(
-                Default::default(),
-                0,
-                project.clone(),
-                |_, _| unimplemented!(),
-                cx,
-            )
-        })
-    }
-
-    pub async fn simulate(
-        mut self,
-        username: String,
-        mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
-        rng: Arc<Mutex<StdRng>>,
-        mut cx: TestAppContext,
-    ) -> (Self, TestAppContext) {
-        async fn tick(
-            client: &mut TestClient,
-            username: &str,
-            rng: Arc<Mutex<StdRng>>,
-            cx: &mut TestAppContext,
-        ) -> anyhow::Result<()> {
-            let active_call = cx.read(ActiveCall::global);
-            if active_call.read_with(cx, |call, _| call.incoming().borrow().is_some()) {
-                if rng.lock().gen() {
-                    log::info!("{}: accepting incoming call", username);
-                    active_call
-                        .update(cx, |call, cx| call.accept_incoming(cx))
-                        .await?;
-                } else {
-                    log::info!("{}: declining incoming call", username);
-                    active_call.update(cx, |call, _| call.decline_incoming())?;
-                }
-            } else {
-                let available_contacts = client.user_store.read_with(cx, |user_store, _| {
-                    user_store
-                        .contacts()
-                        .iter()
-                        .filter(|contact| contact.online && !contact.busy)
-                        .cloned()
-                        .collect::<Vec<_>>()
-                });
-
-                let distribution = rng.lock().gen_range(0..100);
-                match distribution {
-                    0..=29 if !available_contacts.is_empty() => {
-                        let contact = available_contacts.choose(&mut *rng.lock()).unwrap();
-                        log::info!("{}: inviting {}", username, contact.user.github_login);
-                        active_call
-                            .update(cx, |call, cx| call.invite(contact.user.id, None, cx))
-                            .await?;
-                    }
-                    30..=39 if active_call.read_with(cx, |call, _| call.room().is_some()) => {
-                        log::info!("{}: hanging up", username);
-                        active_call.update(cx, |call, cx| call.hang_up(cx))?;
-                    }
-                    _ => {}
-                }
-            }
-
-            let remote_projects =
-                if let Some(room) = active_call.read_with(cx, |call, _| call.room().cloned()) {
-                    room.read_with(cx, |room, _| {
-                        room.remote_participants()
-                            .values()
-                            .flat_map(|participant| participant.projects.clone())
-                            .collect::<Vec<_>>()
-                    })
-                } else {
-                    Default::default()
-                };
-            let project = if remote_projects.is_empty() || rng.lock().gen() {
-                if client.local_projects.is_empty() || rng.lock().gen() {
-                    let dir_paths = client.fs.directories().await;
-                    let local_project = if dir_paths.is_empty() || rng.lock().gen() {
-                        let root_path = format!(
-                            "/{}-root-{}",
-                            username,
-                            post_inc(&mut client.next_root_dir_id)
-                        );
-                        let root_path = Path::new(&root_path);
-                        client.fs.create_dir(root_path).await.unwrap();
-                        client
-                            .fs
-                            .create_file(&root_path.join("main.rs"), Default::default())
-                            .await
-                            .unwrap();
-                        log::info!("{}: opening local project at {:?}", username, root_path);
-                        client.build_local_project(root_path, cx).await.0
-                    } else {
-                        let root_path = dir_paths.choose(&mut *rng.lock()).unwrap();
-                        log::info!("{}: opening local project at {:?}", username, root_path);
-                        client.build_local_project(root_path, cx).await.0
-                    };
-                    client.local_projects.push(local_project.clone());
-                    local_project
-                } else {
-                    client
-                        .local_projects
-                        .choose(&mut *rng.lock())
-                        .unwrap()
-                        .clone()
-                }
-            } else {
-                if client.remote_projects.is_empty() || rng.lock().gen() {
-                    let remote_project_id = remote_projects.choose(&mut *rng.lock()).unwrap().id;
-                    let remote_project = if let Some(project) =
-                        client.remote_projects.iter().find(|project| {
-                            project.read_with(cx, |project, _| {
-                                project.remote_id() == Some(remote_project_id)
-                            })
-                        }) {
-                        project.clone()
-                    } else {
-                        log::info!("{}: opening remote project {}", username, remote_project_id);
-                        let remote_project = Project::remote(
-                            remote_project_id,
-                            client.client.clone(),
-                            client.user_store.clone(),
-                            client.language_registry.clone(),
-                            FakeFs::new(cx.background()),
-                            cx.to_async(),
-                        )
-                        .await?;
-                        client.remote_projects.push(remote_project.clone());
-                        remote_project
-                    };
-
-                    remote_project
-                } else {
-                    client
-                        .remote_projects
-                        .choose(&mut *rng.lock())
-                        .unwrap()
-                        .clone()
-                }
-            };
-
-            if active_call.read_with(cx, |call, _| call.room().is_some()) {
-                if let Err(error) = active_call
-                    .update(cx, |call, cx| call.share_project(project.clone(), cx))
-                    .await
-                {
-                    log::error!("{}: error sharing project, {:?}", username, error);
-                }
-            }
-
-            let buffers = client.buffers.entry(project.clone()).or_default();
-            let buffer = if buffers.is_empty() || rng.lock().gen() {
-                let worktree = if let Some(worktree) = project.read_with(cx, |project, cx| {
-                    project
-                        .worktrees(cx)
-                        .filter(|worktree| {
-                            let worktree = worktree.read(cx);
-                            worktree.is_visible() && worktree.entries(false).any(|e| e.is_file())
-                        })
-                        .choose(&mut *rng.lock())
-                }) {
-                    worktree
-                } else {
-                    cx.background().simulate_random_delay().await;
-                    return Ok(());
-                };
-
-                let (worktree_root_name, project_path) = worktree.read_with(cx, |worktree, _| {
-                    let entry = worktree
-                        .entries(false)
-                        .filter(|e| e.is_file())
-                        .choose(&mut *rng.lock())
-                        .unwrap();
-                    (
-                        worktree.root_name().to_string(),
-                        (worktree.id(), entry.path.clone()),
-                    )
-                });
-                log::info!(
-                    "{}: opening path {:?} in worktree {} ({})",
-                    username,
-                    project_path.1,
-                    project_path.0,
-                    worktree_root_name,
-                );
-                let buffer = project
-                    .update(cx, |project, cx| {
-                        project.open_buffer(project_path.clone(), cx)
-                    })
-                    .await?;
-                log::info!(
-                    "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
-                    username,
-                    project_path.1,
-                    project_path.0,
-                    worktree_root_name,
-                    buffer.read_with(cx, |buffer, _| buffer.remote_id())
-                );
-                buffers.insert(buffer.clone());
-                buffer
-            } else {
-                buffers.iter().choose(&mut *rng.lock()).unwrap().clone()
-            };
-
-            let choice = rng.lock().gen_range(0..100);
-            match choice {
-                0..=9 => {
-                    cx.update(|cx| {
-                        log::info!(
-                            "{}: dropping buffer {:?}",
-                            username,
-                            buffer.read(cx).file().unwrap().full_path(cx)
-                        );
-                        buffers.remove(&buffer);
-                        drop(buffer);
-                    });
-                }
-                10..=19 => {
-                    let completions = project.update(cx, |project, cx| {
-                        log::info!(
-                            "{}: requesting completions for buffer {} ({:?})",
-                            username,
-                            buffer.read(cx).remote_id(),
-                            buffer.read(cx).file().unwrap().full_path(cx)
-                        );
-                        let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
-                        project.completions(&buffer, offset, cx)
-                    });
-                    let completions = cx.background().spawn(async move {
-                        completions
-                            .await
-                            .map_err(|err| anyhow!("completions request failed: {:?}", err))
-                    });
-                    if rng.lock().gen_bool(0.3) {
-                        log::info!("{}: detaching completions request", username);
-                        cx.update(|cx| completions.detach_and_log_err(cx));
-                    } else {
-                        completions.await?;
-                    }
-                }
-                20..=29 => {
-                    let code_actions = project.update(cx, |project, cx| {
-                        log::info!(
-                            "{}: requesting code actions for buffer {} ({:?})",
-                            username,
-                            buffer.read(cx).remote_id(),
-                            buffer.read(cx).file().unwrap().full_path(cx)
-                        );
-                        let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
-                        project.code_actions(&buffer, range, cx)
-                    });
-                    let code_actions = cx.background().spawn(async move {
-                        code_actions
-                            .await
-                            .map_err(|err| anyhow!("code actions request failed: {:?}", err))
-                    });
-                    if rng.lock().gen_bool(0.3) {
-                        log::info!("{}: detaching code actions request", username);
-                        cx.update(|cx| code_actions.detach_and_log_err(cx));
-                    } else {
-                        code_actions.await?;
-                    }
-                }
-                30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
-                    let (requested_version, save) = buffer.update(cx, |buffer, cx| {
-                        log::info!(
-                            "{}: saving buffer {} ({:?})",
-                            username,
-                            buffer.remote_id(),
-                            buffer.file().unwrap().full_path(cx)
-                        );
-                        (buffer.version(), buffer.save(cx))
-                    });
-                    let save = cx.background().spawn(async move {
-                        let (saved_version, _, _) = save
-                            .await
-                            .map_err(|err| anyhow!("save request failed: {:?}", err))?;
-                        assert!(saved_version.observed_all(&requested_version));
-                        Ok::<_, anyhow::Error>(())
-                    });
-                    if rng.lock().gen_bool(0.3) {
-                        log::info!("{}: detaching save request", username);
-                        cx.update(|cx| save.detach_and_log_err(cx));
-                    } else {
-                        save.await?;
-                    }
-                }
-                40..=44 => {
-                    let prepare_rename = project.update(cx, |project, cx| {
-                        log::info!(
-                            "{}: preparing rename for buffer {} ({:?})",
-                            username,
-                            buffer.read(cx).remote_id(),
-                            buffer.read(cx).file().unwrap().full_path(cx)
-                        );
-                        let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
-                        project.prepare_rename(buffer, offset, cx)
-                    });
-                    let prepare_rename = cx.background().spawn(async move {
-                        prepare_rename
-                            .await
-                            .map_err(|err| anyhow!("prepare rename request failed: {:?}", err))
-                    });
-                    if rng.lock().gen_bool(0.3) {
-                        log::info!("{}: detaching prepare rename request", username);
-                        cx.update(|cx| prepare_rename.detach_and_log_err(cx));
-                    } else {
-                        prepare_rename.await?;
-                    }
-                }
-                45..=49 => {
-                    let definitions = project.update(cx, |project, cx| {
-                        log::info!(
-                            "{}: requesting definitions for buffer {} ({:?})",
-                            username,
-                            buffer.read(cx).remote_id(),
-                            buffer.read(cx).file().unwrap().full_path(cx)
-                        );
-                        let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
-                        project.definition(&buffer, offset, cx)
-                    });
-                    let definitions = cx.background().spawn(async move {
-                        definitions
-                            .await
-                            .map_err(|err| anyhow!("definitions request failed: {:?}", err))
-                    });
-                    if rng.lock().gen_bool(0.3) {
-                        log::info!("{}: detaching definitions request", username);
-                        cx.update(|cx| definitions.detach_and_log_err(cx));
-                    } else {
-                        buffers.extend(definitions.await?.into_iter().map(|loc| loc.target.buffer));
-                    }
-                }
-                50..=54 => {
-                    let highlights = project.update(cx, |project, cx| {
-                        log::info!(
-                            "{}: requesting highlights for buffer {} ({:?})",
-                            username,
-                            buffer.read(cx).remote_id(),
-                            buffer.read(cx).file().unwrap().full_path(cx)
-                        );
-                        let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
-                        project.document_highlights(&buffer, offset, cx)
-                    });
-                    let highlights = cx.background().spawn(async move {
-                        highlights
-                            .await
-                            .map_err(|err| anyhow!("highlights request failed: {:?}", err))
-                    });
-                    if rng.lock().gen_bool(0.3) {
-                        log::info!("{}: detaching highlights request", username);
-                        cx.update(|cx| highlights.detach_and_log_err(cx));
-                    } else {
-                        highlights.await?;
-                    }
-                }
-                55..=59 => {
-                    let search = project.update(cx, |project, cx| {
-                        let query = rng.lock().gen_range('a'..='z');
-                        log::info!("{}: project-wide search {:?}", username, query);
-                        project.search(SearchQuery::text(query, false, false), cx)
-                    });
-                    let search = cx.background().spawn(async move {
-                        search
-                            .await
-                            .map_err(|err| anyhow!("search request failed: {:?}", err))
-                    });
-                    if rng.lock().gen_bool(0.3) {
-                        log::info!("{}: detaching search request", username);
-                        cx.update(|cx| search.detach_and_log_err(cx));
-                    } else {
-                        buffers.extend(search.await?.into_keys());
-                    }
-                }
-                60..=79 => {
-                    let worktree = project
-                        .read_with(cx, |project, cx| {
-                            project
-                                .worktrees(cx)
-                                .filter(|worktree| {
-                                    let worktree = worktree.read(cx);
-                                    worktree.is_visible()
-                                        && worktree.entries(false).any(|e| e.is_file())
-                                        && worktree.root_entry().map_or(false, |e| e.is_dir())
-                                })
-                                .choose(&mut *rng.lock())
-                        })
-                        .unwrap();
-                    let (worktree_id, worktree_root_name) = worktree
-                        .read_with(cx, |worktree, _| {
-                            (worktree.id(), worktree.root_name().to_string())
-                        });
-
-                    let mut new_name = String::new();
-                    for _ in 0..10 {
-                        let letter = rng.lock().gen_range('a'..='z');
-                        new_name.push(letter);
-                    }
-
-                    let is_dir = rng.lock().gen::<bool>();
-                    let mut new_path = PathBuf::new();
-                    new_path.push(new_name);
-                    if !is_dir {
-                        new_path.set_extension("rs");
-                    }
-                    log::info!(
-                        "{}: creating {:?} in worktree {} ({})",
-                        username,
-                        new_path,
-                        worktree_id,
-                        worktree_root_name,
-                    );
-                    project
-                        .update(cx, |project, cx| {
-                            project.create_entry((worktree_id, new_path), is_dir, cx)
-                        })
-                        .unwrap()
-                        .await?;
-                }
-                _ => {
-                    buffer.update(cx, |buffer, cx| {
-                        log::info!(
-                            "{}: updating buffer {} ({:?})",
-                            username,
-                            buffer.remote_id(),
-                            buffer.file().unwrap().full_path(cx)
-                        );
-                        if rng.lock().gen_bool(0.7) {
-                            buffer.randomly_edit(&mut *rng.lock(), 5, cx);
-                        } else {
-                            buffer.randomly_undo_redo(&mut *rng.lock(), cx);
-                        }
-                    });
-                }
-            }
-
-            Ok(())
-        }
-
-        // Setup language server
-        let mut language = Language::new(
-            LanguageConfig {
-                name: "Rust".into(),
-                path_suffixes: vec!["rs".to_string()],
-                ..Default::default()
-            },
-            None,
-        );
-        let _fake_language_servers = language
-            .set_fake_lsp_adapter(Arc::new(FakeLspAdapter {
-                name: "the-fake-language-server",
-                capabilities: lsp::LanguageServer::full_capabilities(),
-                initializer: Some(Box::new({
-                    let rng = rng.clone();
-                    let fs = self.fs.clone();
-                    move |fake_server: &mut FakeLanguageServer| {
-                        fake_server.handle_request::<lsp::request::Completion, _, _>(
-                            |_, _| async move {
-                                Ok(Some(lsp::CompletionResponse::Array(vec![
-                                    lsp::CompletionItem {
-                                        text_edit: Some(lsp::CompletionTextEdit::Edit(
-                                            lsp::TextEdit {
-                                                range: lsp::Range::new(
-                                                    lsp::Position::new(0, 0),
-                                                    lsp::Position::new(0, 0),
-                                                ),
-                                                new_text: "the-new-text".to_string(),
-                                            },
-                                        )),
-                                        ..Default::default()
-                                    },
-                                ])))
-                            },
-                        );
-
-                        fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
-                            |_, _| async move {
-                                Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
-                                    lsp::CodeAction {
-                                        title: "the-code-action".to_string(),
-                                        ..Default::default()
-                                    },
-                                )]))
-                            },
-                        );
-
-                        fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
-                            |params, _| async move {
-                                Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
-                                    params.position,
-                                    params.position,
-                                ))))
-                            },
-                        );
-
-                        fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
-                            let fs = fs.clone();
-                            let rng = rng.clone();
-                            move |_, _| {
-                                let fs = fs.clone();
-                                let rng = rng.clone();
-                                async move {
-                                    let files = fs.files().await;
-                                    let mut rng = rng.lock();
-                                    let count = rng.gen_range::<usize, _>(1..3);
-                                    let files = (0..count)
-                                        .map(|_| files.choose(&mut *rng).unwrap())
-                                        .collect::<Vec<_>>();
-                                    log::info!("LSP: Returning definitions in files {:?}", &files);
-                                    Ok(Some(lsp::GotoDefinitionResponse::Array(
-                                        files
-                                            .into_iter()
-                                            .map(|file| lsp::Location {
-                                                uri: lsp::Url::from_file_path(file).unwrap(),
-                                                range: Default::default(),
-                                            })
-                                            .collect(),
-                                    )))
-                                }
-                            }
-                        });
-
-                        fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
-                            {
-                                let rng = rng.clone();
-                                move |_, _| {
-                                    let mut highlights = Vec::new();
-                                    let highlight_count = rng.lock().gen_range(1..=5);
-                                    for _ in 0..highlight_count {
-                                        let start_row = rng.lock().gen_range(0..100);
-                                        let start_column = rng.lock().gen_range(0..100);
-                                        let start = PointUtf16::new(start_row, start_column);
-                                        let end_row = rng.lock().gen_range(0..100);
-                                        let end_column = rng.lock().gen_range(0..100);
-                                        let end = PointUtf16::new(end_row, end_column);
-                                        let range =
-                                            if start > end { end..start } else { start..end };
-                                        highlights.push(lsp::DocumentHighlight {
-                                            range: range_to_lsp(range.clone()),
-                                            kind: Some(lsp::DocumentHighlightKind::READ),
-                                        });
-                                    }
-                                    highlights.sort_unstable_by_key(|highlight| {
-                                        (highlight.range.start, highlight.range.end)
-                                    });
-                                    async move { Ok(Some(highlights)) }
-                                }
-                            },
-                        );
-                    }
-                })),
-                ..Default::default()
-            }))
-            .await;
-        self.language_registry.add(Arc::new(language));
-
-        while op_start_signal.next().await.is_some() {
-            if let Err(error) = tick(&mut self, &username, rng.clone(), &mut cx).await {
-                log::error!("{} error: {:?}", username, error);
-            }
-
-            cx.background().simulate_random_delay().await;
-        }
-        log::info!("{}: done", username);
-
-        (self, cx)
-    }
-}
-
-impl Drop for TestClient {
-    fn drop(&mut self) {
-        self.client.teardown();
-    }
-}
-
 #[derive(Debug, Eq, PartialEq)]
 struct RoomParticipants {
     remote: Vec<String>,

crates/collab/src/tests/randomized_integration_tests.rs 🔗

@@ -0,0 +1,919 @@
+use crate::{
+    db::{self, NewUserParams},
+    rpc::{CLEANUP_TIMEOUT, RECONNECT_TIMEOUT},
+    tests::{TestClient, TestServer},
+};
+use anyhow::anyhow;
+use call::ActiveCall;
+use client::RECEIVE_TIMEOUT;
+use collections::BTreeMap;
+use fs::{FakeFs, Fs as _};
+use futures::StreamExt as _;
+use gpui::{executor::Deterministic, TestAppContext};
+use language::{range_to_lsp, FakeLspAdapter, Language, LanguageConfig, PointUtf16};
+use lsp::FakeLanguageServer;
+use parking_lot::Mutex;
+use project::{search::SearchQuery, Project};
+use rand::prelude::*;
+use std::{env, path::PathBuf, sync::Arc};
+
+#[gpui::test(iterations = 100)]
+async fn test_random_collaboration(
+    cx: &mut TestAppContext,
+    deterministic: Arc<Deterministic>,
+    rng: StdRng,
+) {
+    deterministic.forbid_parking();
+    let rng = Arc::new(Mutex::new(rng));
+
+    let max_peers = env::var("MAX_PEERS")
+        .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
+        .unwrap_or(5);
+
+    let max_operations = env::var("OPERATIONS")
+        .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
+        .unwrap_or(10);
+
+    let mut server = TestServer::start(&deterministic).await;
+    let db = server.app_state.db.clone();
+
+    let mut available_guests = Vec::new();
+    for ix in 0..max_peers {
+        let username = format!("guest-{}", ix + 1);
+        let user_id = db
+            .create_user(
+                &format!("{username}@example.com"),
+                false,
+                NewUserParams {
+                    github_login: username.clone(),
+                    github_user_id: (ix + 1) as i32,
+                    invite_count: 0,
+                },
+            )
+            .await
+            .unwrap()
+            .user_id;
+        available_guests.push((user_id, username));
+    }
+
+    for (ix, (user_id_a, _)) in available_guests.iter().enumerate() {
+        for (user_id_b, _) in &available_guests[ix + 1..] {
+            server
+                .app_state
+                .db
+                .send_contact_request(*user_id_a, *user_id_b)
+                .await
+                .unwrap();
+            server
+                .app_state
+                .db
+                .respond_to_contact_request(*user_id_b, *user_id_a, true)
+                .await
+                .unwrap();
+        }
+    }
+
+    let mut clients = Vec::new();
+    let mut user_ids = Vec::new();
+    let mut op_start_signals = Vec::new();
+    let mut next_entity_id = 100000;
+
+    let mut operations = 0;
+    while operations < max_operations {
+        let distribution = rng.lock().gen_range(0..100);
+        match distribution {
+            0..=19 if !available_guests.is_empty() => {
+                let guest_ix = rng.lock().gen_range(0..available_guests.len());
+                let (_, guest_username) = available_guests.remove(guest_ix);
+                log::info!("Adding new connection for {}", guest_username);
+                next_entity_id += 100000;
+                let mut guest_cx = TestAppContext::new(
+                    cx.foreground_platform(),
+                    cx.platform(),
+                    deterministic.build_foreground(next_entity_id),
+                    deterministic.build_background(),
+                    cx.font_cache(),
+                    cx.leak_detector(),
+                    next_entity_id,
+                    cx.function_name.clone(),
+                );
+
+                let op_start_signal = futures::channel::mpsc::unbounded();
+                let guest = server.create_client(&mut guest_cx, &guest_username).await;
+                user_ids.push(guest.current_user_id(&guest_cx));
+                op_start_signals.push(op_start_signal.0);
+                clients.push(guest_cx.foreground().spawn(simulate_client(
+                    guest,
+                    op_start_signal.1,
+                    rng.clone(),
+                    guest_cx,
+                )));
+
+                log::info!("Added connection for {}", guest_username);
+                operations += 1;
+            }
+            20..=24 if clients.len() > 1 => {
+                let guest_ix = rng.lock().gen_range(1..clients.len());
+                log::info!(
+                    "Simulating full disconnection of guest {}",
+                    user_ids[guest_ix]
+                );
+                let removed_guest_id = user_ids.remove(guest_ix);
+                let user_connection_ids = server
+                    .connection_pool
+                    .lock()
+                    .user_connection_ids(removed_guest_id)
+                    .collect::<Vec<_>>();
+                assert_eq!(user_connection_ids.len(), 1);
+                let removed_peer_id = user_connection_ids[0].into();
+                let guest = clients.remove(guest_ix);
+                op_start_signals.remove(guest_ix);
+                server.forbid_connections();
+                server.disconnect_client(removed_peer_id);
+                deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
+                deterministic.start_waiting();
+                log::info!("Waiting for guest {} to exit...", removed_guest_id);
+                let (guest, mut guest_cx) = guest.await;
+                deterministic.finish_waiting();
+                server.allow_connections();
+
+                for project in &guest.remote_projects {
+                    project.read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
+                }
+                for user_id in &user_ids {
+                    let contacts = server.app_state.db.get_contacts(*user_id).await.unwrap();
+                    let pool = server.connection_pool.lock();
+                    for contact in contacts {
+                        if let db::Contact::Accepted { user_id, .. } = contact {
+                            if pool.is_user_online(user_id) {
+                                assert_ne!(
+                                    user_id, removed_guest_id,
+                                    "removed guest is still a contact of another peer"
+                                );
+                            }
+                        }
+                    }
+                }
+
+                log::info!("{} removed", guest.username);
+                available_guests.push((removed_guest_id, guest.username.clone()));
+                guest_cx.update(|cx| {
+                    cx.clear_globals();
+                    drop(guest);
+                });
+
+                operations += 1;
+            }
+            25..=29 if clients.len() > 1 => {
+                let guest_ix = rng.lock().gen_range(1..clients.len());
+                let user_id = user_ids[guest_ix];
+                log::info!("Simulating temporary disconnection of guest {}", user_id);
+                let user_connection_ids = server
+                    .connection_pool
+                    .lock()
+                    .user_connection_ids(user_id)
+                    .collect::<Vec<_>>();
+                assert_eq!(user_connection_ids.len(), 1);
+                let peer_id = user_connection_ids[0].into();
+                server.disconnect_client(peer_id);
+                deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
+                operations += 1;
+            }
+            30..=34 => {
+                log::info!("Simulating server restart");
+                server.reset().await;
+                deterministic.advance_clock(RECEIVE_TIMEOUT);
+                server.start().await.unwrap();
+                deterministic.advance_clock(CLEANUP_TIMEOUT);
+                let environment = &server.app_state.config.zed_environment;
+                let stale_room_ids = server
+                    .app_state
+                    .db
+                    .stale_room_ids(environment, server.id())
+                    .await
+                    .unwrap();
+                assert_eq!(stale_room_ids, vec![]);
+            }
+            _ if !op_start_signals.is_empty() => {
+                while operations < max_operations && rng.lock().gen_bool(0.7) {
+                    op_start_signals
+                        .choose(&mut *rng.lock())
+                        .unwrap()
+                        .unbounded_send(())
+                        .unwrap();
+                    operations += 1;
+                }
+
+                if rng.lock().gen_bool(0.8) {
+                    deterministic.run_until_parked();
+                }
+            }
+            _ => {}
+        }
+    }
+
+    drop(op_start_signals);
+    deterministic.start_waiting();
+    let clients = futures::future::join_all(clients).await;
+    deterministic.finish_waiting();
+    deterministic.run_until_parked();
+
+    for (guest_client, guest_cx) in &clients {
+        for guest_project in &guest_client.remote_projects {
+            guest_project.read_with(guest_cx, |guest_project, cx| {
+                let host_project = clients.iter().find_map(|(client, cx)| {
+                    let project = client.local_projects.iter().find(|host_project| {
+                        host_project.read_with(cx, |host_project, _| {
+                            host_project.remote_id() == guest_project.remote_id()
+                        })
+                    })?;
+                    Some((project, cx))
+                });
+
+                if !guest_project.is_read_only() {
+                    if let Some((host_project, host_cx)) = host_project {
+                        let host_worktree_snapshots =
+                            host_project.read_with(host_cx, |host_project, cx| {
+                                host_project
+                                    .worktrees(cx)
+                                    .map(|worktree| {
+                                        let worktree = worktree.read(cx);
+                                        (worktree.id(), worktree.snapshot())
+                                    })
+                                    .collect::<BTreeMap<_, _>>()
+                            });
+                        let guest_worktree_snapshots = guest_project
+                            .worktrees(cx)
+                            .map(|worktree| {
+                                let worktree = worktree.read(cx);
+                                (worktree.id(), worktree.snapshot())
+                            })
+                            .collect::<BTreeMap<_, _>>();
+
+                        assert_eq!(
+                            guest_worktree_snapshots.keys().collect::<Vec<_>>(),
+                            host_worktree_snapshots.keys().collect::<Vec<_>>(),
+                            "{} has different worktrees than the host",
+                            guest_client.username
+                        );
+
+                        for (id, host_snapshot) in &host_worktree_snapshots {
+                            let guest_snapshot = &guest_worktree_snapshots[id];
+                            assert_eq!(
+                                guest_snapshot.root_name(),
+                                host_snapshot.root_name(),
+                                "{} has different root name than the host for worktree {}",
+                                guest_client.username,
+                                id
+                            );
+                            assert_eq!(
+                                guest_snapshot.abs_path(),
+                                host_snapshot.abs_path(),
+                                "{} has different abs path than the host for worktree {}",
+                                guest_client.username,
+                                id
+                            );
+                            assert_eq!(
+                                guest_snapshot.entries(false).collect::<Vec<_>>(),
+                                host_snapshot.entries(false).collect::<Vec<_>>(),
+                                "{} has different snapshot than the host for worktree {}",
+                                guest_client.username,
+                                id
+                            );
+                            assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
+                        }
+                    }
+                }
+
+                guest_project.check_invariants(cx);
+            });
+        }
+
+        for (guest_project, guest_buffers) in &guest_client.buffers {
+            let project_id = if guest_project.read_with(guest_cx, |project, _| {
+                project.is_local() || project.is_read_only()
+            }) {
+                continue;
+            } else {
+                guest_project
+                    .read_with(guest_cx, |project, _| project.remote_id())
+                    .unwrap()
+            };
+
+            let host_project = clients.iter().find_map(|(client, cx)| {
+                let project = client.local_projects.iter().find(|host_project| {
+                    host_project.read_with(cx, |host_project, _| {
+                        host_project.remote_id() == Some(project_id)
+                    })
+                })?;
+                Some((project, cx))
+            });
+
+            let (host_project, host_cx) = if let Some((host_project, host_cx)) = host_project {
+                (host_project, host_cx)
+            } else {
+                continue;
+            };
+
+            for guest_buffer in guest_buffers {
+                let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id());
+                let host_buffer = host_project.read_with(host_cx, |project, cx| {
+                    project.buffer_for_id(buffer_id, cx).unwrap_or_else(|| {
+                        panic!(
+                            "host does not have buffer for guest:{}, peer:{:?}, id:{}",
+                            guest_client.username,
+                            guest_client.peer_id(),
+                            buffer_id
+                        )
+                    })
+                });
+                let path = host_buffer
+                    .read_with(host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
+
+                assert_eq!(
+                    guest_buffer.read_with(guest_cx, |buffer, _| buffer.deferred_ops_len()),
+                    0,
+                    "{}, buffer {}, path {:?} has deferred operations",
+                    guest_client.username,
+                    buffer_id,
+                    path,
+                );
+                assert_eq!(
+                    guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()),
+                    host_buffer.read_with(host_cx, |buffer, _| buffer.text()),
+                    "{}, buffer {}, path {:?}, differs from the host's buffer",
+                    guest_client.username,
+                    buffer_id,
+                    path
+                );
+            }
+        }
+    }
+
+    for (client, mut cx) in clients {
+        cx.update(|cx| {
+            cx.clear_globals();
+            drop(client);
+        });
+    }
+}
+
+async fn simulate_client(
+    mut client: TestClient,
+    mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
+    rng: Arc<Mutex<StdRng>>,
+    mut cx: TestAppContext,
+) -> (TestClient, TestAppContext) {
+    // Setup language server
+    let mut language = Language::new(
+        LanguageConfig {
+            name: "Rust".into(),
+            path_suffixes: vec!["rs".to_string()],
+            ..Default::default()
+        },
+        None,
+    );
+    let _fake_language_servers = language
+        .set_fake_lsp_adapter(Arc::new(FakeLspAdapter {
+            name: "the-fake-language-server",
+            capabilities: lsp::LanguageServer::full_capabilities(),
+            initializer: Some(Box::new({
+                let rng = rng.clone();
+                let fs = client.fs.clone();
+                move |fake_server: &mut FakeLanguageServer| {
+                    fake_server.handle_request::<lsp::request::Completion, _, _>(
+                        |_, _| async move {
+                            Ok(Some(lsp::CompletionResponse::Array(vec![
+                                lsp::CompletionItem {
+                                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
+                                        range: lsp::Range::new(
+                                            lsp::Position::new(0, 0),
+                                            lsp::Position::new(0, 0),
+                                        ),
+                                        new_text: "the-new-text".to_string(),
+                                    })),
+                                    ..Default::default()
+                                },
+                            ])))
+                        },
+                    );
+
+                    fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
+                        |_, _| async move {
+                            Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
+                                lsp::CodeAction {
+                                    title: "the-code-action".to_string(),
+                                    ..Default::default()
+                                },
+                            )]))
+                        },
+                    );
+
+                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
+                        |params, _| async move {
+                            Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
+                                params.position,
+                                params.position,
+                            ))))
+                        },
+                    );
+
+                    fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
+                        let fs = fs.clone();
+                        let rng = rng.clone();
+                        move |_, _| {
+                            let fs = fs.clone();
+                            let rng = rng.clone();
+                            async move {
+                                let files = fs.files().await;
+                                let mut rng = rng.lock();
+                                let count = rng.gen_range::<usize, _>(1..3);
+                                let files = (0..count)
+                                    .map(|_| files.choose(&mut *rng).unwrap())
+                                    .collect::<Vec<_>>();
+                                log::info!("LSP: Returning definitions in files {:?}", &files);
+                                Ok(Some(lsp::GotoDefinitionResponse::Array(
+                                    files
+                                        .into_iter()
+                                        .map(|file| lsp::Location {
+                                            uri: lsp::Url::from_file_path(file).unwrap(),
+                                            range: Default::default(),
+                                        })
+                                        .collect(),
+                                )))
+                            }
+                        }
+                    });
+
+                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
+                        let rng = rng.clone();
+                        move |_, _| {
+                            let mut highlights = Vec::new();
+                            let highlight_count = rng.lock().gen_range(1..=5);
+                            for _ in 0..highlight_count {
+                                let start_row = rng.lock().gen_range(0..100);
+                                let start_column = rng.lock().gen_range(0..100);
+                                let start = PointUtf16::new(start_row, start_column);
+                                let end_row = rng.lock().gen_range(0..100);
+                                let end_column = rng.lock().gen_range(0..100);
+                                let end = PointUtf16::new(end_row, end_column);
+                                let range = if start > end { end..start } else { start..end };
+                                highlights.push(lsp::DocumentHighlight {
+                                    range: range_to_lsp(range.clone()),
+                                    kind: Some(lsp::DocumentHighlightKind::READ),
+                                });
+                            }
+                            highlights.sort_unstable_by_key(|highlight| {
+                                (highlight.range.start, highlight.range.end)
+                            });
+                            async move { Ok(Some(highlights)) }
+                        }
+                    });
+                }
+            })),
+            ..Default::default()
+        }))
+        .await;
+    client.language_registry.add(Arc::new(language));
+
+    while op_start_signal.next().await.is_some() {
+        if let Err(error) = randomly_mutate_client(&mut client, rng.clone(), &mut cx).await {
+            log::error!("{} error: {:?}", client.username, error);
+        }
+
+        cx.background().simulate_random_delay().await;
+    }
+    log::info!("{}: done", client.username);
+
+    (client, cx)
+}
+
+async fn randomly_mutate_client(
+    client: &mut TestClient,
+    rng: Arc<Mutex<StdRng>>,
+    cx: &mut TestAppContext,
+) -> anyhow::Result<()> {
+    let active_call = cx.read(ActiveCall::global);
+    if active_call.read_with(cx, |call, _| call.incoming().borrow().is_some()) {
+        if rng.lock().gen() {
+            log::info!("{}: accepting incoming call", client.username);
+            active_call
+                .update(cx, |call, cx| call.accept_incoming(cx))
+                .await?;
+        } else {
+            log::info!("{}: declining incoming call", client.username);
+            active_call.update(cx, |call, _| call.decline_incoming())?;
+        }
+    } else {
+        let available_contacts = client.user_store.read_with(cx, |user_store, _| {
+            user_store
+                .contacts()
+                .iter()
+                .filter(|contact| contact.online && !contact.busy)
+                .cloned()
+                .collect::<Vec<_>>()
+        });
+
+        let distribution = rng.lock().gen_range(0..100);
+        match distribution {
+            0..=29 if !available_contacts.is_empty() => {
+                let contact = available_contacts.choose(&mut *rng.lock()).unwrap();
+                log::info!(
+                    "{}: inviting {}",
+                    client.username,
+                    contact.user.github_login
+                );
+                active_call
+                    .update(cx, |call, cx| call.invite(contact.user.id, None, cx))
+                    .await?;
+            }
+            30..=39 if active_call.read_with(cx, |call, _| call.room().is_some()) => {
+                log::info!("{}: hanging up", client.username);
+                active_call.update(cx, |call, cx| call.hang_up(cx))?;
+            }
+            _ => {}
+        }
+    }
+
+    let remote_projects =
+        if let Some(room) = active_call.read_with(cx, |call, _| call.room().cloned()) {
+            room.read_with(cx, |room, _| {
+                room.remote_participants()
+                    .values()
+                    .flat_map(|participant| participant.projects.clone())
+                    .collect::<Vec<_>>()
+            })
+        } else {
+            Default::default()
+        };
+
+    let project = if remote_projects.is_empty() || rng.lock().gen() {
+        if client.local_projects.is_empty() || rng.lock().gen() {
+            let dir_paths = client.fs.directories().await;
+            let local_project = if dir_paths.is_empty() || rng.lock().gen() {
+                let root_path = client.create_new_root_dir();
+                client.fs.create_dir(&root_path).await.unwrap();
+                client
+                    .fs
+                    .create_file(&root_path.join("main.rs"), Default::default())
+                    .await
+                    .unwrap();
+                log::info!(
+                    "{}: opening local project at {:?}",
+                    client.username,
+                    root_path
+                );
+                client.build_local_project(root_path, cx).await.0
+            } else {
+                let root_path = dir_paths.choose(&mut *rng.lock()).unwrap();
+                log::info!(
+                    "{}: opening local project at {:?}",
+                    client.username,
+                    root_path
+                );
+                client.build_local_project(root_path, cx).await.0
+            };
+            client.local_projects.push(local_project.clone());
+            local_project
+        } else {
+            client
+                .local_projects
+                .choose(&mut *rng.lock())
+                .unwrap()
+                .clone()
+        }
+    } else {
+        if client.remote_projects.is_empty() || rng.lock().gen() {
+            let remote_project_id = remote_projects.choose(&mut *rng.lock()).unwrap().id;
+            let remote_project = if let Some(project) =
+                client.remote_projects.iter().find(|project| {
+                    project.read_with(cx, |project, _| {
+                        project.remote_id() == Some(remote_project_id)
+                    })
+                }) {
+                project.clone()
+            } else {
+                log::info!(
+                    "{}: opening remote project {}",
+                    client.username,
+                    remote_project_id
+                );
+                let remote_project = Project::remote(
+                    remote_project_id,
+                    client.client.clone(),
+                    client.user_store.clone(),
+                    client.language_registry.clone(),
+                    FakeFs::new(cx.background()),
+                    cx.to_async(),
+                )
+                .await?;
+                client.remote_projects.push(remote_project.clone());
+                remote_project
+            };
+
+            remote_project
+        } else {
+            client
+                .remote_projects
+                .choose(&mut *rng.lock())
+                .unwrap()
+                .clone()
+        }
+    };
+
+    if active_call.read_with(cx, |call, _| call.room().is_some()) {
+        if let Err(error) = active_call
+            .update(cx, |call, cx| call.share_project(project.clone(), cx))
+            .await
+        {
+            log::error!("{}: error sharing project, {:?}", client.username, error);
+        }
+    }
+
+    let buffers = client.buffers.entry(project.clone()).or_default();
+    let buffer = if buffers.is_empty() || rng.lock().gen() {
+        let worktree = if let Some(worktree) = project.read_with(cx, |project, cx| {
+            project
+                .worktrees(cx)
+                .filter(|worktree| {
+                    let worktree = worktree.read(cx);
+                    worktree.is_visible() && worktree.entries(false).any(|e| e.is_file())
+                })
+                .choose(&mut *rng.lock())
+        }) {
+            worktree
+        } else {
+            cx.background().simulate_random_delay().await;
+            return Ok(());
+        };
+
+        let (worktree_root_name, project_path) = worktree.read_with(cx, |worktree, _| {
+            let entry = worktree
+                .entries(false)
+                .filter(|e| e.is_file())
+                .choose(&mut *rng.lock())
+                .unwrap();
+            (
+                worktree.root_name().to_string(),
+                (worktree.id(), entry.path.clone()),
+            )
+        });
+        log::info!(
+            "{}: opening path {:?} in worktree {} ({})",
+            client.username,
+            project_path.1,
+            project_path.0,
+            worktree_root_name,
+        );
+        let buffer = project
+            .update(cx, |project, cx| {
+                project.open_buffer(project_path.clone(), cx)
+            })
+            .await?;
+        log::info!(
+            "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
+            client.username,
+            project_path.1,
+            project_path.0,
+            worktree_root_name,
+            buffer.read_with(cx, |buffer, _| buffer.remote_id())
+        );
+        buffers.insert(buffer.clone());
+        buffer
+    } else {
+        buffers.iter().choose(&mut *rng.lock()).unwrap().clone()
+    };
+
+    let choice = rng.lock().gen_range(0..100);
+    match choice {
+        0..=9 => {
+            cx.update(|cx| {
+                log::info!(
+                    "{}: dropping buffer {:?}",
+                    client.username,
+                    buffer.read(cx).file().unwrap().full_path(cx)
+                );
+                buffers.remove(&buffer);
+                drop(buffer);
+            });
+        }
+        10..=19 => {
+            let completions = project.update(cx, |project, cx| {
+                log::info!(
+                    "{}: requesting completions for buffer {} ({:?})",
+                    client.username,
+                    buffer.read(cx).remote_id(),
+                    buffer.read(cx).file().unwrap().full_path(cx)
+                );
+                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
+                project.completions(&buffer, offset, cx)
+            });
+            let completions = cx.background().spawn(async move {
+                completions
+                    .await
+                    .map_err(|err| anyhow!("completions request failed: {:?}", err))
+            });
+            if rng.lock().gen_bool(0.3) {
+                log::info!("{}: detaching completions request", client.username);
+                cx.update(|cx| completions.detach_and_log_err(cx));
+            } else {
+                completions.await?;
+            }
+        }
+        20..=29 => {
+            let code_actions = project.update(cx, |project, cx| {
+                log::info!(
+                    "{}: requesting code actions for buffer {} ({:?})",
+                    client.username,
+                    buffer.read(cx).remote_id(),
+                    buffer.read(cx).file().unwrap().full_path(cx)
+                );
+                let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
+                project.code_actions(&buffer, range, cx)
+            });
+            let code_actions = cx.background().spawn(async move {
+                code_actions
+                    .await
+                    .map_err(|err| anyhow!("code actions request failed: {:?}", err))
+            });
+            if rng.lock().gen_bool(0.3) {
+                log::info!("{}: detaching code actions request", client.username);
+                cx.update(|cx| code_actions.detach_and_log_err(cx));
+            } else {
+                code_actions.await?;
+            }
+        }
+        30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
+            let (requested_version, save) = buffer.update(cx, |buffer, cx| {
+                log::info!(
+                    "{}: saving buffer {} ({:?})",
+                    client.username,
+                    buffer.remote_id(),
+                    buffer.file().unwrap().full_path(cx)
+                );
+                (buffer.version(), buffer.save(cx))
+            });
+            let save = cx.background().spawn(async move {
+                let (saved_version, _, _) = save
+                    .await
+                    .map_err(|err| anyhow!("save request failed: {:?}", err))?;
+                assert!(saved_version.observed_all(&requested_version));
+                Ok::<_, anyhow::Error>(())
+            });
+            if rng.lock().gen_bool(0.3) {
+                log::info!("{}: detaching save request", client.username);
+                cx.update(|cx| save.detach_and_log_err(cx));
+            } else {
+                save.await?;
+            }
+        }
+        40..=44 => {
+            let prepare_rename = project.update(cx, |project, cx| {
+                log::info!(
+                    "{}: preparing rename for buffer {} ({:?})",
+                    client.username,
+                    buffer.read(cx).remote_id(),
+                    buffer.read(cx).file().unwrap().full_path(cx)
+                );
+                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
+                project.prepare_rename(buffer, offset, cx)
+            });
+            let prepare_rename = cx.background().spawn(async move {
+                prepare_rename
+                    .await
+                    .map_err(|err| anyhow!("prepare rename request failed: {:?}", err))
+            });
+            if rng.lock().gen_bool(0.3) {
+                log::info!("{}: detaching prepare rename request", client.username);
+                cx.update(|cx| prepare_rename.detach_and_log_err(cx));
+            } else {
+                prepare_rename.await?;
+            }
+        }
+        45..=49 => {
+            let definitions = project.update(cx, |project, cx| {
+                log::info!(
+                    "{}: requesting definitions for buffer {} ({:?})",
+                    client.username,
+                    buffer.read(cx).remote_id(),
+                    buffer.read(cx).file().unwrap().full_path(cx)
+                );
+                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
+                project.definition(&buffer, offset, cx)
+            });
+            let definitions = cx.background().spawn(async move {
+                definitions
+                    .await
+                    .map_err(|err| anyhow!("definitions request failed: {:?}", err))
+            });
+            if rng.lock().gen_bool(0.3) {
+                log::info!("{}: detaching definitions request", client.username);
+                cx.update(|cx| definitions.detach_and_log_err(cx));
+            } else {
+                buffers.extend(definitions.await?.into_iter().map(|loc| loc.target.buffer));
+            }
+        }
+        50..=54 => {
+            let highlights = project.update(cx, |project, cx| {
+                log::info!(
+                    "{}: requesting highlights for buffer {} ({:?})",
+                    client.username,
+                    buffer.read(cx).remote_id(),
+                    buffer.read(cx).file().unwrap().full_path(cx)
+                );
+                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
+                project.document_highlights(&buffer, offset, cx)
+            });
+            let highlights = cx.background().spawn(async move {
+                highlights
+                    .await
+                    .map_err(|err| anyhow!("highlights request failed: {:?}", err))
+            });
+            if rng.lock().gen_bool(0.3) {
+                log::info!("{}: detaching highlights request", client.username);
+                cx.update(|cx| highlights.detach_and_log_err(cx));
+            } else {
+                highlights.await?;
+            }
+        }
+        55..=59 => {
+            let search = project.update(cx, |project, cx| {
+                let query = rng.lock().gen_range('a'..='z');
+                log::info!("{}: project-wide search {:?}", client.username, query);
+                project.search(SearchQuery::text(query, false, false), cx)
+            });
+            let search = cx.background().spawn(async move {
+                search
+                    .await
+                    .map_err(|err| anyhow!("search request failed: {:?}", err))
+            });
+            if rng.lock().gen_bool(0.3) {
+                log::info!("{}: detaching search request", client.username);
+                cx.update(|cx| search.detach_and_log_err(cx));
+            } else {
+                buffers.extend(search.await?.into_keys());
+            }
+        }
+        60..=79 => {
+            let worktree = project
+                .read_with(cx, |project, cx| {
+                    project
+                        .worktrees(cx)
+                        .filter(|worktree| {
+                            let worktree = worktree.read(cx);
+                            worktree.is_visible()
+                                && worktree.entries(false).any(|e| e.is_file())
+                                && worktree.root_entry().map_or(false, |e| e.is_dir())
+                        })
+                        .choose(&mut *rng.lock())
+                })
+                .unwrap();
+            let (worktree_id, worktree_root_name) = worktree.read_with(cx, |worktree, _| {
+                (worktree.id(), worktree.root_name().to_string())
+            });
+
+            let mut new_name = String::new();
+            for _ in 0..10 {
+                let letter = rng.lock().gen_range('a'..='z');
+                new_name.push(letter);
+            }
+
+            let is_dir = rng.lock().gen::<bool>();
+            let mut new_path = PathBuf::new();
+            new_path.push(new_name);
+            if !is_dir {
+                new_path.set_extension("rs");
+            }
+            log::info!(
+                "{}: creating {:?} in worktree {} ({})",
+                client.username,
+                new_path,
+                worktree_id,
+                worktree_root_name,
+            );
+            project
+                .update(cx, |project, cx| {
+                    project.create_entry((worktree_id, new_path), is_dir, cx)
+                })
+                .unwrap()
+                .await?;
+        }
+        _ => {
+            buffer.update(cx, |buffer, cx| {
+                log::info!(
+                    "{}: updating buffer {} ({:?})",
+                    client.username,
+                    buffer.remote_id(),
+                    buffer.file().unwrap().full_path(cx)
+                );
+                if rng.lock().gen_bool(0.7) {
+                    buffer.randomly_edit(&mut *rng.lock(), 5, cx);
+                } else {
+                    buffer.randomly_undo_redo(&mut *rng.lock(), cx);
+                }
+            });
+        }
+    }
+
+    Ok(())
+}