Set up flow for mutating clients via explicit operation values

Max Brunsfeld created

Change summary

crates/collab/src/tests.rs                              |  12 
crates/collab/src/tests/randomized_integration_tests.rs | 955 +++++++---
crates/text/src/text.rs                                 |  20 
3 files changed, 649 insertions(+), 338 deletions(-)

Detailed changes

crates/collab/src/tests.rs 🔗

@@ -24,7 +24,7 @@ use std::{
     cell::{Ref, RefCell, RefMut},
     env,
     ops::{Deref, DerefMut},
-    path::{Path, PathBuf},
+    path::Path,
     sync::{
         atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
         Arc,
@@ -332,7 +332,6 @@ struct TestClientState {
     local_projects: Vec<ModelHandle<Project>>,
     remote_projects: Vec<ModelHandle<Project>>,
     buffers: HashMap<ModelHandle<Project>, HashSet<ModelHandle<language::Buffer>>>,
-    next_root_dir_id: usize,
 }
 
 impl Deref for TestClient {
@@ -483,15 +482,6 @@ impl TestClient {
             )
         })
     }
-
-    fn create_new_root_dir(&self) -> PathBuf {
-        format!(
-            "/{}-root-{}",
-            self.username,
-            util::post_inc(&mut self.state.borrow_mut().next_root_dir_id)
-        )
-        .into()
-    }
 }
 
 impl Drop for TestClient {

crates/collab/src/tests/randomized_integration_tests.rs 🔗

@@ -7,7 +7,7 @@ use anyhow::{anyhow, Result};
 use call::ActiveCall;
 use client::RECEIVE_TIMEOUT;
 use collections::BTreeMap;
-use fs::{FakeFs, Fs as _};
+use fs::Fs as _;
 use futures::StreamExt as _;
 use gpui::{executor::Deterministic, ModelHandle, TestAppContext};
 use language::{range_to_lsp, FakeLspAdapter, Language, LanguageConfig, PointUtf16};
@@ -15,217 +15,13 @@ use lsp::FakeLanguageServer;
 use parking_lot::Mutex;
 use project::{search::SearchQuery, Project};
 use rand::prelude::*;
-use std::{env, path::PathBuf, rc::Rc, sync::Arc};
-
-struct TestPlan {
-    rng: StdRng,
-    allow_server_restarts: bool,
-    allow_client_reconnection: bool,
-    allow_client_disconnection: bool,
-}
-
-#[derive(Debug)]
-enum Operation {
-    AddConnection {
-        user_id: UserId,
-    },
-    RemoveConnection {
-        user_id: UserId,
-    },
-    BounceConnection {
-        user_id: UserId,
-    },
-    RestartServer,
-    RunUntilParked,
-    MutateClient {
-        user_id: UserId,
-        operation: ClientOperation,
-    },
-}
-
-#[derive(Debug)]
-enum ClientOperation {
-    AcceptIncomingCall,
-    RejectIncomingCall,
-    LeaveCall,
-    InviteContactToCall {
-        user_id: UserId,
-    },
-    OpenLocalProject {
-        first_root_path: PathBuf,
-    },
-    OpenRemoteProject {
-        host_id: UserId,
-        first_root_name: String,
-    },
-    AddWorktreeToProject {
-        first_root_path: PathBuf,
-        new_root_path: PathBuf,
-    },
-    CloseProject {
-        id: u64,
-    },
-}
-
-impl TestPlan {
-    async fn next_operation(
-        &mut self,
-        clients: &[(Rc<TestClient>, TestAppContext)],
-        offline_users: &[(UserId, String)],
-    ) -> Operation {
-        let operation = loop {
-            break match self.rng.gen_range(0..100) {
-                0..=9 if !offline_users.is_empty() => {
-                    let user_id = offline_users[self.rng.gen_range(0..offline_users.len())].0;
-                    Operation::AddConnection { user_id }
-                }
-                10..=14 if clients.len() > 1 && self.allow_client_disconnection => {
-                    let (client, cx) = &clients[self.rng.gen_range(0..clients.len())];
-                    let user_id = client.current_user_id(cx);
-                    Operation::RemoveConnection { user_id }
-                }
-                15..=19 if clients.len() > 1 && self.allow_client_reconnection => {
-                    let (client, cx) = &clients[self.rng.gen_range(0..clients.len())];
-                    let user_id = client.current_user_id(cx);
-                    Operation::BounceConnection { user_id }
-                }
-                20..=24 if self.allow_server_restarts => Operation::RestartServer,
-                25..=29 => Operation::RunUntilParked,
-                _ if !clients.is_empty() => {
-                    let ix = self.rng.gen_range(0..clients.len());
-                    let (client, cx) = &clients[ix];
-                    let user_id = client.current_user_id(cx);
-                    let operation = self.next_client_operation(clients, ix).await;
-                    Operation::MutateClient { user_id, operation }
-                }
-                _ => continue,
-            };
-        };
-        operation
-    }
-
-    async fn next_client_operation(
-        &mut self,
-        clients: &[(Rc<TestClient>, TestAppContext)],
-        client_ix: usize,
-    ) -> ClientOperation {
-        let (client, cx) = &clients[client_ix];
-        let call = cx.read(ActiveCall::global);
-
-        loop {
-            match self.rng.gen_range(0..100) {
-                // Respond to an incoming call
-                0..=19 => {
-                    if call.read_with(cx, |call, _| call.incoming().borrow().is_some()) {
-                        return if self.rng.gen_bool(0.7) {
-                            ClientOperation::AcceptIncomingCall
-                        } else {
-                            ClientOperation::RejectIncomingCall
-                        };
-                    }
-                }
-
-                // Invite a contact to the current call
-                20..=29 => {
-                    let available_contacts = client.user_store.read_with(cx, |user_store, _| {
-                        user_store
-                            .contacts()
-                            .iter()
-                            .filter(|contact| contact.online && !contact.busy)
-                            .cloned()
-                            .collect::<Vec<_>>()
-                    });
-                    if !available_contacts.is_empty() {
-                        let contact = available_contacts.choose(&mut self.rng).unwrap();
-                        return ClientOperation::InviteContactToCall {
-                            user_id: UserId(contact.user.id as i32),
-                        };
-                    }
-                }
-
-                // Leave the current call
-                30..=39 => {
-                    if self.allow_client_disconnection
-                        && call.read_with(cx, |call, _| call.room().is_some())
-                    {
-                        return ClientOperation::LeaveCall;
-                    }
-                }
-
-                // Open a remote project
-                40..=49 => {
-                    if let Some(room) = call.read_with(cx, |call, _| call.room().cloned()) {
-                        let remote_projects = room.read_with(cx, |room, _| {
-                            room.remote_participants()
-                                .values()
-                                .flat_map(|participant| {
-                                    participant.projects.iter().map(|project| {
-                                        (
-                                            UserId::from_proto(participant.user.id),
-                                            project.worktree_root_names[0].clone(),
-                                        )
-                                    })
-                                })
-                                .collect::<Vec<_>>()
-                        });
-                        if !remote_projects.is_empty() {
-                            let (host_id, first_root_name) =
-                                remote_projects.choose(&mut self.rng).unwrap().clone();
-                            return ClientOperation::OpenRemoteProject {
-                                host_id,
-                                first_root_name,
-                            };
-                        }
-                    }
-                }
-
-                // Open a local project
-                50..=59 => {
-                    let paths = client.fs.paths().await;
-                    let first_root_path = if paths.is_empty() || self.rng.gen() {
-                        client.create_new_root_dir()
-                    } else {
-                        paths.choose(&mut self.rng).unwrap().clone()
-                    };
-                    return ClientOperation::OpenLocalProject { first_root_path };
-                }
-
-                // Add a worktree to a local project
-                60..=69 if !client.local_projects().is_empty() => {
-                    let project = client
-                        .local_projects()
-                        .choose(&mut self.rng)
-                        .unwrap()
-                        .clone();
-
-                    let first_root_path = project.read_with(cx, |project, cx| {
-                        project
-                            .visible_worktrees(cx)
-                            .next()
-                            .unwrap()
-                            .read(cx)
-                            .abs_path()
-                            .to_path_buf()
-                    });
-
-                    let paths = client.fs.paths().await;
-                    let new_root_path = if paths.is_empty() || self.rng.gen() {
-                        client.create_new_root_dir()
-                    } else {
-                        paths.choose(&mut self.rng).unwrap().clone()
-                    };
-
-                    return ClientOperation::AddWorktreeToProject {
-                        first_root_path,
-                        new_root_path,
-                    };
-                }
-
-                _ => continue,
-            };
-        }
-    }
-}
+use std::{
+    env,
+    ops::Range,
+    path::{Path, PathBuf},
+    rc::Rc,
+    sync::Arc,
+};
 
 #[gpui::test(iterations = 100)]
 async fn test_random_collaboration(
@@ -246,7 +42,7 @@ async fn test_random_collaboration(
     let mut server = TestServer::start(&deterministic).await;
     let db = server.app_state.db.clone();
 
-    let mut available_users = Vec::new();
+    let mut users = Vec::new();
     for ix in 0..max_peers {
         let username = format!("user-{}", ix + 1);
         let user_id = db
@@ -262,47 +58,55 @@ async fn test_random_collaboration(
             .await
             .unwrap()
             .user_id;
-        available_users.push((user_id, username));
+        users.push(UserTestPlan {
+            user_id,
+            username,
+            online: false,
+            next_root_id: 0,
+        });
     }
 
-    let plan = Arc::new(Mutex::new(TestPlan {
-        allow_server_restarts: rng.gen_bool(0.7),
-        allow_client_reconnection: rng.gen_bool(0.7),
-        allow_client_disconnection: rng.gen_bool(0.1),
-        rng,
-    }));
-
-    for (ix, (user_id_a, _)) in available_users.iter().enumerate() {
-        for (user_id_b, _) in &available_users[ix + 1..] {
+    for (ix, user_a) in users.iter().enumerate() {
+        for user_b in &users[ix + 1..] {
             server
                 .app_state
                 .db
-                .send_contact_request(*user_id_a, *user_id_b)
+                .send_contact_request(user_a.user_id, user_b.user_id)
                 .await
                 .unwrap();
             server
                 .app_state
                 .db
-                .respond_to_contact_request(*user_id_b, *user_id_a, true)
+                .respond_to_contact_request(user_b.user_id, user_a.user_id, true)
                 .await
                 .unwrap();
         }
     }
 
+    let plan = Arc::new(Mutex::new(TestPlan {
+        users,
+        allow_server_restarts: rng.gen_bool(0.7),
+        allow_client_reconnection: rng.gen_bool(0.7),
+        allow_client_disconnection: rng.gen_bool(0.1),
+        rng,
+    }));
+
     let mut clients = Vec::new();
     let mut client_tasks = Vec::new();
-    let mut op_start_signals = Vec::new();
+    let mut operation_channels = Vec::new();
     let mut next_entity_id = 100000;
 
-    for _ in 0..max_operations {
-        let next_operation = plan.lock().next_operation(&clients, &available_users).await;
+    let mut i = 0;
+    while i < max_operations {
+        let next_operation = plan.lock().next_operation(&clients).await;
         match next_operation {
             Operation::AddConnection { user_id } => {
-                let user_ix = available_users
-                    .iter()
-                    .position(|(id, _)| *id == user_id)
-                    .unwrap();
-                let (_, username) = available_users.remove(user_ix);
+                let username = {
+                    let mut plan = plan.lock();
+                    let mut user = plan.user(user_id);
+                    user.online = true;
+                    user.username.clone()
+                };
                 log::info!("Adding new connection for {}", username);
                 next_entity_id += 100000;
                 let mut client_cx = TestAppContext::new(
@@ -316,18 +120,19 @@ async fn test_random_collaboration(
                     cx.function_name.clone(),
                 );
 
-                let op_start_signal = futures::channel::mpsc::unbounded();
+                let (operation_tx, operation_rx) = futures::channel::mpsc::unbounded();
                 let client = Rc::new(server.create_client(&mut client_cx, &username).await);
-                op_start_signals.push(op_start_signal.0);
+                operation_channels.push(operation_tx);
                 clients.push((client.clone(), client_cx.clone()));
                 client_tasks.push(client_cx.foreground().spawn(simulate_client(
                     client,
-                    op_start_signal.1,
+                    operation_rx,
                     plan.clone(),
                     client_cx,
                 )));
 
                 log::info!("Added connection for {}", username);
+                i += 1;
             }
 
             Operation::RemoveConnection { user_id } => {
@@ -345,7 +150,7 @@ async fn test_random_collaboration(
                 let removed_peer_id = user_connection_ids[0].into();
                 let (client, mut client_cx) = clients.remove(client_ix);
                 let client_task = client_tasks.remove(client_ix);
-                op_start_signals.remove(client_ix);
+                operation_channels.remove(client_ix);
                 server.forbid_connections();
                 server.disconnect_client(removed_peer_id);
                 deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
@@ -386,11 +191,12 @@ async fn test_random_collaboration(
                 }
 
                 log::info!("{} removed", client.username);
-                available_users.push((user_id, client.username.clone()));
+                plan.lock().user(user_id).online = false;
                 client_cx.update(|cx| {
                     cx.clear_globals();
                     drop(client);
                 });
+                i += 1;
             }
 
             Operation::BounceConnection { user_id } => {
@@ -404,6 +210,7 @@ async fn test_random_collaboration(
                 let peer_id = user_connection_ids[0].into();
                 server.disconnect_client(peer_id);
                 deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
+                i += 1;
             }
 
             Operation::RestartServer => {
@@ -420,25 +227,27 @@ async fn test_random_collaboration(
                     .await
                     .unwrap();
                 assert_eq!(stale_room_ids, vec![]);
+                i += 1;
             }
 
             Operation::RunUntilParked => {
                 deterministic.run_until_parked();
             }
 
-            Operation::MutateClient { user_id, operation } => {
-                let client_ix = clients
-                    .iter()
-                    .position(|(client, cx)| client.current_user_id(cx) == user_id)
-                    .unwrap();
-                op_start_signals[client_ix]
-                    .unbounded_send(operation)
-                    .unwrap();
+            Operation::MutateClients(user_ids) => {
+                for user_id in user_ids {
+                    let client_ix = clients
+                        .iter()
+                        .position(|(client, cx)| client.current_user_id(cx) == user_id)
+                        .unwrap();
+                    operation_channels[client_ix].unbounded_send(()).unwrap();
+                    i += 1;
+                }
             }
         }
     }
 
-    drop(op_start_signals);
+    drop(operation_channels);
     deterministic.start_waiting();
     futures::future::join_all(client_tasks).await;
     deterministic.finish_waiting();
@@ -618,9 +427,331 @@ async fn test_random_collaboration(
     }
 }
 
+struct TestPlan {
+    rng: StdRng,
+    users: Vec<UserTestPlan>,
+    allow_server_restarts: bool,
+    allow_client_reconnection: bool,
+    allow_client_disconnection: bool,
+}
+
+struct UserTestPlan {
+    user_id: UserId,
+    username: String,
+    next_root_id: usize,
+    online: bool,
+}
+
+#[derive(Debug)]
+enum Operation {
+    AddConnection { user_id: UserId },
+    RemoveConnection { user_id: UserId },
+    BounceConnection { user_id: UserId },
+    RestartServer,
+    RunUntilParked,
+    MutateClients(Vec<UserId>),
+}
+
+#[derive(Debug)]
+enum ClientOperation {
+    AcceptIncomingCall,
+    RejectIncomingCall,
+    LeaveCall,
+    InviteContactToCall {
+        user_id: UserId,
+    },
+    OpenLocalProject {
+        first_root_name: String,
+    },
+    OpenRemoteProject {
+        host_id: UserId,
+        first_root_name: String,
+    },
+    AddWorktreeToProject {
+        project_root_name: String,
+        new_root_path: PathBuf,
+    },
+    CloseRemoteProject {
+        project_root_name: String,
+    },
+    OpenBuffer {
+        project_root_name: String,
+        full_path: PathBuf,
+    },
+    EditBuffer {
+        project_root_name: String,
+        full_path: PathBuf,
+        edits: Vec<(Range<usize>, Arc<str>)>,
+    },
+    Other,
+}
+
+impl TestPlan {
+    async fn next_operation(&mut self, clients: &[(Rc<TestClient>, TestAppContext)]) -> Operation {
+        let operation = loop {
+            break match self.rng.gen_range(0..100) {
+                0..=19 if clients.len() < self.users.len() => {
+                    let user = self
+                        .users
+                        .iter()
+                        .filter(|u| !u.online)
+                        .choose(&mut self.rng)
+                        .unwrap();
+                    Operation::AddConnection {
+                        user_id: user.user_id,
+                    }
+                }
+                20..=24 if clients.len() > 1 && self.allow_client_disconnection => {
+                    let (client, cx) = &clients[self.rng.gen_range(0..clients.len())];
+                    let user_id = client.current_user_id(cx);
+                    Operation::RemoveConnection { user_id }
+                }
+                25..=29 if clients.len() > 1 && self.allow_client_reconnection => {
+                    let (client, cx) = &clients[self.rng.gen_range(0..clients.len())];
+                    let user_id = client.current_user_id(cx);
+                    Operation::BounceConnection { user_id }
+                }
+                30..=34 if self.allow_server_restarts && clients.len() > 1 => {
+                    Operation::RestartServer
+                }
+                35..=39 => Operation::RunUntilParked,
+                _ if !clients.is_empty() => {
+                    let user_ids = (0..self.rng.gen_range(0..10))
+                        .map(|_| {
+                            let ix = self.rng.gen_range(0..clients.len());
+                            let (client, cx) = &clients[ix];
+                            client.current_user_id(cx)
+                        })
+                        .collect();
+                    Operation::MutateClients(user_ids)
+                }
+                _ => continue,
+            };
+        };
+        operation
+    }
+
+    async fn next_client_operation(
+        &mut self,
+        client: &TestClient,
+        cx: &TestAppContext,
+    ) -> ClientOperation {
+        let user_id = client.current_user_id(cx);
+        let call = cx.read(ActiveCall::global);
+        let operation = loop {
+            match self.rng.gen_range(0..100) {
+                // Mutate the call
+                0..=19 => match self.rng.gen_range(0..100_u32) {
+                    // Respond to an incoming call
+                    0..=39 => {
+                        if call.read_with(cx, |call, _| call.incoming().borrow().is_some()) {
+                            break if self.rng.gen_bool(0.7) {
+                                ClientOperation::AcceptIncomingCall
+                            } else {
+                                ClientOperation::RejectIncomingCall
+                            };
+                        }
+                    }
+
+                    // Invite a contact to the current call
+                    30..=89 => {
+                        let available_contacts =
+                            client.user_store.read_with(cx, |user_store, _| {
+                                user_store
+                                    .contacts()
+                                    .iter()
+                                    .filter(|contact| contact.online && !contact.busy)
+                                    .cloned()
+                                    .collect::<Vec<_>>()
+                            });
+                        if !available_contacts.is_empty() {
+                            let contact = available_contacts.choose(&mut self.rng).unwrap();
+                            break ClientOperation::InviteContactToCall {
+                                user_id: UserId(contact.user.id as i32),
+                            };
+                        }
+                    }
+
+                    // Leave the current call
+                    90.. => {
+                        if self.allow_client_disconnection
+                            && call.read_with(cx, |call, _| call.room().is_some())
+                        {
+                            break ClientOperation::LeaveCall;
+                        }
+                    }
+                },
+
+                // Mutate projects
+                20..=39 => match self.rng.gen_range(0..100_u32) {
+                    // Open a remote project
+                    0..=30 => {
+                        if let Some(room) = call.read_with(cx, |call, _| call.room().cloned()) {
+                            let remote_projects = room.read_with(cx, |room, _| {
+                                room.remote_participants()
+                                    .values()
+                                    .flat_map(|participant| {
+                                        participant.projects.iter().map(|project| {
+                                            (
+                                                UserId::from_proto(participant.user.id),
+                                                project.worktree_root_names[0].clone(),
+                                            )
+                                        })
+                                    })
+                                    .collect::<Vec<_>>()
+                            });
+                            if !remote_projects.is_empty() {
+                                let (host_id, first_root_name) =
+                                    remote_projects.choose(&mut self.rng).unwrap().clone();
+                                break ClientOperation::OpenRemoteProject {
+                                    host_id,
+                                    first_root_name,
+                                };
+                            }
+                        }
+                    }
+
+                    // Close a remote project
+                    31..=40 => {
+                        if !client.remote_projects().is_empty() {
+                            let project = client
+                                .remote_projects()
+                                .choose(&mut self.rng)
+                                .unwrap()
+                                .clone();
+                            let first_root_name = root_name_for_project(&project, cx);
+                            break ClientOperation::CloseRemoteProject {
+                                project_root_name: first_root_name,
+                            };
+                        }
+                    }
+
+                    // Open a local project
+                    41..=60 => {
+                        let first_root_name = self.next_root_dir_name(user_id);
+                        break ClientOperation::OpenLocalProject { first_root_name };
+                    }
+
+                    // Add a worktree to a local project
+                    61.. => {
+                        if !client.local_projects().is_empty() {
+                            let project = client
+                                .local_projects()
+                                .choose(&mut self.rng)
+                                .unwrap()
+                                .clone();
+                            let project_root_name = root_name_for_project(&project, cx);
+
+                            let mut paths = client.fs.paths().await;
+                            paths.remove(0);
+                            let new_root_path = if paths.is_empty() || self.rng.gen() {
+                                Path::new("/").join(&self.next_root_dir_name(user_id))
+                            } else {
+                                paths.choose(&mut self.rng).unwrap().clone()
+                            };
+
+                            break ClientOperation::AddWorktreeToProject {
+                                project_root_name,
+                                new_root_path,
+                            };
+                        }
+                    }
+                },
+
+                // Mutate buffers
+                40..=79 => {
+                    let Some(project) = choose_random_project(client, &mut self.rng) else { continue };
+                    let project_root_name = root_name_for_project(&project, cx);
+
+                    match self.rng.gen_range(0..100_u32) {
+                        // Manipulate an existing buffer
+                        0..=80 => {
+                            let Some(buffer) = client
+                                .buffers_for_project(&project)
+                                .iter()
+                                .choose(&mut self.rng)
+                                .cloned() else { continue };
+
+                            match self.rng.gen_range(0..100_u32) {
+                                0..=9 => {
+                                    let (full_path, edits) = buffer.read_with(cx, |buffer, cx| {
+                                        (
+                                            buffer.file().unwrap().full_path(cx),
+                                            buffer.get_random_edits(&mut self.rng, 3),
+                                        )
+                                    });
+                                    break ClientOperation::EditBuffer {
+                                        project_root_name,
+                                        full_path,
+                                        edits,
+                                    };
+                                }
+                                _ => {}
+                            }
+                        }
+
+                        // Open a buffer
+                        81.. => {
+                            let worktree = project.read_with(cx, |project, cx| {
+                                project
+                                    .worktrees(cx)
+                                    .filter(|worktree| {
+                                        let worktree = worktree.read(cx);
+                                        worktree.is_visible()
+                                            && worktree.entries(false).any(|e| e.is_file())
+                                    })
+                                    .choose(&mut self.rng)
+                            });
+                            let Some(worktree) = worktree else { continue };
+                            let full_path = worktree.read_with(cx, |worktree, _| {
+                                let entry = worktree
+                                    .entries(false)
+                                    .filter(|e| e.is_file())
+                                    .choose(&mut self.rng)
+                                    .unwrap();
+                                if entry.path.as_ref() == Path::new("") {
+                                    Path::new(worktree.root_name()).into()
+                                } else {
+                                    Path::new(worktree.root_name()).join(&entry.path)
+                                }
+                            });
+                            break ClientOperation::OpenBuffer {
+                                project_root_name,
+                                full_path,
+                            };
+                        }
+                    }
+                }
+
+                _ => break ClientOperation::Other,
+            }
+        };
+        operation
+    }
+
+    fn next_root_dir_name(&mut self, user_id: UserId) -> String {
+        let user_ix = self
+            .users
+            .iter()
+            .position(|user| user.user_id == user_id)
+            .unwrap();
+        let root_id = util::post_inc(&mut self.users[user_ix].next_root_id);
+        format!("dir-{user_id}-{root_id}")
+    }
+
+    fn user(&mut self, user_id: UserId) -> &mut UserTestPlan {
+        let ix = self
+            .users
+            .iter()
+            .position(|user| user.user_id == user_id)
+            .unwrap();
+        &mut self.users[ix]
+    }
+}
+
 async fn simulate_client(
     client: Rc<TestClient>,
-    mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<ClientOperation>,
+    mut operation_rx: futures::channel::mpsc::UnboundedReceiver<()>,
     plan: Arc<Mutex<TestPlan>>,
     mut cx: TestAppContext,
 ) {
@@ -736,8 +867,10 @@ async fn simulate_client(
         .await;
     client.language_registry.add(Arc::new(language));
 
-    while op_start_signal.next().await.is_some() {
-        if let Err(error) = randomly_mutate_client(&client, plan.clone(), &mut cx).await {
+    while operation_rx.next().await.is_some() {
+        let operation = plan.lock().next_client_operation(&client, &cx).await;
+        if let Err(error) = apply_client_operation(&client, plan.clone(), operation, &mut cx).await
+        {
             log::error!("{} error: {:?}", client.username, error);
         }
 
@@ -746,98 +879,274 @@ async fn simulate_client(
     log::info!("{}: done", client.username);
 }
 
-// async fn apply_client_operation(
-//     client: &mut TestClient,
-//     plan: Arc<Mutex<TestPlan>>,
-//     operation: ClientOperation,
-//     cx: &mut TestAppContext,
-// ) -> Result<()> {
-//     match operation {
-//         ClientOperation::AcceptIncomingCall => todo!(),
-//         ClientOperation::RejectIncomingCall => todo!(),
-//         ClientOperation::OpenLocalProject { path } => todo!(),
-//         ClientOperation::AddWorktreeToProject {
-//             existing_path,
-//             new_path,
-//         } => todo!(),
-//         ClientOperation::CloseProject { existing_path } => todo!(),
-//     }
-// }
-
-async fn randomly_mutate_client(
-    client: &Rc<TestClient>,
-    plan: Arc<Mutex<TestPlan>>,
-    cx: &mut TestAppContext,
-) -> Result<()> {
-    let choice = plan.lock().rng.gen_range(0..100);
-    match choice {
-        0..=19 => randomly_mutate_active_call(client, &plan, cx).await?,
-        20..=49 => randomly_mutate_projects(client, &plan, cx).await?,
-        50..=59 if !client.local_projects().is_empty() || !client.remote_projects().is_empty() => {
-            randomly_mutate_worktrees(client, &plan, cx).await?;
-        }
-        60..=84 if !client.local_projects().is_empty() || !client.remote_projects().is_empty() => {
-            randomly_query_and_mutate_buffers(client, &plan, cx).await?;
-        }
-        _ => randomly_mutate_fs(client, &plan).await,
-    }
-
-    Ok(())
-}
-
-async fn randomly_mutate_active_call(
+async fn apply_client_operation(
     client: &TestClient,
-    plan: &Arc<Mutex<TestPlan>>,
+    plan: Arc<Mutex<TestPlan>>,
+    operation: ClientOperation,
     cx: &mut TestAppContext,
 ) -> Result<()> {
-    let active_call = cx.read(ActiveCall::global);
-    if active_call.read_with(cx, |call, _| call.incoming().borrow().is_some()) {
-        if plan.lock().rng.gen_bool(0.7) {
+    match operation {
+        ClientOperation::AcceptIncomingCall => {
             log::info!("{}: accepting incoming call", client.username);
+            let active_call = cx.read(ActiveCall::global);
             active_call
                 .update(cx, |call, cx| call.accept_incoming(cx))
                 .await?;
-        } else {
+        }
+
+        ClientOperation::RejectIncomingCall => {
             log::info!("{}: declining incoming call", client.username);
+            let active_call = cx.read(ActiveCall::global);
             active_call.update(cx, |call, _| call.decline_incoming())?;
         }
-    } else {
-        let available_contacts = client.user_store.read_with(cx, |user_store, _| {
-            user_store
-                .contacts()
+
+        ClientOperation::LeaveCall => {
+            log::info!("{}: hanging up", client.username);
+            let active_call = cx.read(ActiveCall::global);
+            active_call.update(cx, |call, cx| call.hang_up(cx))?;
+        }
+
+        ClientOperation::InviteContactToCall { user_id } => {
+            log::info!("{}: inviting {}", client.username, user_id,);
+            let active_call = cx.read(ActiveCall::global);
+            active_call
+                .update(cx, |call, cx| call.invite(user_id.to_proto(), None, cx))
+                .await?;
+        }
+
+        ClientOperation::OpenLocalProject { first_root_name } => {
+            log::info!(
+                "{}: opening local project at {:?}",
+                client.username,
+                first_root_name
+            );
+            let root_path = Path::new("/").join(&first_root_name);
+            client.fs.create_dir(&root_path).await.unwrap();
+            client
+                .fs
+                .create_file(&root_path.join("main.rs"), Default::default())
+                .await
+                .unwrap();
+            let project = client.build_local_project(root_path, cx).await.0;
+
+            let active_call = cx.read(ActiveCall::global);
+            if active_call.read_with(cx, |call, _| call.room().is_some())
+                && project.read_with(cx, |project, _| project.is_local() && !project.is_shared())
+            {
+                match active_call
+                    .update(cx, |call, cx| call.share_project(project.clone(), cx))
+                    .await
+                {
+                    Ok(project_id) => {
+                        log::info!(
+                            "{}: shared project {} with id {}",
+                            client.username,
+                            first_root_name,
+                            project_id
+                        );
+                    }
+                    Err(error) => {
+                        log::error!(
+                            "{}: error sharing project {}: {:?}",
+                            client.username,
+                            first_root_name,
+                            error
+                        );
+                    }
+                }
+            }
+
+            client.local_projects_mut().push(project.clone());
+        }
+
+        ClientOperation::AddWorktreeToProject {
+            project_root_name,
+            new_root_path,
+        } => {
+            log::info!(
+                "{}: finding/creating local worktree at {:?} to project with root path {}",
+                client.username,
+                new_root_path,
+                project_root_name
+            );
+            let project = project_for_root_name(client, &project_root_name, cx)
+                .expect("invalid project in test operation");
+            if !client.fs.paths().await.contains(&new_root_path) {
+                client.fs.create_dir(&new_root_path).await.unwrap();
+            }
+            project
+                .update(cx, |project, cx| {
+                    project.find_or_create_local_worktree(&new_root_path, true, cx)
+                })
+                .await
+                .unwrap();
+        }
+
+        ClientOperation::CloseRemoteProject { project_root_name } => {
+            log::info!(
+                "{}: dropping project with root path {}",
+                client.username,
+                project_root_name,
+            );
+            let ix = project_ix_for_root_name(&*client.remote_projects(), &project_root_name, cx)
+                .expect("invalid project in test operation");
+            client.remote_projects_mut().remove(ix);
+        }
+
+        ClientOperation::OpenRemoteProject {
+            host_id,
+            first_root_name,
+        } => {
+            log::info!(
+                "{}: joining remote project of user {}, root name {}",
+                client.username,
+                host_id,
+                first_root_name,
+            );
+            let active_call = cx.read(ActiveCall::global);
+            let project_id = active_call
+                .read_with(cx, |call, cx| {
+                    let room = call.room().cloned()?;
+                    let participant = room
+                        .read(cx)
+                        .remote_participants()
+                        .get(&host_id.to_proto())?;
+                    let project = participant
+                        .projects
+                        .iter()
+                        .find(|project| project.worktree_root_names[0] == first_root_name)?;
+                    Some(project.id)
+                })
+                .expect("invalid project in test operation");
+            let project = client.build_remote_project(project_id, cx).await;
+            client.remote_projects_mut().push(project);
+        }
+
+        ClientOperation::OpenBuffer {
+            project_root_name,
+            full_path,
+        } => {
+            log::info!(
+                "{}: opening path {:?} in project {}",
+                client.username,
+                full_path,
+                project_root_name,
+            );
+            let project = project_for_root_name(client, &project_root_name, cx)
+                .expect("invalid project in test operation");
+            let mut components = full_path.components();
+            let root_name = components.next().unwrap().as_os_str().to_str().unwrap();
+            let path = components.as_path();
+            let worktree_id = project
+                .read_with(cx, |project, cx| {
+                    project.worktrees(cx).find_map(|worktree| {
+                        let worktree = worktree.read(cx);
+                        if worktree.root_name() == root_name {
+                            Some(worktree.id())
+                        } else {
+                            None
+                        }
+                    })
+                })
+                .expect("invalid buffer path in test operation");
+            let buffer = project
+                .update(cx, |project, cx| {
+                    project.open_buffer((worktree_id, &path), cx)
+                })
+                .await?;
+            client.buffers_for_project(&project).insert(buffer);
+        }
+
+        ClientOperation::EditBuffer {
+            project_root_name,
+            full_path,
+            edits,
+        } => {
+            log::info!(
+                "{}: editing buffer {:?} in project {} with {:?}",
+                client.username,
+                full_path,
+                project_root_name,
+                edits
+            );
+            let project = project_for_root_name(client, &project_root_name, cx)
+                .expect("invalid project in test operation");
+            let buffer = client
+                .buffers_for_project(&project)
                 .iter()
-                .filter(|contact| contact.online && !contact.busy)
+                .find(|buffer| {
+                    buffer.read_with(cx, |buffer, cx| {
+                        buffer.file().unwrap().full_path(cx) == full_path
+                    })
+                })
                 .cloned()
-                .collect::<Vec<_>>()
-        });
+                .expect("invalid buffer path in test operation");
+            buffer.update(cx, |buffer, cx| {
+                buffer.edit(edits, None, cx);
+            });
+        }
 
-        let distribution = plan.lock().rng.gen_range(0..100);
-        match distribution {
-            0..=29 if !available_contacts.is_empty() => {
-                let contact = available_contacts.choose(&mut plan.lock().rng).unwrap();
-                log::info!(
-                    "{}: inviting {}",
-                    client.username,
-                    contact.user.github_login
-                );
-                active_call
-                    .update(cx, |call, cx| call.invite(contact.user.id, None, cx))
-                    .await?;
-            }
-            30..=39
-                if plan.lock().allow_client_disconnection
-                    && active_call.read_with(cx, |call, _| call.room().is_some()) =>
-            {
-                log::info!("{}: hanging up", client.username);
-                active_call.update(cx, |call, cx| call.hang_up(cx))?;
+        _ => {
+            let choice = plan.lock().rng.gen_range(0..100);
+            match choice {
+                50..=59
+                    if !client.local_projects().is_empty()
+                        || !client.remote_projects().is_empty() =>
+                {
+                    randomly_mutate_worktrees(client, &plan, cx).await?;
+                }
+                60..=84
+                    if !client.local_projects().is_empty()
+                        || !client.remote_projects().is_empty() =>
+                {
+                    randomly_query_and_mutate_buffers(client, &plan, cx).await?;
+                }
+                _ => randomly_mutate_fs(client, &plan).await,
             }
-            _ => {}
         }
     }
-
     Ok(())
 }
 
+fn project_for_root_name(
+    client: &TestClient,
+    root_name: &str,
+    cx: &TestAppContext,
+) -> Option<ModelHandle<Project>> {
+    if let Some(ix) = project_ix_for_root_name(&*client.local_projects(), root_name, cx) {
+        return Some(client.local_projects()[ix].clone());
+    }
+    if let Some(ix) = project_ix_for_root_name(&*client.remote_projects(), root_name, cx) {
+        return Some(client.remote_projects()[ix].clone());
+    }
+    None
+}
+
+fn project_ix_for_root_name(
+    projects: &[ModelHandle<Project>],
+    root_name: &str,
+    cx: &TestAppContext,
+) -> Option<usize> {
+    projects.iter().position(|project| {
+        project.read_with(cx, |project, cx| {
+            let worktree = project.visible_worktrees(cx).next().unwrap();
+            worktree.read(cx).root_name() == root_name
+        })
+    })
+}
+
+fn root_name_for_project(project: &ModelHandle<Project>, cx: &TestAppContext) -> String {
+    project.read_with(cx, |project, cx| {
+        project
+            .visible_worktrees(cx)
+            .next()
+            .unwrap()
+            .read(cx)
+            .root_name()
+            .to_string()
+    })
+}
+
 async fn randomly_mutate_fs(client: &TestClient, plan: &Arc<Mutex<TestPlan>>) {
     let is_dir = plan.lock().rng.gen::<bool>();
     let mut new_path = client

crates/text/src/text.rs 🔗

@@ -1429,12 +1429,11 @@ impl Buffer {
         start..end
     }
 
-    #[allow(clippy::type_complexity)]
-    pub fn randomly_edit<T>(
-        &mut self,
+    pub fn get_random_edits<T>(
+        &self,
         rng: &mut T,
         edit_count: usize,
-    ) -> (Vec<(Range<usize>, Arc<str>)>, Operation)
+    ) -> Vec<(Range<usize>, Arc<str>)>
     where
         T: rand::Rng,
     {
@@ -1453,8 +1452,21 @@ impl Buffer {
 
             edits.push((range, new_text.into()));
         }
+        edits
+    }
 
+    #[allow(clippy::type_complexity)]
+    pub fn randomly_edit<T>(
+        &mut self,
+        rng: &mut T,
+        edit_count: usize,
+    ) -> (Vec<(Range<usize>, Arc<str>)>, Operation)
+    where
+        T: rand::Rng,
+    {
+        let mut edits = self.get_random_edits(rng, edit_count);
         log::info!("mutating buffer {} with {:?}", self.replica_id, edits);
+
         let op = self.edit(edits.iter().cloned());
         if let Operation::Edit(edit) = &op {
             assert_eq!(edits.len(), edit.new_text.len());