Merge pull request #1843 from zed-industries/call-randomized-test

Antonio Scandurra created

Model calls in randomized collaboration test

Change summary

crates/client/src/client.rs            |   11 
crates/collab/src/integration_tests.rs | 1506 ++++++++++++---------------
crates/collab/src/rpc.rs               |   24 
crates/collab/src/rpc/store.rs         |   11 
crates/fs/src/fs.rs                    |   15 
5 files changed, 715 insertions(+), 852 deletions(-)

Detailed changes

crates/client/src/client.rs 🔗

@@ -835,11 +835,14 @@ impl Client {
                             continue;
                         };
 
-                        if let Some(handler) = state.message_handlers.get(&payload_type_id).cloned()
-                        {
-                            drop(state); // Avoid deadlocks if the handler interacts with rpc::Client
-                            let future = handler(model, message, &this, cx.clone());
+                        let handler = state.message_handlers.get(&payload_type_id).cloned();
+                        // Dropping the state prevents deadlocks if the handler interacts with rpc::Client.
+                        // It also ensures we don't hold the lock while yielding back to the executor, as
+                        // that might cause the executor thread driving this future to block indefinitely.
+                        drop(state);
 
+                        if let Some(handler) = handler {
+                            let future = handler(model, message, &this, cx.clone());
                             let client_id = this.id;
                             log::debug!(
                                 "rpc message received. client_id:{}, message_id:{}, sender_id:{:?}, type:{}",

crates/collab/src/integration_tests.rs 🔗

@@ -28,7 +28,7 @@ use gpui::{
 };
 use language::{
     range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
-    LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
+    LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, PointUtf16, Rope,
 };
 use live_kit_client::MacOSDisplay;
 use lsp::{self, FakeLanguageServer};
@@ -54,6 +54,7 @@ use std::{
 };
 use theme::ThemeRegistry;
 use unindent::Unindent as _;
+use util::post_inc;
 use workspace::{shared_screen::SharedScreen, Item, SplitDirection, ToggleFollow, Workspace};
 
 #[ctor::ctor]
@@ -5778,57 +5779,28 @@ async fn test_random_collaboration(
     rng: StdRng,
 ) {
     deterministic.forbid_parking();
+    let rng = Arc::new(Mutex::new(rng));
+
     let max_peers = env::var("MAX_PEERS")
         .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
         .unwrap_or(5);
-    assert!(max_peers <= 5);
 
     let max_operations = env::var("OPERATIONS")
         .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
         .unwrap_or(10);
 
-    let rng = Arc::new(Mutex::new(rng));
-
-    let guest_lang_registry = Arc::new(LanguageRegistry::test());
-    let host_language_registry = Arc::new(LanguageRegistry::test());
-
-    let fs = FakeFs::new(cx.background());
-    fs.insert_tree("/_collab", json!({"init": ""})).await;
-
     let mut server = TestServer::start(cx.foreground(), cx.background()).await;
     let db = server.app_state.db.clone();
 
-    let room_creator_user_id = db
-        .create_user(
-            "room-creator@example.com",
-            false,
-            NewUserParams {
-                github_login: "room-creator".into(),
-                github_user_id: 0,
-                invite_count: 0,
-            },
-        )
-        .await
-        .unwrap()
-        .user_id;
-    let mut available_guests = vec![
-        "guest-1".to_string(),
-        "guest-2".to_string(),
-        "guest-3".to_string(),
-        "guest-4".to_string(),
-    ];
-
-    for (ix, username) in Some(&"host".to_string())
-        .into_iter()
-        .chain(&available_guests)
-        .enumerate()
-    {
+    let mut available_guests = Vec::new();
+    for ix in 0..max_peers {
+        let username = format!("guest-{}", ix + 1);
         let user_id = db
             .create_user(
                 &format!("{username}@example.com"),
                 false,
                 NewUserParams {
-                    github_login: username.into(),
+                    github_login: username.clone(),
                     github_user_id: (ix + 1) as i32,
                     invite_count: 0,
                 },
@@ -5836,270 +5808,39 @@ async fn test_random_collaboration(
             .await
             .unwrap()
             .user_id;
-        server
-            .app_state
-            .db
-            .send_contact_request(user_id, room_creator_user_id)
-            .await
-            .unwrap();
-        server
-            .app_state
-            .db
-            .respond_to_contact_request(room_creator_user_id, user_id, true)
-            .await
-            .unwrap();
+        available_guests.push((user_id, username));
     }
 
-    let _room_creator = server.create_client(cx, "room-creator").await;
-    let active_call = cx.read(ActiveCall::global);
+    for (ix, (user_id_a, _)) in available_guests.iter().enumerate() {
+        for (user_id_b, _) in &available_guests[ix + 1..] {
+            server
+                .app_state
+                .db
+                .send_contact_request(*user_id_a, *user_id_b)
+                .await
+                .unwrap();
+            server
+                .app_state
+                .db
+                .respond_to_contact_request(*user_id_b, *user_id_a, true)
+                .await
+                .unwrap();
+        }
+    }
 
     let mut clients = Vec::new();
     let mut user_ids = Vec::new();
     let mut peer_ids = Vec::new();
     let mut op_start_signals = Vec::new();
-
     let mut next_entity_id = 100000;
-    let mut host_cx = TestAppContext::new(
-        cx.foreground_platform(),
-        cx.platform(),
-        deterministic.build_foreground(next_entity_id),
-        deterministic.build_background(),
-        cx.font_cache(),
-        cx.leak_detector(),
-        next_entity_id,
-        cx.function_name.clone(),
-    );
-    let host = server.create_client(&mut host_cx, "host").await;
-    let host_project = host_cx.update(|cx| {
-        Project::local(
-            host.client.clone(),
-            host.user_store.clone(),
-            host.project_store.clone(),
-            host_language_registry.clone(),
-            fs.clone(),
-            cx,
-        )
-    });
-
-    let (collab_worktree, _) = host_project
-        .update(&mut host_cx, |project, cx| {
-            project.find_or_create_local_worktree("/_collab", true, cx)
-        })
-        .await
-        .unwrap();
-    collab_worktree
-        .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
-        .await;
-
-    // Set up fake language servers.
-    let mut language = Language::new(
-        LanguageConfig {
-            name: "Rust".into(),
-            path_suffixes: vec!["rs".to_string()],
-            ..Default::default()
-        },
-        None,
-    );
-    let _fake_servers = language
-        .set_fake_lsp_adapter(Arc::new(FakeLspAdapter {
-            name: "the-fake-language-server",
-            capabilities: lsp::LanguageServer::full_capabilities(),
-            initializer: Some(Box::new({
-                let rng = rng.clone();
-                let fs = fs.clone();
-                let project = host_project.downgrade();
-                move |fake_server: &mut FakeLanguageServer| {
-                    fake_server.handle_request::<lsp::request::Completion, _, _>(
-                        |_, _| async move {
-                            Ok(Some(lsp::CompletionResponse::Array(vec![
-                                lsp::CompletionItem {
-                                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
-                                        range: lsp::Range::new(
-                                            lsp::Position::new(0, 0),
-                                            lsp::Position::new(0, 0),
-                                        ),
-                                        new_text: "the-new-text".to_string(),
-                                    })),
-                                    ..Default::default()
-                                },
-                            ])))
-                        },
-                    );
-
-                    fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
-                        |_, _| async move {
-                            Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
-                                lsp::CodeAction {
-                                    title: "the-code-action".to_string(),
-                                    ..Default::default()
-                                },
-                            )]))
-                        },
-                    );
-
-                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
-                        |params, _| async move {
-                            Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
-                                params.position,
-                                params.position,
-                            ))))
-                        },
-                    );
-
-                    fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
-                        let fs = fs.clone();
-                        let rng = rng.clone();
-                        move |_, _| {
-                            let fs = fs.clone();
-                            let rng = rng.clone();
-                            async move {
-                                let files = fs.files().await;
-                                let mut rng = rng.lock();
-                                let count = rng.gen_range::<usize, _>(1..3);
-                                let files = (0..count)
-                                    .map(|_| files.choose(&mut *rng).unwrap())
-                                    .collect::<Vec<_>>();
-                                log::info!("LSP: Returning definitions in files {:?}", &files);
-                                Ok(Some(lsp::GotoDefinitionResponse::Array(
-                                    files
-                                        .into_iter()
-                                        .map(|file| lsp::Location {
-                                            uri: lsp::Url::from_file_path(file).unwrap(),
-                                            range: Default::default(),
-                                        })
-                                        .collect(),
-                                )))
-                            }
-                        }
-                    });
-
-                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
-                        let rng = rng.clone();
-                        let project = project;
-                        move |params, mut cx| {
-                            let highlights = if let Some(project) = project.upgrade(&cx) {
-                                project.update(&mut cx, |project, cx| {
-                                    let path = params
-                                        .text_document_position_params
-                                        .text_document
-                                        .uri
-                                        .to_file_path()
-                                        .unwrap();
-                                    let (worktree, relative_path) =
-                                        project.find_local_worktree(&path, cx)?;
-                                    let project_path =
-                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
-                                    let buffer =
-                                        project.get_open_buffer(&project_path, cx)?.read(cx);
-
-                                    let mut highlights = Vec::new();
-                                    let highlight_count = rng.lock().gen_range(1..=5);
-                                    let mut prev_end = 0;
-                                    for _ in 0..highlight_count {
-                                        let range =
-                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
-
-                                        highlights.push(lsp::DocumentHighlight {
-                                            range: range_to_lsp(range.to_point_utf16(buffer)),
-                                            kind: Some(lsp::DocumentHighlightKind::READ),
-                                        });
-                                        prev_end = range.end;
-                                    }
-                                    Some(highlights)
-                                })
-                            } else {
-                                None
-                            };
-                            async move { Ok(highlights) }
-                        }
-                    });
-                }
-            })),
-            ..Default::default()
-        }))
-        .await;
-    host_language_registry.add(Arc::new(language));
-
-    let host_user_id = host.current_user_id(&host_cx);
-    active_call
-        .update(cx, |call, cx| {
-            call.invite(host_user_id.to_proto(), None, cx)
-        })
-        .await
-        .unwrap();
-    active_call.read_with(cx, |call, cx| call.room().unwrap().read(cx).id());
-    deterministic.run_until_parked();
-    let host_active_call = host_cx.read(ActiveCall::global);
-    host_active_call
-        .update(&mut host_cx, |call, cx| call.accept_incoming(cx))
-        .await
-        .unwrap();
-
-    let host_project_id = host_active_call
-        .update(&mut host_cx, |call, cx| {
-            call.share_project(host_project.clone(), cx)
-        })
-        .await
-        .unwrap();
-
-    let op_start_signal = futures::channel::mpsc::unbounded();
-    user_ids.push(host_user_id);
-    peer_ids.push(host.peer_id().unwrap());
-    op_start_signals.push(op_start_signal.0);
-    clients.push(host_cx.foreground().spawn(host.simulate_host(
-        host_project,
-        op_start_signal.1,
-        rng.clone(),
-        host_cx,
-    )));
-
-    let disconnect_host_at = if rng.lock().gen_bool(0.2) {
-        rng.lock().gen_range(0..max_operations)
-    } else {
-        max_operations
-    };
 
     let mut operations = 0;
     while operations < max_operations {
-        if operations == disconnect_host_at {
-            server.disconnect_client(peer_ids[0]);
-            deterministic.advance_clock(RECEIVE_TIMEOUT);
-            drop(op_start_signals);
-
-            deterministic.start_waiting();
-            let mut clients = futures::future::join_all(clients).await;
-            deterministic.finish_waiting();
-            deterministic.run_until_parked();
-
-            let (host, host_project, mut host_cx, host_err) = clients.remove(0);
-            if let Some(host_err) = host_err {
-                log::error!("host error - {:?}", host_err);
-            }
-            host_project.read_with(&host_cx, |project, _| assert!(!project.is_shared()));
-            for (guest, guest_project, mut guest_cx, guest_err) in clients {
-                if let Some(guest_err) = guest_err {
-                    log::error!("{} error - {:?}", guest.username, guest_err);
-                }
-
-                guest_project.read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
-                guest_cx.update(|cx| {
-                    cx.clear_globals();
-                    drop((guest, guest_project));
-                });
-            }
-            host_cx.update(|cx| {
-                cx.clear_globals();
-                drop((host, host_project));
-            });
-
-            return;
-        }
-
         let distribution = rng.lock().gen_range(0..100);
         match distribution {
             0..=19 if !available_guests.is_empty() => {
                 let guest_ix = rng.lock().gen_range(0..available_guests.len());
-                let guest_username = available_guests.remove(guest_ix);
+                let (_, guest_username) = available_guests.remove(guest_ix);
                 log::info!("Adding new connection for {}", guest_username);
                 next_entity_id += 100000;
                 let mut guest_cx = TestAppContext::new(
@@ -6113,43 +5854,13 @@ async fn test_random_collaboration(
                     cx.function_name.clone(),
                 );
 
-                deterministic.start_waiting();
-                let guest = server.create_client(&mut guest_cx, &guest_username).await;
-                let guest_user_id = guest.current_user_id(&guest_cx);
-
-                active_call
-                    .update(cx, |call, cx| {
-                        call.invite(guest_user_id.to_proto(), None, cx)
-                    })
-                    .await
-                    .unwrap();
-                deterministic.run_until_parked();
-                guest_cx
-                    .read(ActiveCall::global)
-                    .update(&mut guest_cx, |call, cx| call.accept_incoming(cx))
-                    .await
-                    .unwrap();
-
-                let guest_project = Project::remote(
-                    host_project_id,
-                    guest.client.clone(),
-                    guest.user_store.clone(),
-                    guest.project_store.clone(),
-                    guest_lang_registry.clone(),
-                    FakeFs::new(cx.background()),
-                    guest_cx.to_async(),
-                )
-                .await
-                .unwrap();
-                deterministic.finish_waiting();
-
                 let op_start_signal = futures::channel::mpsc::unbounded();
-                user_ids.push(guest_user_id);
+                let guest = server.create_client(&mut guest_cx, &guest_username).await;
+                user_ids.push(guest.current_user_id(&guest_cx));
                 peer_ids.push(guest.peer_id().unwrap());
                 op_start_signals.push(op_start_signal.0);
-                clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
+                clients.push(guest_cx.foreground().spawn(guest.simulate(
                     guest_username.clone(),
-                    guest_project,
                     op_start_signal.1,
                     rng.clone(),
                     guest_cx,
@@ -6170,14 +5881,13 @@ async fn test_random_collaboration(
                 deterministic.advance_clock(RECEIVE_TIMEOUT);
                 deterministic.start_waiting();
                 log::info!("Waiting for guest {} to exit...", removed_guest_id);
-                let (guest, guest_project, mut guest_cx, guest_err) = guest.await;
+                let (guest, mut guest_cx) = guest.await;
                 deterministic.finish_waiting();
                 server.allow_connections();
 
-                if let Some(guest_err) = guest_err {
-                    log::error!("{} error - {:?}", guest.username, guest_err);
+                for project in &guest.remote_projects {
+                    project.read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
                 }
-                guest_project.read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
                 for user_id in &user_ids {
                     let contacts = server.app_state.db.get_contacts(*user_id).await.unwrap();
                     let contacts = server
@@ -6197,15 +5907,15 @@ async fn test_random_collaboration(
                 }
 
                 log::info!("{} removed", guest.username);
-                available_guests.push(guest.username.clone());
+                available_guests.push((removed_guest_id, guest.username.clone()));
                 guest_cx.update(|cx| {
                     cx.clear_globals();
-                    drop((guest, guest_project));
+                    drop(guest);
                 });
 
                 operations += 1;
             }
-            _ => {
+            _ if !op_start_signals.is_empty() => {
                 while operations < max_operations && rng.lock().gen_bool(0.7) {
                     op_start_signals
                         .choose(&mut *rng.lock())
@@ -6219,115 +5929,147 @@ async fn test_random_collaboration(
                     deterministic.run_until_parked();
                 }
             }
+            _ => {}
         }
     }
 
     drop(op_start_signals);
     deterministic.start_waiting();
-    let mut clients = futures::future::join_all(clients).await;
+    let clients = futures::future::join_all(clients).await;
     deterministic.finish_waiting();
     deterministic.run_until_parked();
 
-    let (host_client, host_project, mut host_cx, host_err) = clients.remove(0);
-    if let Some(host_err) = host_err {
-        panic!("host error - {:?}", host_err);
-    }
-    let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
-        project
-            .worktrees(cx)
-            .map(|worktree| {
-                let snapshot = worktree.read(cx).snapshot();
-                (snapshot.id(), snapshot)
-            })
-            .collect::<BTreeMap<_, _>>()
-    });
+    for (guest_client, guest_cx) in &clients {
+        for guest_project in &guest_client.remote_projects {
+            guest_project.read_with(guest_cx, |guest_project, cx| {
+                let host_project = clients.iter().find_map(|(client, cx)| {
+                    let project = client.local_projects.iter().find(|host_project| {
+                        host_project.read_with(cx, |host_project, _| {
+                            host_project.remote_id() == guest_project.remote_id()
+                        })
+                    })?;
+                    Some((project, cx))
+                });
 
-    host_project.read_with(&host_cx, |project, cx| project.check_invariants(cx));
+                if !guest_project.is_read_only() {
+                    if let Some((host_project, host_cx)) = host_project {
+                        let host_worktree_snapshots =
+                            host_project.read_with(host_cx, |host_project, cx| {
+                                host_project
+                                    .worktrees(cx)
+                                    .map(|worktree| {
+                                        let worktree = worktree.read(cx);
+                                        (worktree.id(), worktree.snapshot())
+                                    })
+                                    .collect::<BTreeMap<_, _>>()
+                            });
+                        let guest_worktree_snapshots = guest_project
+                            .worktrees(cx)
+                            .map(|worktree| {
+                                let worktree = worktree.read(cx);
+                                (worktree.id(), worktree.snapshot())
+                            })
+                            .collect::<BTreeMap<_, _>>();
 
-    for (guest_client, guest_project, mut guest_cx, guest_err) in clients.into_iter() {
-        if let Some(guest_err) = guest_err {
-            panic!("{} error - {:?}", guest_client.username, guest_err);
-        }
-        let worktree_snapshots = guest_project.read_with(&guest_cx, |project, cx| {
-            project
-                .worktrees(cx)
-                .map(|worktree| {
-                    let worktree = worktree.read(cx);
-                    (worktree.id(), worktree.snapshot())
-                })
-                .collect::<BTreeMap<_, _>>()
-        });
+                        assert_eq!(
+                            guest_worktree_snapshots.keys().collect::<Vec<_>>(),
+                            host_worktree_snapshots.keys().collect::<Vec<_>>(),
+                            "{} has different worktrees than the host",
+                            guest_client.username
+                        );
 
-        assert_eq!(
-            worktree_snapshots.keys().collect::<Vec<_>>(),
-            host_worktree_snapshots.keys().collect::<Vec<_>>(),
-            "{} has different worktrees than the host",
-            guest_client.username
-        );
-        for (id, host_snapshot) in &host_worktree_snapshots {
-            let guest_snapshot = &worktree_snapshots[id];
-            assert_eq!(
-                guest_snapshot.root_name(),
-                host_snapshot.root_name(),
-                "{} has different root name than the host for worktree {}",
-                guest_client.username,
-                id
-            );
-            assert_eq!(
-                guest_snapshot.entries(false).collect::<Vec<_>>(),
-                host_snapshot.entries(false).collect::<Vec<_>>(),
-                "{} has different snapshot than the host for worktree {}",
-                guest_client.username,
-                id
-            );
-            assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
+                        for (id, host_snapshot) in &host_worktree_snapshots {
+                            let guest_snapshot = &guest_worktree_snapshots[id];
+                            assert_eq!(
+                                guest_snapshot.root_name(),
+                                host_snapshot.root_name(),
+                                "{} has different root name than the host for worktree {}",
+                                guest_client.username,
+                                id
+                            );
+                            assert_eq!(
+                                guest_snapshot.entries(false).collect::<Vec<_>>(),
+                                host_snapshot.entries(false).collect::<Vec<_>>(),
+                                "{} has different snapshot than the host for worktree {}",
+                                guest_client.username,
+                                id
+                            );
+                            assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
+                        }
+                    }
+                }
+
+                guest_project.check_invariants(cx);
+            });
         }
 
-        guest_project.read_with(&guest_cx, |project, cx| project.check_invariants(cx));
-
-        for guest_buffer in &guest_client.buffers {
-            let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
-            let host_buffer = host_project.read_with(&host_cx, |project, cx| {
-                project.buffer_for_id(buffer_id, cx).unwrap_or_else(|| {
-                    panic!(
-                        "host does not have buffer for guest:{}, peer:{:?}, id:{}",
-                        guest_client.username,
-                        guest_client.peer_id(),
-                        buffer_id
-                    )
-                })
+        for (guest_project, guest_buffers) in &guest_client.buffers {
+            let project_id = if guest_project.read_with(guest_cx, |project, _| {
+                project.is_local() || project.is_read_only()
+            }) {
+                continue;
+            } else {
+                guest_project
+                    .read_with(guest_cx, |project, _| project.remote_id())
+                    .unwrap()
+            };
+
+            let host_project = clients.iter().find_map(|(client, cx)| {
+                let project = client.local_projects.iter().find(|host_project| {
+                    host_project.read_with(cx, |host_project, _| {
+                        host_project.remote_id() == Some(project_id)
+                    })
+                })?;
+                Some((project, cx))
             });
-            let path =
-                host_buffer.read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
 
-            assert_eq!(
-                guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
-                0,
-                "{}, buffer {}, path {:?} has deferred operations",
-                guest_client.username,
-                buffer_id,
-                path,
-            );
-            assert_eq!(
-                guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
-                host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
-                "{}, buffer {}, path {:?}, differs from the host's buffer",
-                guest_client.username,
-                buffer_id,
-                path
-            );
+            let (host_project, host_cx) = if let Some((host_project, host_cx)) = host_project {
+                (host_project, host_cx)
+            } else {
+                continue;
+            };
+
+            for guest_buffer in guest_buffers {
+                let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id());
+                let host_buffer = host_project.read_with(host_cx, |project, cx| {
+                    project.buffer_for_id(buffer_id, cx).unwrap_or_else(|| {
+                        panic!(
+                            "host does not have buffer for guest:{}, peer:{:?}, id:{}",
+                            guest_client.username,
+                            guest_client.peer_id(),
+                            buffer_id
+                        )
+                    })
+                });
+                let path = host_buffer
+                    .read_with(host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
+
+                assert_eq!(
+                    guest_buffer.read_with(guest_cx, |buffer, _| buffer.deferred_ops_len()),
+                    0,
+                    "{}, buffer {}, path {:?} has deferred operations",
+                    guest_client.username,
+                    buffer_id,
+                    path,
+                );
+                assert_eq!(
+                    guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()),
+                    host_buffer.read_with(host_cx, |buffer, _| buffer.text()),
+                    "{}, buffer {}, path {:?}, differs from the host's buffer",
+                    guest_client.username,
+                    buffer_id,
+                    path
+                );
+            }
         }
+    }
 
-        guest_cx.update(|cx| {
+    for (client, mut cx) in clients {
+        cx.update(|cx| {
             cx.clear_globals();
-            drop((guest_project, guest_client));
+            drop(client);
         });
     }
-
-    host_cx.update(|cx| {
-        cx.clear_globals();
-        drop((host_client, host_project))
-    });
 }
 
 struct TestServer {
@@ -6494,6 +6236,9 @@ impl TestServer {
         let client = TestClient {
             client,
             username: name.to_string(),
+            local_projects: Default::default(),
+            remote_projects: Default::default(),
+            next_root_dir_id: 0,
             user_store,
             project_store,
             fs,
@@ -6612,11 +6357,14 @@ impl Drop for TestServer {
 struct TestClient {
     client: Arc<Client>,
     username: String,
+    local_projects: Vec<ModelHandle<Project>>,
+    remote_projects: Vec<ModelHandle<Project>>,
+    next_root_dir_id: usize,
     pub user_store: ModelHandle<UserStore>,
     pub project_store: ModelHandle<ProjectStore>,
     language_registry: Arc<LanguageRegistry>,
     fs: Arc<FakeFs>,
-    buffers: HashSet<ModelHandle<language::Buffer>>,
+    buffers: HashMap<ModelHandle<Project>, HashSet<ModelHandle<language::Buffer>>>,
 }
 
 impl Deref for TestClient {
@@ -6731,470 +6479,558 @@ impl TestClient {
         })
     }
 
-    async fn simulate_host(
+    pub async fn simulate(
         mut self,
-        project: ModelHandle<Project>,
-        op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
+        username: String,
+        mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
         rng: Arc<Mutex<StdRng>>,
         mut cx: TestAppContext,
-    ) -> (
-        Self,
-        ModelHandle<Project>,
-        TestAppContext,
-        Option<anyhow::Error>,
-    ) {
-        async fn simulate_host_internal(
+    ) -> (Self, TestAppContext) {
+        async fn tick(
             client: &mut TestClient,
-            project: ModelHandle<Project>,
-            mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
+            username: &str,
             rng: Arc<Mutex<StdRng>>,
             cx: &mut TestAppContext,
         ) -> anyhow::Result<()> {
-            let fs = project.read_with(cx, |project, _| project.fs().clone());
+            let active_call = cx.read(ActiveCall::global);
+            if active_call.read_with(cx, |call, _| call.incoming().borrow().is_some()) {
+                if rng.lock().gen() {
+                    log::info!("{}: accepting incoming call", username);
+                    active_call
+                        .update(cx, |call, cx| call.accept_incoming(cx))
+                        .await?;
+                } else {
+                    log::info!("{}: declining incoming call", username);
+                    active_call.update(cx, |call, _| call.decline_incoming())?;
+                }
+            } else {
+                let available_contacts = client.user_store.read_with(cx, |user_store, _| {
+                    user_store
+                        .contacts()
+                        .iter()
+                        .filter(|contact| contact.online && !contact.busy)
+                        .cloned()
+                        .collect::<Vec<_>>()
+                });
 
-            while op_start_signal.next().await.is_some() {
-                let distribution = rng.lock().gen_range::<usize, _>(0..100);
-                let files = fs.as_fake().files().await;
+                let distribution = rng.lock().gen_range(0..100);
                 match distribution {
-                    0..=19 if !files.is_empty() => {
-                        let path = files.choose(&mut *rng.lock()).unwrap();
-                        let mut path = path.as_path();
-                        while let Some(parent_path) = path.parent() {
-                            path = parent_path;
-                            if rng.lock().gen() {
-                                break;
-                            }
-                        }
-
-                        log::info!("Host: find/create local worktree {:?}", path);
-                        let find_or_create_worktree = project.update(cx, |project, cx| {
-                            project.find_or_create_local_worktree(path, true, cx)
-                        });
-                        if rng.lock().gen() {
-                            cx.background().spawn(find_or_create_worktree).detach();
-                        } else {
-                            find_or_create_worktree.await?;
-                        }
+                    0..=29 if !available_contacts.is_empty() => {
+                        let contact = available_contacts.choose(&mut *rng.lock()).unwrap();
+                        log::info!("{}: inviting {}", username, contact.user.github_login);
+                        active_call
+                            .update(cx, |call, cx| call.invite(contact.user.id, None, cx))
+                            .await?;
                     }
-                    20..=79 if !files.is_empty() => {
-                        let buffer = if client.buffers.is_empty() || rng.lock().gen() {
-                            let file = files.choose(&mut *rng.lock()).unwrap();
-                            let (worktree, path) = project
-                                .update(cx, |project, cx| {
-                                    project.find_or_create_local_worktree(file.clone(), true, cx)
-                                })
-                                .await?;
-                            let project_path =
-                                worktree.read_with(cx, |worktree, _| (worktree.id(), path));
-                            log::info!(
-                                "Host: opening path {:?}, worktree {}, relative_path {:?}",
-                                file,
-                                project_path.0,
-                                project_path.1
-                            );
-                            let buffer = project
-                                .update(cx, |project, cx| project.open_buffer(project_path, cx))
-                                .await
-                                .unwrap();
-                            client.buffers.insert(buffer.clone());
-                            buffer
-                        } else {
-                            client
-                                .buffers
-                                .iter()
-                                .choose(&mut *rng.lock())
-                                .unwrap()
-                                .clone()
-                        };
-
-                        if rng.lock().gen_bool(0.1) {
-                            cx.update(|cx| {
-                                log::info!(
-                                    "Host: dropping buffer {:?}",
-                                    buffer.read(cx).file().unwrap().full_path(cx)
-                                );
-                                client.buffers.remove(&buffer);
-                                drop(buffer);
-                            });
-                        } else {
-                            buffer.update(cx, |buffer, cx| {
-                                log::info!(
-                                    "Host: updating buffer {:?} ({})",
-                                    buffer.file().unwrap().full_path(cx),
-                                    buffer.remote_id()
-                                );
-
-                                if rng.lock().gen_bool(0.7) {
-                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx);
-                                } else {
-                                    buffer.randomly_undo_redo(&mut *rng.lock(), cx);
-                                }
-                            });
-                        }
+                    30..=39 if active_call.read_with(cx, |call, _| call.room().is_some()) => {
+                        log::info!("{}: hanging up", username);
+                        active_call.update(cx, |call, cx| call.hang_up(cx))?;
                     }
-                    _ => loop {
-                        let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
-                        let mut path = PathBuf::new();
-                        path.push("/");
-                        for _ in 0..path_component_count {
-                            let letter = rng.lock().gen_range(b'a'..=b'z');
-                            path.push(std::str::from_utf8(&[letter]).unwrap());
-                        }
-                        path.set_extension("rs");
-                        let parent_path = path.parent().unwrap();
-
-                        log::info!("Host: creating file {:?}", path,);
-
-                        if fs.create_dir(parent_path).await.is_ok()
-                            && fs.create_file(&path, Default::default()).await.is_ok()
-                        {
-                            break;
-                        } else {
-                            log::info!("Host: cannot create file");
-                        }
-                    },
+                    _ => {}
                 }
-
-                cx.background().simulate_random_delay().await;
             }
 
-            Ok(())
-        }
-
-        let result =
-            simulate_host_internal(&mut self, project.clone(), op_start_signal, rng, &mut cx).await;
-        log::info!("Host done");
-        (self, project, cx, result.err())
-    }
-
-    pub async fn simulate_guest(
-        mut self,
-        guest_username: String,
-        project: ModelHandle<Project>,
-        op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
-        rng: Arc<Mutex<StdRng>>,
-        mut cx: TestAppContext,
-    ) -> (
-        Self,
-        ModelHandle<Project>,
-        TestAppContext,
-        Option<anyhow::Error>,
-    ) {
-        async fn simulate_guest_internal(
-            client: &mut TestClient,
-            guest_username: &str,
-            project: ModelHandle<Project>,
-            mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
-            rng: Arc<Mutex<StdRng>>,
-            cx: &mut TestAppContext,
-        ) -> anyhow::Result<()> {
-            while op_start_signal.next().await.is_some() {
-                let buffer = if client.buffers.is_empty() || rng.lock().gen() {
-                    let worktree = if let Some(worktree) = project.read_with(cx, |project, cx| {
-                        project
-                            .worktrees(cx)
-                            .filter(|worktree| {
-                                let worktree = worktree.read(cx);
-                                worktree.is_visible()
-                                    && worktree.entries(false).any(|e| e.is_file())
+            let remote_projects =
+                if let Some(room) = active_call.read_with(cx, |call, _| call.room().cloned()) {
+                    room.read_with(cx, |room, _| {
+                        room.remote_participants()
+                            .values()
+                            .flat_map(|participant| participant.projects.clone())
+                            .collect::<Vec<_>>()
+                    })
+                } else {
+                    Default::default()
+                };
+            let project = if remote_projects.is_empty() || rng.lock().gen() {
+                if client.local_projects.is_empty() || rng.lock().gen() {
+                    let dir_paths = client.fs.directories().await;
+                    let local_project = if dir_paths.is_empty() || rng.lock().gen() {
+                        let root_path = format!(
+                            "/{}-root-{}",
+                            username,
+                            post_inc(&mut client.next_root_dir_id)
+                        );
+                        let root_path = Path::new(&root_path);
+                        client.fs.create_dir(root_path).await.unwrap();
+                        client
+                            .fs
+                            .create_file(&root_path.join("main.rs"), Default::default())
+                            .await
+                            .unwrap();
+                        log::info!("{}: opening local project at {:?}", username, root_path);
+                        client.build_local_project(root_path, cx).await.0
+                    } else {
+                        let root_path = dir_paths.choose(&mut *rng.lock()).unwrap();
+                        log::info!("{}: opening local project at {:?}", username, root_path);
+                        client.build_local_project(root_path, cx).await.0
+                    };
+                    client.local_projects.push(local_project.clone());
+                    local_project
+                } else {
+                    client
+                        .local_projects
+                        .choose(&mut *rng.lock())
+                        .unwrap()
+                        .clone()
+                }
+            } else {
+                if client.remote_projects.is_empty() || rng.lock().gen() {
+                    let remote_project_id = remote_projects.choose(&mut *rng.lock()).unwrap().id;
+                    let remote_project = if let Some(project) =
+                        client.remote_projects.iter().find(|project| {
+                            project.read_with(cx, |project, _| {
+                                project.remote_id() == Some(remote_project_id)
                             })
-                            .choose(&mut *rng.lock())
-                    }) {
-                        worktree
+                        }) {
+                        project.clone()
                     } else {
-                        cx.background().simulate_random_delay().await;
-                        continue;
+                        log::info!("{}: opening remote project {}", username, remote_project_id);
+                        let remote_project = Project::remote(
+                            remote_project_id,
+                            client.client.clone(),
+                            client.user_store.clone(),
+                            client.project_store.clone(),
+                            client.language_registry.clone(),
+                            FakeFs::new(cx.background()),
+                            cx.to_async(),
+                        )
+                        .await?;
+                        client.remote_projects.push(remote_project.clone());
+                        remote_project
                     };
 
-                    let (worktree_root_name, project_path) =
-                        worktree.read_with(cx, |worktree, _| {
-                            let entry = worktree
-                                .entries(false)
-                                .filter(|e| e.is_file())
-                                .choose(&mut *rng.lock())
-                                .unwrap();
-                            (
-                                worktree.root_name().to_string(),
-                                (worktree.id(), entry.path.clone()),
-                            )
-                        });
-                    log::info!(
-                        "{}: opening path {:?} in worktree {} ({})",
-                        guest_username,
-                        project_path.1,
-                        project_path.0,
-                        worktree_root_name,
-                    );
-                    let buffer = project
-                        .update(cx, |project, cx| {
-                            project.open_buffer(project_path.clone(), cx)
-                        })
-                        .await?;
-                    log::info!(
-                        "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
-                        guest_username,
-                        project_path.1,
-                        project_path.0,
-                        worktree_root_name,
-                        buffer.read_with(cx, |buffer, _| buffer.remote_id())
-                    );
-                    client.buffers.insert(buffer.clone());
-                    buffer
+                    remote_project
                 } else {
                     client
-                        .buffers
-                        .iter()
+                        .remote_projects
                         .choose(&mut *rng.lock())
                         .unwrap()
                         .clone()
+                }
+            };
+            if let Err(error) = active_call
+                .update(cx, |call, cx| call.share_project(project.clone(), cx))
+                .await
+            {
+                log::error!("{}: error sharing project, {:?}", username, error);
+            }
+
+            let buffers = client.buffers.entry(project.clone()).or_default();
+            let buffer = if buffers.is_empty() || rng.lock().gen() {
+                let worktree = if let Some(worktree) = project.read_with(cx, |project, cx| {
+                    project
+                        .worktrees(cx)
+                        .filter(|worktree| {
+                            let worktree = worktree.read(cx);
+                            worktree.is_visible() && worktree.entries(false).any(|e| e.is_file())
+                        })
+                        .choose(&mut *rng.lock())
+                }) {
+                    worktree
+                } else {
+                    cx.background().simulate_random_delay().await;
+                    return Ok(());
                 };
 
-                let choice = rng.lock().gen_range(0..100);
-                match choice {
-                    0..=9 => {
-                        cx.update(|cx| {
-                            log::info!(
-                                "{}: dropping buffer {:?}",
-                                guest_username,
-                                buffer.read(cx).file().unwrap().full_path(cx)
-                            );
-                            client.buffers.remove(&buffer);
-                            drop(buffer);
-                        });
+                let (worktree_root_name, project_path) = worktree.read_with(cx, |worktree, _| {
+                    let entry = worktree
+                        .entries(false)
+                        .filter(|e| e.is_file())
+                        .choose(&mut *rng.lock())
+                        .unwrap();
+                    (
+                        worktree.root_name().to_string(),
+                        (worktree.id(), entry.path.clone()),
+                    )
+                });
+                log::info!(
+                    "{}: opening path {:?} in worktree {} ({})",
+                    username,
+                    project_path.1,
+                    project_path.0,
+                    worktree_root_name,
+                );
+                let buffer = project
+                    .update(cx, |project, cx| {
+                        project.open_buffer(project_path.clone(), cx)
+                    })
+                    .await?;
+                log::info!(
+                    "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
+                    username,
+                    project_path.1,
+                    project_path.0,
+                    worktree_root_name,
+                    buffer.read_with(cx, |buffer, _| buffer.remote_id())
+                );
+                buffers.insert(buffer.clone());
+                buffer
+            } else {
+                buffers.iter().choose(&mut *rng.lock()).unwrap().clone()
+            };
+
+            let choice = rng.lock().gen_range(0..100);
+            match choice {
+                0..=9 => {
+                    cx.update(|cx| {
+                        log::info!(
+                            "{}: dropping buffer {:?}",
+                            username,
+                            buffer.read(cx).file().unwrap().full_path(cx)
+                        );
+                        buffers.remove(&buffer);
+                        drop(buffer);
+                    });
+                }
+                10..=19 => {
+                    let completions = project.update(cx, |project, cx| {
+                        log::info!(
+                            "{}: requesting completions for buffer {} ({:?})",
+                            username,
+                            buffer.read(cx).remote_id(),
+                            buffer.read(cx).file().unwrap().full_path(cx)
+                        );
+                        let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
+                        project.completions(&buffer, offset, cx)
+                    });
+                    let completions = cx.background().spawn(async move {
+                        completions
+                            .await
+                            .map_err(|err| anyhow!("completions request failed: {:?}", err))
+                    });
+                    if rng.lock().gen_bool(0.3) {
+                        log::info!("{}: detaching completions request", username);
+                        cx.update(|cx| completions.detach_and_log_err(cx));
+                    } else {
+                        completions.await?;
                     }
-                    10..=19 => {
-                        let completions = project.update(cx, |project, cx| {
-                            log::info!(
-                                "{}: requesting completions for buffer {} ({:?})",
-                                guest_username,
-                                buffer.read(cx).remote_id(),
-                                buffer.read(cx).file().unwrap().full_path(cx)
-                            );
-                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
-                            project.completions(&buffer, offset, cx)
-                        });
-                        let completions = cx.background().spawn(async move {
-                            completions
-                                .await
-                                .map_err(|err| anyhow!("completions request failed: {:?}", err))
-                        });
-                        if rng.lock().gen_bool(0.3) {
-                            log::info!("{}: detaching completions request", guest_username);
-                            cx.update(|cx| completions.detach_and_log_err(cx));
-                        } else {
-                            completions.await?;
-                        }
+                }
+                20..=29 => {
+                    let code_actions = project.update(cx, |project, cx| {
+                        log::info!(
+                            "{}: requesting code actions for buffer {} ({:?})",
+                            username,
+                            buffer.read(cx).remote_id(),
+                            buffer.read(cx).file().unwrap().full_path(cx)
+                        );
+                        let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
+                        project.code_actions(&buffer, range, cx)
+                    });
+                    let code_actions = cx.background().spawn(async move {
+                        code_actions
+                            .await
+                            .map_err(|err| anyhow!("code actions request failed: {:?}", err))
+                    });
+                    if rng.lock().gen_bool(0.3) {
+                        log::info!("{}: detaching code actions request", username);
+                        cx.update(|cx| code_actions.detach_and_log_err(cx));
+                    } else {
+                        code_actions.await?;
                     }
-                    20..=29 => {
-                        let code_actions = project.update(cx, |project, cx| {
-                            log::info!(
-                                "{}: requesting code actions for buffer {} ({:?})",
-                                guest_username,
-                                buffer.read(cx).remote_id(),
-                                buffer.read(cx).file().unwrap().full_path(cx)
-                            );
-                            let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
-                            project.code_actions(&buffer, range, cx)
-                        });
-                        let code_actions = cx.background().spawn(async move {
-                            code_actions
-                                .await
-                                .map_err(|err| anyhow!("code actions request failed: {:?}", err))
-                        });
-                        if rng.lock().gen_bool(0.3) {
-                            log::info!("{}: detaching code actions request", guest_username);
-                            cx.update(|cx| code_actions.detach_and_log_err(cx));
-                        } else {
-                            code_actions.await?;
-                        }
+                }
+                30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
+                    let (requested_version, save) = buffer.update(cx, |buffer, cx| {
+                        log::info!(
+                            "{}: saving buffer {} ({:?})",
+                            username,
+                            buffer.remote_id(),
+                            buffer.file().unwrap().full_path(cx)
+                        );
+                        (buffer.version(), buffer.save(cx))
+                    });
+                    let save = cx.background().spawn(async move {
+                        let (saved_version, _, _) = save
+                            .await
+                            .map_err(|err| anyhow!("save request failed: {:?}", err))?;
+                        assert!(saved_version.observed_all(&requested_version));
+                        Ok::<_, anyhow::Error>(())
+                    });
+                    if rng.lock().gen_bool(0.3) {
+                        log::info!("{}: detaching save request", username);
+                        cx.update(|cx| save.detach_and_log_err(cx));
+                    } else {
+                        save.await?;
                     }
-                    30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
-                        let (requested_version, save) = buffer.update(cx, |buffer, cx| {
-                            log::info!(
-                                "{}: saving buffer {} ({:?})",
-                                guest_username,
-                                buffer.remote_id(),
-                                buffer.file().unwrap().full_path(cx)
-                            );
-                            (buffer.version(), buffer.save(cx))
-                        });
-                        let save = cx.background().spawn(async move {
-                            let (saved_version, _, _) = save
-                                .await
-                                .map_err(|err| anyhow!("save request failed: {:?}", err))?;
-                            assert!(saved_version.observed_all(&requested_version));
-                            Ok::<_, anyhow::Error>(())
-                        });
-                        if rng.lock().gen_bool(0.3) {
-                            log::info!("{}: detaching save request", guest_username);
-                            cx.update(|cx| save.detach_and_log_err(cx));
-                        } else {
-                            save.await?;
-                        }
+                }
+                40..=44 => {
+                    let prepare_rename = project.update(cx, |project, cx| {
+                        log::info!(
+                            "{}: preparing rename for buffer {} ({:?})",
+                            username,
+                            buffer.read(cx).remote_id(),
+                            buffer.read(cx).file().unwrap().full_path(cx)
+                        );
+                        let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
+                        project.prepare_rename(buffer, offset, cx)
+                    });
+                    let prepare_rename = cx.background().spawn(async move {
+                        prepare_rename
+                            .await
+                            .map_err(|err| anyhow!("prepare rename request failed: {:?}", err))
+                    });
+                    if rng.lock().gen_bool(0.3) {
+                        log::info!("{}: detaching prepare rename request", username);
+                        cx.update(|cx| prepare_rename.detach_and_log_err(cx));
+                    } else {
+                        prepare_rename.await?;
                     }
-                    40..=44 => {
-                        let prepare_rename = project.update(cx, |project, cx| {
-                            log::info!(
-                                "{}: preparing rename for buffer {} ({:?})",
-                                guest_username,
-                                buffer.read(cx).remote_id(),
-                                buffer.read(cx).file().unwrap().full_path(cx)
-                            );
-                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
-                            project.prepare_rename(buffer, offset, cx)
-                        });
-                        let prepare_rename = cx.background().spawn(async move {
-                            prepare_rename
-                                .await
-                                .map_err(|err| anyhow!("prepare rename request failed: {:?}", err))
-                        });
-                        if rng.lock().gen_bool(0.3) {
-                            log::info!("{}: detaching prepare rename request", guest_username);
-                            cx.update(|cx| prepare_rename.detach_and_log_err(cx));
-                        } else {
-                            prepare_rename.await?;
-                        }
+                }
+                45..=49 => {
+                    let definitions = project.update(cx, |project, cx| {
+                        log::info!(
+                            "{}: requesting definitions for buffer {} ({:?})",
+                            username,
+                            buffer.read(cx).remote_id(),
+                            buffer.read(cx).file().unwrap().full_path(cx)
+                        );
+                        let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
+                        project.definition(&buffer, offset, cx)
+                    });
+                    let definitions = cx.background().spawn(async move {
+                        definitions
+                            .await
+                            .map_err(|err| anyhow!("definitions request failed: {:?}", err))
+                    });
+                    if rng.lock().gen_bool(0.3) {
+                        log::info!("{}: detaching definitions request", username);
+                        cx.update(|cx| definitions.detach_and_log_err(cx));
+                    } else {
+                        buffers.extend(definitions.await?.into_iter().map(|loc| loc.target.buffer));
                     }
-                    45..=49 => {
-                        let definitions = project.update(cx, |project, cx| {
-                            log::info!(
-                                "{}: requesting definitions for buffer {} ({:?})",
-                                guest_username,
-                                buffer.read(cx).remote_id(),
-                                buffer.read(cx).file().unwrap().full_path(cx)
-                            );
-                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
-                            project.definition(&buffer, offset, cx)
-                        });
-                        let definitions = cx.background().spawn(async move {
-                            definitions
-                                .await
-                                .map_err(|err| anyhow!("definitions request failed: {:?}", err))
-                        });
-                        if rng.lock().gen_bool(0.3) {
-                            log::info!("{}: detaching definitions request", guest_username);
-                            cx.update(|cx| definitions.detach_and_log_err(cx));
-                        } else {
-                            client.buffers.extend(
-                                definitions.await?.into_iter().map(|loc| loc.target.buffer),
-                            );
-                        }
+                }
+                50..=54 => {
+                    let highlights = project.update(cx, |project, cx| {
+                        log::info!(
+                            "{}: requesting highlights for buffer {} ({:?})",
+                            username,
+                            buffer.read(cx).remote_id(),
+                            buffer.read(cx).file().unwrap().full_path(cx)
+                        );
+                        let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
+                        project.document_highlights(&buffer, offset, cx)
+                    });
+                    let highlights = cx.background().spawn(async move {
+                        highlights
+                            .await
+                            .map_err(|err| anyhow!("highlights request failed: {:?}", err))
+                    });
+                    if rng.lock().gen_bool(0.3) {
+                        log::info!("{}: detaching highlights request", username);
+                        cx.update(|cx| highlights.detach_and_log_err(cx));
+                    } else {
+                        highlights.await?;
                     }
-                    50..=54 => {
-                        let highlights = project.update(cx, |project, cx| {
-                            log::info!(
-                                "{}: requesting highlights for buffer {} ({:?})",
-                                guest_username,
-                                buffer.read(cx).remote_id(),
-                                buffer.read(cx).file().unwrap().full_path(cx)
-                            );
-                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
-                            project.document_highlights(&buffer, offset, cx)
-                        });
-                        let highlights = cx.background().spawn(async move {
-                            highlights
-                                .await
-                                .map_err(|err| anyhow!("highlights request failed: {:?}", err))
-                        });
-                        if rng.lock().gen_bool(0.3) {
-                            log::info!("{}: detaching highlights request", guest_username);
-                            cx.update(|cx| highlights.detach_and_log_err(cx));
-                        } else {
-                            highlights.await?;
-                        }
+                }
+                55..=59 => {
+                    let search = project.update(cx, |project, cx| {
+                        let query = rng.lock().gen_range('a'..='z');
+                        log::info!("{}: project-wide search {:?}", username, query);
+                        project.search(SearchQuery::text(query, false, false), cx)
+                    });
+                    let search = cx.background().spawn(async move {
+                        search
+                            .await
+                            .map_err(|err| anyhow!("search request failed: {:?}", err))
+                    });
+                    if rng.lock().gen_bool(0.3) {
+                        log::info!("{}: detaching search request", username);
+                        cx.update(|cx| search.detach_and_log_err(cx));
+                    } else {
+                        buffers.extend(search.await?.into_keys());
                     }
-                    55..=59 => {
-                        let search = project.update(cx, |project, cx| {
-                            let query = rng.lock().gen_range('a'..='z');
-                            log::info!("{}: project-wide search {:?}", guest_username, query);
-                            project.search(SearchQuery::text(query, false, false), cx)
-                        });
-                        let search = cx.background().spawn(async move {
-                            search
-                                .await
-                                .map_err(|err| anyhow!("search request failed: {:?}", err))
+                }
+                60..=69 => {
+                    let worktree = project
+                        .read_with(cx, |project, cx| {
+                            project
+                                .worktrees(cx)
+                                .filter(|worktree| {
+                                    let worktree = worktree.read(cx);
+                                    worktree.is_visible()
+                                        && worktree.entries(false).any(|e| e.is_file())
+                                        && worktree.root_entry().map_or(false, |e| e.is_dir())
+                                })
+                                .choose(&mut *rng.lock())
+                        })
+                        .unwrap();
+                    let (worktree_id, worktree_root_name) = worktree
+                        .read_with(cx, |worktree, _| {
+                            (worktree.id(), worktree.root_name().to_string())
                         });
-                        if rng.lock().gen_bool(0.3) {
-                            log::info!("{}: detaching search request", guest_username);
-                            cx.update(|cx| search.detach_and_log_err(cx));
-                        } else {
-                            client.buffers.extend(search.await?.into_keys());
-                        }
+
+                    let mut new_name = String::new();
+                    for _ in 0..10 {
+                        let letter = rng.lock().gen_range('a'..='z');
+                        new_name.push(letter);
                     }
-                    60..=69 => {
-                        let worktree = project
-                            .read_with(cx, |project, cx| {
-                                project
-                                    .worktrees(cx)
-                                    .filter(|worktree| {
-                                        let worktree = worktree.read(cx);
-                                        worktree.is_visible()
-                                            && worktree.entries(false).any(|e| e.is_file())
-                                            && worktree.root_entry().map_or(false, |e| e.is_dir())
-                                    })
-                                    .choose(&mut *rng.lock())
-                            })
-                            .unwrap();
-                        let (worktree_id, worktree_root_name) = worktree
-                            .read_with(cx, |worktree, _| {
-                                (worktree.id(), worktree.root_name().to_string())
-                            });
 
-                        let mut new_name = String::new();
-                        for _ in 0..10 {
-                            let letter = rng.lock().gen_range('a'..='z');
-                            new_name.push(letter);
-                        }
-                        let mut new_path = PathBuf::new();
-                        new_path.push(new_name);
+                    let is_dir = rng.lock().gen::<bool>();
+                    let mut new_path = PathBuf::new();
+                    new_path.push(new_name);
+                    if !is_dir {
                         new_path.set_extension("rs");
+                    }
+                    log::info!(
+                        "{}: creating {:?} in worktree {} ({})",
+                        username,
+                        new_path,
+                        worktree_id,
+                        worktree_root_name,
+                    );
+                    project
+                        .update(cx, |project, cx| {
+                            project.create_entry((worktree_id, new_path), is_dir, cx)
+                        })
+                        .unwrap()
+                        .await?;
+                }
+                _ => {
+                    buffer.update(cx, |buffer, cx| {
                         log::info!(
-                            "{}: creating {:?} in worktree {} ({})",
-                            guest_username,
-                            new_path,
-                            worktree_id,
-                            worktree_root_name,
+                            "{}: updating buffer {} ({:?})",
+                            username,
+                            buffer.remote_id(),
+                            buffer.file().unwrap().full_path(cx)
                         );
-                        project
-                            .update(cx, |project, cx| {
-                                project.create_entry((worktree_id, new_path), false, cx)
-                            })
-                            .unwrap()
-                            .await?;
-                    }
-                    _ => {
-                        buffer.update(cx, |buffer, cx| {
-                            log::info!(
-                                "{}: updating buffer {} ({:?})",
-                                guest_username,
-                                buffer.remote_id(),
-                                buffer.file().unwrap().full_path(cx)
-                            );
-                            if rng.lock().gen_bool(0.7) {
-                                buffer.randomly_edit(&mut *rng.lock(), 5, cx);
-                            } else {
-                                buffer.randomly_undo_redo(&mut *rng.lock(), cx);
-                            }
-                        });
-                    }
+                        if rng.lock().gen_bool(0.7) {
+                            buffer.randomly_edit(&mut *rng.lock(), 5, cx);
+                        } else {
+                            buffer.randomly_undo_redo(&mut *rng.lock(), cx);
+                        }
+                    });
                 }
-                cx.background().simulate_random_delay().await;
             }
+
             Ok(())
         }
 
-        let result = simulate_guest_internal(
-            &mut self,
-            &guest_username,
-            project.clone(),
-            op_start_signal,
-            rng,
-            &mut cx,
-        )
-        .await;
-        log::info!("{}: done", guest_username);
+        // Setup language server
+        let mut language = Language::new(
+            LanguageConfig {
+                name: "Rust".into(),
+                path_suffixes: vec!["rs".to_string()],
+                ..Default::default()
+            },
+            None,
+        );
+        let _fake_language_servers = language
+            .set_fake_lsp_adapter(Arc::new(FakeLspAdapter {
+                name: "the-fake-language-server",
+                capabilities: lsp::LanguageServer::full_capabilities(),
+                initializer: Some(Box::new({
+                    let rng = rng.clone();
+                    let fs = self.fs.clone();
+                    move |fake_server: &mut FakeLanguageServer| {
+                        fake_server.handle_request::<lsp::request::Completion, _, _>(
+                            |_, _| async move {
+                                Ok(Some(lsp::CompletionResponse::Array(vec![
+                                    lsp::CompletionItem {
+                                        text_edit: Some(lsp::CompletionTextEdit::Edit(
+                                            lsp::TextEdit {
+                                                range: lsp::Range::new(
+                                                    lsp::Position::new(0, 0),
+                                                    lsp::Position::new(0, 0),
+                                                ),
+                                                new_text: "the-new-text".to_string(),
+                                            },
+                                        )),
+                                        ..Default::default()
+                                    },
+                                ])))
+                            },
+                        );
+
+                        fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
+                            |_, _| async move {
+                                Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
+                                    lsp::CodeAction {
+                                        title: "the-code-action".to_string(),
+                                        ..Default::default()
+                                    },
+                                )]))
+                            },
+                        );
+
+                        fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
+                            |params, _| async move {
+                                Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
+                                    params.position,
+                                    params.position,
+                                ))))
+                            },
+                        );
+
+                        fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
+                            let fs = fs.clone();
+                            let rng = rng.clone();
+                            move |_, _| {
+                                let fs = fs.clone();
+                                let rng = rng.clone();
+                                async move {
+                                    let files = fs.files().await;
+                                    let mut rng = rng.lock();
+                                    let count = rng.gen_range::<usize, _>(1..3);
+                                    let files = (0..count)
+                                        .map(|_| files.choose(&mut *rng).unwrap())
+                                        .collect::<Vec<_>>();
+                                    log::info!("LSP: Returning definitions in files {:?}", &files);
+                                    Ok(Some(lsp::GotoDefinitionResponse::Array(
+                                        files
+                                            .into_iter()
+                                            .map(|file| lsp::Location {
+                                                uri: lsp::Url::from_file_path(file).unwrap(),
+                                                range: Default::default(),
+                                            })
+                                            .collect(),
+                                    )))
+                                }
+                            }
+                        });
+
+                        fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
+                            {
+                                let rng = rng.clone();
+                                move |_, _| {
+                                    let mut highlights = Vec::new();
+                                    let highlight_count = rng.lock().gen_range(1..=5);
+                                    for _ in 0..highlight_count {
+                                        let start_row = rng.lock().gen_range(0..100);
+                                        let start_column = rng.lock().gen_range(0..100);
+                                        let start = PointUtf16::new(start_row, start_column);
+                                        let end_row = rng.lock().gen_range(0..100);
+                                        let end_column = rng.lock().gen_range(0..100);
+                                        let end = PointUtf16::new(end_row, end_column);
+                                        let range =
+                                            if start > end { end..start } else { start..end };
+                                        highlights.push(lsp::DocumentHighlight {
+                                            range: range_to_lsp(range.clone()),
+                                            kind: Some(lsp::DocumentHighlightKind::READ),
+                                        });
+                                    }
+                                    highlights.sort_unstable_by_key(|highlight| {
+                                        (highlight.range.start, highlight.range.end)
+                                    });
+                                    async move { Ok(Some(highlights)) }
+                                }
+                            },
+                        );
+                    }
+                })),
+                ..Default::default()
+            }))
+            .await;
+        self.language_registry.add(Arc::new(language));
+
+        while op_start_signal.next().await.is_some() {
+            if let Err(error) = tick(&mut self, &username, rng.clone(), &mut cx).await {
+                log::error!("{} error: {:?}", username, error);
+            }
+
+            cx.background().simulate_random_delay().await;
+        }
+        log::info!("{}: done", username);
 
-        (self, project, cx, result.err())
+        (self, cx)
     }
 }
 

crates/collab/src/rpc.rs 🔗

@@ -1041,17 +1041,19 @@ impl Server {
 
         for conn_id in project.connection_ids() {
             if conn_id != request.sender_id {
-                self.peer.send(
-                    conn_id,
-                    proto::AddProjectCollaborator {
-                        project_id: project_id.to_proto(),
-                        collaborator: Some(proto::Collaborator {
-                            peer_id: request.sender_id.0,
-                            replica_id: replica_id as u32,
-                            user_id: guest_user_id.to_proto(),
-                        }),
-                    },
-                )?;
+                self.peer
+                    .send(
+                        conn_id,
+                        proto::AddProjectCollaborator {
+                            project_id: project_id.to_proto(),
+                            collaborator: Some(proto::Collaborator {
+                                peer_id: request.sender_id.0,
+                                replica_id: replica_id as u32,
+                                user_id: guest_user_id.to_proto(),
+                            }),
+                        },
+                    )
+                    .trace_err();
             }
         }
 

crates/collab/src/rpc/store.rs 🔗

@@ -674,8 +674,13 @@ impl Store {
             .connected_users
             .get_mut(&recipient_user_id)
             .ok_or_else(|| anyhow!("no such connection"))?;
-        if let Some(active_call) = recipient.active_call.take() {
+        if let Some(active_call) = recipient.active_call {
             anyhow::ensure!(active_call.room_id == room_id, "no such room");
+            anyhow::ensure!(
+                active_call.connection_id.is_none(),
+                "cannot decline a call after joining room"
+            );
+            recipient.active_call.take();
             let recipient_connection_ids = self
                 .connection_ids_for_user(recipient_user_id)
                 .collect::<Vec<_>>();
@@ -1196,7 +1201,9 @@ impl Store {
                 assert!(
                     self.connections
                         .contains_key(&ConnectionId(participant.peer_id)),
-                    "room contains participant that has disconnected"
+                    "room {} contains participant {:?} that has disconnected",
+                    room_id,
+                    participant
                 );
 
                 for participant_project in &participant.projects {

crates/fs/src/fs.rs 🔗

@@ -631,6 +631,21 @@ impl FakeFs {
         }
     }
 
+    pub async fn directories(&self) -> Vec<PathBuf> {
+        let mut result = Vec::new();
+        let mut queue = collections::VecDeque::new();
+        queue.push_back((PathBuf::from("/"), self.state.lock().await.root.clone()));
+        while let Some((path, entry)) = queue.pop_front() {
+            if let FakeFsEntry::Dir { entries, .. } = &*entry.lock().await {
+                for (name, entry) in entries {
+                    queue.push_back((path.join(name), entry.clone()));
+                }
+                result.push(path);
+            }
+        }
+        result
+    }
+
     pub async fn files(&self) -> Vec<PathBuf> {
         let mut result = Vec::new();
         let mut queue = collections::VecDeque::new();