Skip inapplicable operations when running an edited test plan

Max Brunsfeld created

Change summary

crates/collab/src/tests/randomized_integration_tests.rs | 633 ++++++----
1 file changed, 375 insertions(+), 258 deletions(-)

Detailed changes

crates/collab/src/tests/randomized_integration_tests.rs 🔗

@@ -7,9 +7,10 @@ use anyhow::{anyhow, Result};
 use call::ActiveCall;
 use client::RECEIVE_TIMEOUT;
 use collections::{BTreeMap, HashSet};
+use editor::Bias;
 use fs::{FakeFs, Fs as _};
 use futures::StreamExt as _;
-use gpui::{executor::Deterministic, ModelHandle, TestAppContext};
+use gpui::{executor::Deterministic, ModelHandle, Task, TestAppContext};
 use language::{range_to_lsp, FakeLspAdapter, Language, LanguageConfig, PointUtf16};
 use lsp::FakeLanguageServer;
 use parking_lot::Mutex;
@@ -21,7 +22,10 @@ use std::{
     ops::Range,
     path::{Path, PathBuf},
     rc::Rc,
-    sync::Arc,
+    sync::{
+        atomic::{AtomicBool, Ordering::SeqCst},
+        Arc,
+    },
 };
 use util::ResultExt;
 
@@ -101,147 +105,21 @@ async fn test_random_collaboration(
     let mut next_entity_id = 100000;
 
     loop {
-        let Some(next_operation) = plan.lock().next_operation(&clients).await else { break };
-        match next_operation {
-            Operation::AddConnection { user_id } => {
-                let username = {
-                    let mut plan = plan.lock();
-                    let mut user = plan.user(user_id);
-                    user.online = true;
-                    user.username.clone()
-                };
-                log::info!("Adding new connection for {}", username);
-                next_entity_id += 100000;
-                let mut client_cx = TestAppContext::new(
-                    cx.foreground_platform(),
-                    cx.platform(),
-                    deterministic.build_foreground(next_entity_id),
-                    deterministic.build_background(),
-                    cx.font_cache(),
-                    cx.leak_detector(),
-                    next_entity_id,
-                    cx.function_name.clone(),
-                );
-
-                let (operation_tx, operation_rx) = futures::channel::mpsc::unbounded();
-                let client = Rc::new(server.create_client(&mut client_cx, &username).await);
-                operation_channels.push(operation_tx);
-                clients.push((client.clone(), client_cx.clone()));
-                client_tasks.push(client_cx.foreground().spawn(simulate_client(
-                    client,
-                    operation_rx,
-                    plan.clone(),
-                    client_cx,
-                )));
-
-                log::info!("Added connection for {}", username);
-            }
-
-            Operation::RemoveConnection { user_id } => {
-                log::info!("Simulating full disconnection of user {}", user_id);
-                let client_ix = clients
-                    .iter()
-                    .position(|(client, cx)| client.current_user_id(cx) == user_id)
-                    .unwrap();
-                let user_connection_ids = server
-                    .connection_pool
-                    .lock()
-                    .user_connection_ids(user_id)
-                    .collect::<Vec<_>>();
-                assert_eq!(user_connection_ids.len(), 1);
-                let removed_peer_id = user_connection_ids[0].into();
-                let (client, mut client_cx) = clients.remove(client_ix);
-                let client_task = client_tasks.remove(client_ix);
-                operation_channels.remove(client_ix);
-                server.forbid_connections();
-                server.disconnect_client(removed_peer_id);
-                deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
-                deterministic.start_waiting();
-                log::info!("Waiting for user {} to exit...", user_id);
-                client_task.await;
-                deterministic.finish_waiting();
-                server.allow_connections();
-
-                for project in client.remote_projects().iter() {
-                    project.read_with(&client_cx, |project, _| {
-                        assert!(
-                            project.is_read_only(),
-                            "project {:?} should be read only",
-                            project.remote_id()
-                        )
-                    });
-                }
-
-                for (client, cx) in &clients {
-                    let contacts = server
-                        .app_state
-                        .db
-                        .get_contacts(client.current_user_id(cx))
-                        .await
-                        .unwrap();
-                    let pool = server.connection_pool.lock();
-                    for contact in contacts {
-                        if let db::Contact::Accepted { user_id: id, .. } = contact {
-                            if pool.is_user_online(id) {
-                                assert_ne!(
-                                    id, user_id,
-                                    "removed client is still a contact of another peer"
-                                );
-                            }
-                        }
-                    }
-                }
-
-                log::info!("{} removed", client.username);
-                plan.lock().user(user_id).online = false;
-                client_cx.update(|cx| {
-                    cx.clear_globals();
-                    drop(client);
-                });
-            }
-
-            Operation::BounceConnection { user_id } => {
-                log::info!("Simulating temporary disconnection of user {}", user_id);
-                let user_connection_ids = server
-                    .connection_pool
-                    .lock()
-                    .user_connection_ids(user_id)
-                    .collect::<Vec<_>>();
-                assert_eq!(user_connection_ids.len(), 1);
-                let peer_id = user_connection_ids[0].into();
-                server.disconnect_client(peer_id);
-                deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
-            }
-
-            Operation::RestartServer => {
-                log::info!("Simulating server restart");
-                server.reset().await;
-                deterministic.advance_clock(RECEIVE_TIMEOUT);
-                server.start().await.unwrap();
-                deterministic.advance_clock(CLEANUP_TIMEOUT);
-                let environment = &server.app_state.config.zed_environment;
-                let stale_room_ids = server
-                    .app_state
-                    .db
-                    .stale_room_ids(environment, server.id())
-                    .await
-                    .unwrap();
-                assert_eq!(stale_room_ids, vec![]);
-            }
-
-            Operation::MutateClients { user_ids, quiesce } => {
-                for user_id in user_ids {
-                    let client_ix = clients
-                        .iter()
-                        .position(|(client, cx)| client.current_user_id(cx) == user_id)
-                        .unwrap();
-                    operation_channels[client_ix].unbounded_send(()).unwrap();
-                }
-
-                if quiesce {
-                    deterministic.run_until_parked();
-                }
-            }
+        let Some((next_operation, skipped)) = plan.lock().next_server_operation(&clients) else { break };
+        let applied = apply_server_operation(
+            deterministic.clone(),
+            &mut server,
+            &mut clients,
+            &mut client_tasks,
+            &mut operation_channels,
+            &mut next_entity_id,
+            plan.clone(),
+            next_operation,
+            cx,
+        )
+        .await;
+        if !applied {
+            skipped.store(false, SeqCst);
         }
     }
 
@@ -430,39 +308,216 @@ async fn test_random_collaboration(
     }
 }
 
+async fn apply_server_operation(
+    deterministic: Arc<Deterministic>,
+    server: &mut TestServer,
+    clients: &mut Vec<(Rc<TestClient>, TestAppContext)>,
+    client_tasks: &mut Vec<Task<()>>,
+    operation_channels: &mut Vec<futures::channel::mpsc::UnboundedSender<()>>,
+    next_entity_id: &mut usize,
+    plan: Arc<Mutex<TestPlan>>,
+    operation: Operation,
+    cx: &mut TestAppContext,
+) -> bool {
+    match operation {
+        Operation::AddConnection { user_id } => {
+            let username;
+            {
+                let mut plan = plan.lock();
+                let mut user = plan.user(user_id);
+                if user.online {
+                    return false;
+                }
+                user.online = true;
+                username = user.username.clone();
+            };
+            log::info!("Adding new connection for {}", username);
+            *next_entity_id += 100000;
+            let mut client_cx = TestAppContext::new(
+                cx.foreground_platform(),
+                cx.platform(),
+                deterministic.build_foreground(*next_entity_id),
+                deterministic.build_background(),
+                cx.font_cache(),
+                cx.leak_detector(),
+                *next_entity_id,
+                cx.function_name.clone(),
+            );
+
+            let (operation_tx, operation_rx) = futures::channel::mpsc::unbounded();
+            let client = Rc::new(server.create_client(&mut client_cx, &username).await);
+            operation_channels.push(operation_tx);
+            clients.push((client.clone(), client_cx.clone()));
+            client_tasks.push(client_cx.foreground().spawn(simulate_client(
+                client,
+                operation_rx,
+                plan.clone(),
+                client_cx,
+            )));
+
+            log::info!("Added connection for {}", username);
+        }
+
+        Operation::RemoveConnection { user_id } => {
+            log::info!("Simulating full disconnection of user {}", user_id);
+            let client_ix = clients
+                .iter()
+                .position(|(client, cx)| client.current_user_id(cx) == user_id);
+            let Some(client_ix) = client_ix else { return false };
+            let user_connection_ids = server
+                .connection_pool
+                .lock()
+                .user_connection_ids(user_id)
+                .collect::<Vec<_>>();
+            assert_eq!(user_connection_ids.len(), 1);
+            let removed_peer_id = user_connection_ids[0].into();
+            let (client, mut client_cx) = clients.remove(client_ix);
+            let client_task = client_tasks.remove(client_ix);
+            operation_channels.remove(client_ix);
+            server.forbid_connections();
+            server.disconnect_client(removed_peer_id);
+            deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
+            deterministic.start_waiting();
+            log::info!("Waiting for user {} to exit...", user_id);
+            client_task.await;
+            deterministic.finish_waiting();
+            server.allow_connections();
+
+            for project in client.remote_projects().iter() {
+                project.read_with(&client_cx, |project, _| {
+                    assert!(
+                        project.is_read_only(),
+                        "project {:?} should be read only",
+                        project.remote_id()
+                    )
+                });
+            }
+
+            for (client, cx) in clients {
+                let contacts = server
+                    .app_state
+                    .db
+                    .get_contacts(client.current_user_id(cx))
+                    .await
+                    .unwrap();
+                let pool = server.connection_pool.lock();
+                for contact in contacts {
+                    if let db::Contact::Accepted { user_id: id, .. } = contact {
+                        if pool.is_user_online(id) {
+                            assert_ne!(
+                                id, user_id,
+                                "removed client is still a contact of another peer"
+                            );
+                        }
+                    }
+                }
+            }
+
+            log::info!("{} removed", client.username);
+            plan.lock().user(user_id).online = false;
+            client_cx.update(|cx| {
+                cx.clear_globals();
+                drop(client);
+            });
+        }
+
+        Operation::BounceConnection { user_id } => {
+            log::info!("Simulating temporary disconnection of user {}", user_id);
+            let user_connection_ids = server
+                .connection_pool
+                .lock()
+                .user_connection_ids(user_id)
+                .collect::<Vec<_>>();
+            if user_connection_ids.is_empty() {
+                return false;
+            }
+            assert_eq!(user_connection_ids.len(), 1);
+            let peer_id = user_connection_ids[0].into();
+            server.disconnect_client(peer_id);
+            deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
+        }
+
+        Operation::RestartServer => {
+            log::info!("Simulating server restart");
+            server.reset().await;
+            deterministic.advance_clock(RECEIVE_TIMEOUT);
+            server.start().await.unwrap();
+            deterministic.advance_clock(CLEANUP_TIMEOUT);
+            let environment = &server.app_state.config.zed_environment;
+            let stale_room_ids = server
+                .app_state
+                .db
+                .stale_room_ids(environment, server.id())
+                .await
+                .unwrap();
+            assert_eq!(stale_room_ids, vec![]);
+        }
+
+        Operation::MutateClients { user_ids, quiesce } => {
+            let mut applied = false;
+            for user_id in user_ids {
+                let client_ix = clients
+                    .iter()
+                    .position(|(client, cx)| client.current_user_id(cx) == user_id);
+                let Some(client_ix) = client_ix else { continue };
+                applied = true;
+                if let Err(err) = operation_channels[client_ix].unbounded_send(()) {
+                    // panic!("error signaling user {}, client {}", user_id, client_ix);
+                }
+            }
+
+            if quiesce && applied {
+                deterministic.run_until_parked();
+            }
+
+            return applied;
+        }
+    }
+    true
+}
+
 async fn apply_client_operation(
     client: &TestClient,
     operation: ClientOperation,
     cx: &mut TestAppContext,
-) -> Result<()> {
+) -> Result<bool> {
     match operation {
         ClientOperation::AcceptIncomingCall => {
-            log::info!("{}: accepting incoming call", client.username);
-
             let active_call = cx.read(ActiveCall::global);
+            if active_call.read_with(cx, |call, _| call.incoming().borrow().is_none()) {
+                return Ok(false);
+            }
+
+            log::info!("{}: accepting incoming call", client.username);
             active_call
                 .update(cx, |call, cx| call.accept_incoming(cx))
                 .await?;
         }
 
         ClientOperation::RejectIncomingCall => {
-            log::info!("{}: declining incoming call", client.username);
-
             let active_call = cx.read(ActiveCall::global);
+            if active_call.read_with(cx, |call, _| call.incoming().borrow().is_none()) {
+                return Ok(false);
+            }
+
+            log::info!("{}: declining incoming call", client.username);
             active_call.update(cx, |call, _| call.decline_incoming())?;
         }
 
         ClientOperation::LeaveCall => {
-            log::info!("{}: hanging up", client.username);
-
             let active_call = cx.read(ActiveCall::global);
+            if active_call.read_with(cx, |call, _| call.room().is_none()) {
+                return Ok(false);
+            }
+
+            log::info!("{}: hanging up", client.username);
             active_call.update(cx, |call, cx| call.hang_up(cx))?;
         }
 
         ClientOperation::InviteContactToCall { user_id } => {
-            log::info!("{}: inviting {}", client.username, user_id,);
-
             let active_call = cx.read(ActiveCall::global);
+
+            log::info!("{}: inviting {}", client.username, user_id,);
             active_call
                 .update(cx, |call, cx| call.invite(user_id.to_proto(), None, cx))
                 .await
@@ -492,6 +547,10 @@ async fn apply_client_operation(
             project_root_name,
             new_root_path,
         } => {
+            let Some(project) = project_for_root_name(client, &project_root_name, cx) else {
+                return Ok(false)
+            };
+
             log::info!(
                 "{}: finding/creating local worktree at {:?} to project with root path {}",
                 client.username,
@@ -499,8 +558,6 @@ async fn apply_client_operation(
                 project_root_name
             );
 
-            let project = project_for_root_name(client, &project_root_name, cx)
-                .expect("invalid project in test operation");
             ensure_project_shared(&project, client, cx).await;
             if !client.fs.paths().await.contains(&new_root_path) {
                 client.fs.create_dir(&new_root_path).await.unwrap();
@@ -514,21 +571,56 @@ async fn apply_client_operation(
         }
 
         ClientOperation::CloseRemoteProject { project_root_name } => {
+            let Some(project) = project_for_root_name(client, &project_root_name, cx) else {
+                return Ok(false)
+            };
+
             log::info!(
                 "{}: closing remote project with root path {}",
                 client.username,
                 project_root_name,
             );
 
-            let ix = project_ix_for_root_name(&*client.remote_projects(), &project_root_name, cx)
-                .expect("invalid project in test operation");
-            cx.update(|_| client.remote_projects_mut().remove(ix));
+            let ix = client
+                .remote_projects()
+                .iter()
+                .position(|p| p == &project)
+                .unwrap();
+            cx.update(|_| {
+                client.remote_projects_mut().remove(ix);
+                drop(project);
+            });
         }
 
         ClientOperation::OpenRemoteProject {
             host_id,
             first_root_name,
         } => {
+            let active_call = cx.read(ActiveCall::global);
+            let project = active_call.update(cx, |call, cx| {
+                let room = call.room().cloned()?;
+                let participant = room
+                    .read(cx)
+                    .remote_participants()
+                    .get(&host_id.to_proto())?;
+                let project_id = participant
+                    .projects
+                    .iter()
+                    .find(|project| project.worktree_root_names[0] == first_root_name)?
+                    .id;
+                Some(room.update(cx, |room, cx| {
+                    room.join_project(
+                        project_id,
+                        client.language_registry.clone(),
+                        FakeFs::new(cx.background().clone()),
+                        cx,
+                    )
+                }))
+            });
+            let Some(project) = project else {
+                return Ok(false)
+            };
+
             log::info!(
                 "{}: joining remote project of user {}, root name {}",
                 client.username,
@@ -536,30 +628,7 @@ async fn apply_client_operation(
                 first_root_name,
             );
 
-            let active_call = cx.read(ActiveCall::global);
-            let project = active_call
-                .update(cx, |call, cx| {
-                    let room = call.room().cloned()?;
-                    let participant = room
-                        .read(cx)
-                        .remote_participants()
-                        .get(&host_id.to_proto())?;
-                    let project_id = participant
-                        .projects
-                        .iter()
-                        .find(|project| project.worktree_root_names[0] == first_root_name)?
-                        .id;
-                    Some(room.update(cx, |room, cx| {
-                        room.join_project(
-                            project_id,
-                            client.language_registry.clone(),
-                            FakeFs::new(cx.background().clone()),
-                            cx,
-                        )
-                    }))
-                })
-                .expect("invalid project in test operation")
-                .await?;
+            let project = project.await?;
             client.remote_projects_mut().push(project.clone());
         }
 
@@ -569,6 +638,13 @@ async fn apply_client_operation(
             full_path,
             is_dir,
         } => {
+            let Some(project) = project_for_root_name(client, &project_root_name, cx) else {
+                return Ok(false);
+            };
+            let Some(project_path) = project_path_for_full_path(&project, &full_path, cx) else {
+                return Ok(false);
+            };
+
             log::info!(
                 "{}: creating {} at path {:?} in {} project {}",
                 client.username,
@@ -578,11 +654,7 @@ async fn apply_client_operation(
                 project_root_name,
             );
 
-            let project = project_for_root_name(client, &project_root_name, cx)
-                .expect("invalid project in test operation");
             ensure_project_shared(&project, client, cx).await;
-            let project_path = project_path_for_full_path(&project, &full_path, cx)
-                .expect("invalid worktree path in test operation");
             project
                 .update(cx, |p, cx| p.create_entry(project_path, is_dir, cx))
                 .unwrap()
@@ -594,6 +666,13 @@ async fn apply_client_operation(
             is_local,
             full_path,
         } => {
+            let Some(project) = project_for_root_name(client, &project_root_name, cx) else {
+                return Ok(false);
+            };
+            let Some(project_path) = project_path_for_full_path(&project, &full_path, cx) else {
+                return Ok(false);
+            };
+
             log::info!(
                 "{}: opening buffer {:?} in {} project {}",
                 client.username,
@@ -602,11 +681,7 @@ async fn apply_client_operation(
                 project_root_name,
             );
 
-            let project = project_for_root_name(client, &project_root_name, cx)
-                .expect("invalid project in test operation");
             ensure_project_shared(&project, client, cx).await;
-            let project_path = project_path_for_full_path(&project, &full_path, cx)
-                .expect("invalid buffer path in test operation");
             let buffer = project
                 .update(cx, |project, cx| project.open_buffer(project_path, cx))
                 .await?;
@@ -619,6 +694,14 @@ async fn apply_client_operation(
             full_path,
             edits,
         } => {
+            let Some(project) = project_for_root_name(client, &project_root_name, cx) else {
+                return Ok(false);
+            };
+            let Some(buffer) =
+                buffer_for_full_path(&*client.buffers_for_project(&project), &full_path, cx) else {
+                    return Ok(false);
+                };
+
             log::info!(
                 "{}: editing buffer {:?} in {} project {} with {:?}",
                 client.username,
@@ -628,14 +711,18 @@ async fn apply_client_operation(
                 edits
             );
 
-            let project = project_for_root_name(client, &project_root_name, cx)
-                .expect("invalid project in test operation");
             ensure_project_shared(&project, client, cx).await;
-            let buffer =
-                buffer_for_full_path(&*client.buffers_for_project(&project), &full_path, cx)
-                    .expect("invalid buffer path in test operation");
             buffer.update(cx, |buffer, cx| {
-                buffer.edit(edits, None, cx);
+                let snapshot = buffer.snapshot();
+                buffer.edit(
+                    edits.into_iter().map(|(range, text)| {
+                        let start = snapshot.clip_offset(range.start, Bias::Left);
+                        let end = snapshot.clip_offset(range.end, Bias::Right);
+                        (start..end, text)
+                    }),
+                    None,
+                    cx,
+                );
             });
         }
 
@@ -644,20 +731,23 @@ async fn apply_client_operation(
             is_local,
             full_path,
         } => {
+            let Some(project) = project_for_root_name(client, &project_root_name, cx) else {
+                return Ok(false);
+            };
+            let Some(buffer) =
+                buffer_for_full_path(&*client.buffers_for_project(&project), &full_path, cx) else {
+                    return Ok(false);
+                };
+
             log::info!(
-                "{}: dropping buffer {:?} in {} project {}",
+                "{}: closing buffer {:?} in {} project {}",
                 client.username,
                 full_path,
                 if is_local { "local" } else { "remote" },
                 project_root_name
             );
 
-            let project = project_for_root_name(client, &project_root_name, cx)
-                .expect("invalid project in test operation");
             ensure_project_shared(&project, client, cx).await;
-            let buffer =
-                buffer_for_full_path(&*client.buffers_for_project(&project), &full_path, cx)
-                    .expect("invalid buffer path in test operation");
             cx.update(|_| {
                 client.buffers_for_project(&project).remove(&buffer);
                 drop(buffer);
@@ -670,6 +760,14 @@ async fn apply_client_operation(
             full_path,
             detach,
         } => {
+            let Some(project) = project_for_root_name(client, &project_root_name, cx) else {
+                return Ok(false);
+            };
+            let Some(buffer) =
+                buffer_for_full_path(&*client.buffers_for_project(&project), &full_path, cx) else {
+                    return Ok(false);
+                };
+
             log::info!(
                 "{}: saving buffer {:?} in {} project {}{}",
                 client.username,
@@ -679,12 +777,7 @@ async fn apply_client_operation(
                 if detach { ", detaching" } else { ", awaiting" }
             );
 
-            let project = project_for_root_name(client, &project_root_name, cx)
-                .expect("invalid project in test operation");
             ensure_project_shared(&project, client, cx).await;
-            let buffer =
-                buffer_for_full_path(&*client.buffers_for_project(&project), &full_path, cx)
-                    .expect("invalid buffer path in test operation");
             let (requested_version, save) =
                 buffer.update(cx, |buffer, cx| (buffer.version(), buffer.save(cx)));
             let save = cx.background().spawn(async move {
@@ -710,6 +803,14 @@ async fn apply_client_operation(
             kind,
             detach,
         } => {
+            let Some(project) = project_for_root_name(client, &project_root_name, cx) else {
+                return Ok(false);
+            };
+            let Some(buffer) =
+                buffer_for_full_path(&*client.buffers_for_project(&project), &full_path, cx) else {
+                    return Ok(false);
+                };
+
             log::info!(
                 "{}: request LSP {:?} for buffer {:?} in {} project {}{}",
                 client.username,
@@ -720,11 +821,7 @@ async fn apply_client_operation(
                 if detach { ", detaching" } else { ", awaiting" }
             );
 
-            let project = project_for_root_name(client, &project_root_name, cx)
-                .expect("invalid project in test operation");
-            let buffer =
-                buffer_for_full_path(&*client.buffers_for_project(&project), &full_path, cx)
-                    .expect("invalid buffer path in test operation");
+            let offset = buffer.read_with(cx, |b, _| b.clip_offset(offset, Bias::Left));
             let request = match kind {
                 LspRequestKind::Rename => cx.spawn(|mut cx| async move {
                     project
@@ -770,6 +867,10 @@ async fn apply_client_operation(
             query,
             detach,
         } => {
+            let Some(project) = project_for_root_name(client, &project_root_name, cx) else {
+                return Ok(false);
+            };
+
             log::info!(
                 "{}: search {} project {} for {:?}{}",
                 client.username,
@@ -778,8 +879,7 @@ async fn apply_client_operation(
                 query,
                 if detach { ", detaching" } else { ", awaiting" }
             );
-            let project = project_for_root_name(client, &project_root_name, cx)
-                .expect("invalid project in test operation");
+
             let search = project.update(cx, |project, cx| {
                 project.search(SearchQuery::text(query, false, false), cx)
             });
@@ -797,12 +897,17 @@ async fn apply_client_operation(
         }
 
         ClientOperation::CreateFsEntry { path, is_dir } => {
+            if client.fs.metadata(&path.parent().unwrap()).await?.is_none() {
+                return Ok(false);
+            }
+
             log::info!(
                 "{}: creating {} at {:?}",
                 client.username,
                 if is_dir { "dir" } else { "file" },
                 path
             );
+
             if is_dir {
                 client.fs.create_dir(&path).await.unwrap();
             } else {
@@ -814,13 +919,13 @@ async fn apply_client_operation(
             }
         }
     }
-    Ok(())
+    Ok(true)
 }
 
 struct TestPlan {
     rng: StdRng,
     replay: bool,
-    stored_operations: Vec<StoredOperation>,
+    stored_operations: Vec<(StoredOperation, Arc<AtomicBool>)>,
     max_operations: usize,
     operation_ix: usize,
     users: Vec<UserTestPlan>,
@@ -962,51 +1067,57 @@ impl TestPlan {
     fn load(&mut self, path: &Path) {
         let json = std::fs::read_to_string(path).unwrap();
         self.replay = true;
-        self.stored_operations = serde_json::from_str(&json).unwrap();
+        let stored_operations: Vec<StoredOperation> = serde_json::from_str(&json).unwrap();
+        self.stored_operations = stored_operations
+            .into_iter()
+            .map(|operation| (operation, Arc::new(AtomicBool::new(false))))
+            .collect()
     }
 
     fn save(&mut self, path: &Path) {
         // Format each operation as one line
         let mut json = Vec::new();
         json.push(b'[');
-        for (i, stored_operation) in self.stored_operations.iter().enumerate() {
-            if i > 0 {
+        for (operation, skipped) in &self.stored_operations {
+            if skipped.load(SeqCst) {
+                continue;
+            }
+            if json.len() > 1 {
                 json.push(b',');
             }
             json.extend_from_slice(b"\n  ");
-            serde_json::to_writer(&mut json, stored_operation).unwrap();
+            serde_json::to_writer(&mut json, operation).unwrap();
         }
         json.extend_from_slice(b"\n]\n");
         std::fs::write(path, &json).unwrap();
     }
 
-    async fn next_operation(
+    fn next_server_operation(
         &mut self,
         clients: &[(Rc<TestClient>, TestAppContext)],
-    ) -> Option<Operation> {
+    ) -> Option<(Operation, Arc<AtomicBool>)> {
         if self.replay {
             while let Some(stored_operation) = self.stored_operations.get(self.operation_ix) {
                 self.operation_ix += 1;
-                if let StoredOperation::Server(operation) = stored_operation {
-                    return Some(operation.clone());
+                if let (StoredOperation::Server(operation), skipped) = stored_operation {
+                    return Some((operation.clone(), skipped.clone()));
                 }
             }
             None
         } else {
-            let operation = self.generate_operation(clients).await;
-            if let Some(operation) = &operation {
-                self.stored_operations
-                    .push(StoredOperation::Server(operation.clone()))
-            }
-            operation
+            let operation = self.generate_server_operation(clients)?;
+            let skipped = Arc::new(AtomicBool::new(false));
+            self.stored_operations
+                .push((StoredOperation::Server(operation.clone()), skipped.clone()));
+            Some((operation, skipped))
         }
     }
 
-    async fn next_client_operation(
+    fn next_client_operation(
         &mut self,
         client: &TestClient,
         cx: &TestAppContext,
-    ) -> Option<ClientOperation> {
+    ) -> Option<(ClientOperation, Arc<AtomicBool>)> {
         let current_user_id = client.current_user_id(cx);
         let user_ix = self
             .users
@@ -1018,28 +1129,29 @@ impl TestPlan {
         if self.replay {
             while let Some(stored_operation) = self.stored_operations.get(user_plan.operation_ix) {
                 user_plan.operation_ix += 1;
-                if let StoredOperation::Client { user_id, operation } = stored_operation {
+                if let (StoredOperation::Client { user_id, operation }, skipped) = stored_operation
+                {
                     if user_id == &current_user_id {
-                        return Some(operation.clone());
+                        return Some((operation.clone(), skipped.clone()));
                     }
                 }
             }
             None
         } else {
-            let operation = self
-                .generate_client_operation(current_user_id, client, cx)
-                .await;
-            if let Some(operation) = &operation {
-                self.stored_operations.push(StoredOperation::Client {
+            let operation = self.generate_client_operation(current_user_id, client, cx)?;
+            let skipped = Arc::new(AtomicBool::new(false));
+            self.stored_operations.push((
+                StoredOperation::Client {
                     user_id: current_user_id,
                     operation: operation.clone(),
-                })
-            }
-            operation
+                },
+                skipped.clone(),
+            ));
+            Some((operation, skipped))
         }
     }
 
-    async fn generate_operation(
+    fn generate_server_operation(
         &mut self,
         clients: &[(Rc<TestClient>, TestAppContext)],
     ) -> Option<Operation> {
@@ -1091,7 +1203,7 @@ impl TestPlan {
                         .collect();
                     Operation::MutateClients {
                         user_ids,
-                        quiesce: self.rng.gen(),
+                        quiesce: self.rng.gen_bool(0.7),
                     }
                 }
                 _ => continue,
@@ -1099,7 +1211,7 @@ impl TestPlan {
         })
     }
 
-    async fn generate_client_operation(
+    fn generate_client_operation(
         &mut self,
         user_id: UserId,
         client: &TestClient,
@@ -1221,11 +1333,11 @@ impl TestPlan {
                         // Add a worktree to a local project
                         0..=50 => {
                             let Some(project) = client
-                                    .local_projects()
-                                    .choose(&mut self.rng)
-                                    .cloned() else { continue };
+                                .local_projects()
+                                .choose(&mut self.rng)
+                                .cloned() else { continue };
                             let project_root_name = root_name_for_project(&project, cx);
-                            let mut paths = client.fs.paths().await;
+                            let mut paths = cx.background().block(client.fs.paths());
                             paths.remove(0);
                             let new_root_path = if paths.is_empty() || self.rng.gen() {
                                 Path::new("/").join(&self.next_root_dir_name(user_id))
@@ -1396,10 +1508,9 @@ impl TestPlan {
                 // Create a file or directory
                 96.. => {
                     let is_dir = self.rng.gen::<bool>();
-                    let mut path = client
-                        .fs
-                        .directories()
-                        .await
+                    let mut path = cx
+                        .background()
+                        .block(client.fs.directories())
                         .choose(&mut self.rng)
                         .unwrap()
                         .clone();
@@ -1501,10 +1612,9 @@ async fn simulate_client(
                             let plan = plan.clone();
                             async move {
                                 let files = fs.files().await;
-                                let mut plan = plan.lock();
-                                let count = plan.rng.gen_range::<usize, _>(1..3);
+                                let count = plan.lock().rng.gen_range::<usize, _>(1..3);
                                 let files = (0..count)
-                                    .map(|_| files.choose(&mut plan.rng).unwrap())
+                                    .map(|_| files.choose(&mut plan.lock().rng).unwrap())
                                     .collect::<Vec<_>>();
                                 log::info!("LSP: Returning definitions in files {:?}", &files);
                                 Ok(Some(lsp::GotoDefinitionResponse::Array(
@@ -1552,9 +1662,16 @@ async fn simulate_client(
     client.language_registry.add(Arc::new(language));
 
     while operation_rx.next().await.is_some() {
-        let Some(operation) = plan.lock().next_client_operation(&client, &cx).await else { break };
-        if let Err(error) = apply_client_operation(&client, operation, &mut cx).await {
-            log::error!("{} error: {}", client.username, error);
+        let Some((operation, skipped)) = plan.lock().next_client_operation(&client, &cx) else { break };
+        match apply_client_operation(&client, operation, &mut cx).await {
+            Err(error) => {
+                log::error!("{} error: {}", client.username, error);
+            }
+            Ok(applied) => {
+                if !applied {
+                    skipped.store(true, SeqCst);
+                }
+            }
         }
         cx.background().simulate_random_delay().await;
     }