Merge branch 'main' into namespace-actions

Created by Max Brunsfeld

Change summary

Cargo.lock                     |   3 
crates/client/src/test.rs      |   6 
crates/project/src/project.rs  |  16 
crates/project/src/worktree.rs |  19 
crates/rpc/Cargo.toml          |  11 
crates/rpc/src/conn.rs         |  36 
crates/rpc/src/peer.rs         |  48 +-
crates/server/Cargo.toml       |   3 
crates/server/src/rpc.rs       | 765 ++++++++++++++++++++---------------
crates/server/src/rpc/store.rs |   7 
10 files changed, 516 insertions(+), 398 deletions(-)

Detailed changes

Cargo.lock

@@ -4007,11 +4007,11 @@ dependencies = [
  "async-tungstenite",
  "base64 0.13.0",
  "clock",
+ "collections",
  "futures",
  "gpui",
  "log",
  "parking_lot",
- "postage",
  "prost",
  "prost-build",
  "rand 0.8.3",
@@ -6122,7 +6122,6 @@ dependencies = [
  "oauth2",
  "oauth2-surf",
  "parking_lot",
- "postage",
  "project",
  "rand 0.8.3",
  "rpc",

crates/client/src/test.rs

@@ -6,7 +6,6 @@ use anyhow::{anyhow, Result};
 use futures::{future::BoxFuture, stream::BoxStream, Future, StreamExt};
 use gpui::{executor, ModelHandle, TestAppContext};
 use parking_lot::Mutex;
-use postage::barrier;
 use rpc::{proto, ConnectionId, Peer, Receipt, TypedEnvelope};
 use std::{fmt, rc::Rc, sync::Arc};
 
@@ -23,7 +22,6 @@ struct FakeServerState {
     connection_id: Option<ConnectionId>,
     forbid_connections: bool,
     auth_count: usize,
-    connection_killer: Option<barrier::Sender>,
     access_token: usize,
 }
 
@@ -76,15 +74,13 @@ impl FakeServer {
                             Err(EstablishConnectionError::Unauthorized)?
                         }
 
-                        let (client_conn, server_conn, kill) =
-                            Connection::in_memory(cx.background());
+                        let (client_conn, server_conn, _) = Connection::in_memory(cx.background());
                         let (connection_id, io, incoming) =
                             peer.add_test_connection(server_conn, cx.background()).await;
                         cx.background().spawn(io).detach();
                         let mut state = state.lock();
                         state.connection_id = Some(connection_id);
                         state.incoming = Some(incoming);
-                        state.connection_killer = Some(kill);
                         Ok(client_conn)
                     })
                 }

crates/project/src/project.rs

@@ -1216,7 +1216,7 @@ impl Project {
                 let file = File::from_dyn(buffer.file())?;
                 let abs_path = file.as_local()?.abs_path(cx);
                 let uri = lsp::Url::from_file_path(abs_path).unwrap();
-                let buffer_snapshots = self.buffer_snapshots.entry(buffer.remote_id()).or_default();
+                let buffer_snapshots = self.buffer_snapshots.get_mut(&buffer.remote_id())?;
                 let (version, prev_snapshot) = buffer_snapshots.last()?;
                 let next_snapshot = buffer.text_snapshot();
                 let next_version = version + 1;
@@ -3850,7 +3850,7 @@ impl Project {
             let buffer = this
                 .opened_buffers
                 .get(&buffer_id)
-                .map(|buffer| buffer.upgrade(cx).unwrap())
+                .and_then(|buffer| buffer.upgrade(cx))
                 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?;
             Ok::<_, anyhow::Error>((project_id, buffer))
         })?;
@@ -3882,7 +3882,7 @@ impl Project {
                 buffers.insert(
                     this.opened_buffers
                         .get(buffer_id)
-                        .map(|buffer| buffer.upgrade(cx).unwrap())
+                        .and_then(|buffer| buffer.upgrade(cx))
                         .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?,
                 );
             }
@@ -3911,7 +3911,7 @@ impl Project {
                 buffers.insert(
                     this.opened_buffers
                         .get(buffer_id)
-                        .map(|buffer| buffer.upgrade(cx).unwrap())
+                        .and_then(|buffer| buffer.upgrade(cx))
                         .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?,
                 );
             }
@@ -3942,7 +3942,7 @@ impl Project {
         let buffer = this.read_with(&cx, |this, cx| {
             this.opened_buffers
                 .get(&envelope.payload.buffer_id)
-                .map(|buffer| buffer.upgrade(cx).unwrap())
+                .and_then(|buffer| buffer.upgrade(cx))
                 .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))
         })?;
         buffer
@@ -3972,7 +3972,7 @@ impl Project {
             let buffer = this
                 .opened_buffers
                 .get(&envelope.payload.buffer_id)
-                .map(|buffer| buffer.upgrade(cx).unwrap())
+                .and_then(|buffer| buffer.upgrade(cx))
                 .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
             let language = buffer.read(cx).language();
             let completion = language::proto::deserialize_completion(
@@ -4014,7 +4014,7 @@ impl Project {
         let buffer = this.update(&mut cx, |this, cx| {
             this.opened_buffers
                 .get(&envelope.payload.buffer_id)
-                .map(|buffer| buffer.upgrade(cx).unwrap())
+                .and_then(|buffer| buffer.upgrade(cx))
                 .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))
         })?;
         buffer
@@ -4055,7 +4055,7 @@ impl Project {
             let buffer = this
                 .opened_buffers
                 .get(&envelope.payload.buffer_id)
-                .map(|buffer| buffer.upgrade(cx).unwrap())
+                .and_then(|buffer| buffer.upgrade(cx))
                 .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
             Ok::<_, anyhow::Error>(this.apply_code_action(buffer, action, false, cx))
         })?;
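
The repeated change in this file replaces .map(|buffer| buffer.upgrade(cx).unwrap()) with .and_then(|buffer| buffer.upgrade(cx)), so a request that names an already-dropped buffer yields the "unknown buffer id" error instead of panicking. A minimal standalone sketch of that Option-chaining idea, using std's Weak in place of the gpui handle API (Buffer and lookup here are hypothetical):

    use std::sync::{Arc, Weak};

    use anyhow::anyhow;

    struct Buffer;

    fn lookup(handle: Option<&Weak<Buffer>>) -> anyhow::Result<Arc<Buffer>> {
        handle
            .and_then(|weak| weak.upgrade()) // None if the buffer was dropped
            .ok_or_else(|| anyhow!("unknown buffer id"))
    }

    fn main() -> anyhow::Result<()> {
        let buffer = Arc::new(Buffer);
        let weak = Arc::downgrade(&buffer);
        assert!(lookup(Some(&weak)).is_ok());
        drop(buffer);
        assert!(lookup(Some(&weak)).is_err()); // an error now, not a panic
        Ok(())
    }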

crates/project/src/worktree.rs

@@ -11,7 +11,10 @@ use client::{proto, Client, TypedEnvelope};
 use clock::ReplicaId;
 use collections::HashMap;
 use futures::{
-    channel::mpsc::{self, UnboundedSender},
+    channel::{
+        mpsc::{self, UnboundedSender},
+        oneshot,
+    },
     Stream, StreamExt,
 };
 use fuzzy::CharBag;
@@ -26,7 +29,6 @@ use language::{
 use lazy_static::lazy_static;
 use parking_lot::Mutex;
 use postage::{
-    oneshot,
     prelude::{Sink as _, Stream as _},
     watch,
 };
@@ -727,11 +729,11 @@ impl LocalWorktree {
 
     pub fn share(&mut self, project_id: u64, cx: &mut ModelContext<Worktree>) -> Task<Result<()>> {
         let register = self.register(project_id, cx);
-        let (mut share_tx, mut share_rx) = oneshot::channel();
+        let (share_tx, share_rx) = oneshot::channel();
         let (snapshots_to_send_tx, snapshots_to_send_rx) =
             smol::channel::unbounded::<LocalSnapshot>();
         if self.share.is_some() {
-            let _ = share_tx.try_send(Ok(()));
+            let _ = share_tx.send(Ok(()));
         } else {
             let rpc = self.client.clone();
             let worktree_id = cx.model_id() as u64;
@@ -756,15 +758,15 @@ impl LocalWorktree {
                                 })
                                 .await
                             {
-                                let _ = share_tx.try_send(Err(error));
+                                let _ = share_tx.send(Err(error));
                                 return Err(anyhow!("failed to send initial update worktree"));
                             } else {
-                                let _ = share_tx.try_send(Ok(()));
+                                let _ = share_tx.send(Ok(()));
                                 snapshot
                             }
                         }
                         Err(error) => {
-                            let _ = share_tx.try_send(Err(error.into()));
+                            let _ = share_tx.send(Err(error.into()));
                             return Err(anyhow!("failed to send initial update worktree"));
                         }
                     };
@@ -804,9 +806,8 @@ impl LocalWorktree {
                 });
             }
             share_rx
-                .next()
                 .await
-                .unwrap_or_else(|| Err(anyhow!("share ended")))
+                .unwrap_or_else(|_| Err(anyhow!("share ended")))
         })
     }
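
share() now uses futures::channel::oneshot instead of postage::oneshot: send is synchronous and consumes the sender, and awaiting the receiver yields an Err once the sender is dropped, which the code maps to the "share ended" error. A minimal sketch of that pattern, assuming only the futures and anyhow crates (the channel payload here is illustrative):

    use anyhow::anyhow;
    use futures::channel::oneshot;

    fn main() {
        futures::executor::block_on(async {
            let (share_tx, share_rx) = oneshot::channel::<anyhow::Result<()>>();

            // send is synchronous and consumes the sender; no .await, no try_send.
            let _ = share_tx.send(Ok(()));

            // Awaiting the receiver yields Err(Canceled) if the sender was dropped,
            // which is mapped to an error instead of hanging forever.
            let result = share_rx.await.unwrap_or_else(|_| Err(anyhow!("share ended")));
            assert!(result.is_ok());
        });
    }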
 

crates/rpc/Cargo.toml

@@ -9,9 +9,13 @@ path = "src/rpc.rs"
 doctest = false
 
 [features]
-test-support = ["gpui/test-support"]
+test-support = ["collections/test-support", "gpui/test-support"]
 
 [dependencies]
+clock = { path = "../clock" }
+collections = { path = "../collections" }
+gpui = { path = "../gpui", optional = true }
+util = { path = "../util" }
 anyhow = "1.0"
 async-lock = "2.4"
 async-tungstenite = "0.16"
@@ -19,21 +23,18 @@ base64 = "0.13"
 futures = "0.3"
 log = "0.4"
 parking_lot = "0.11.1"
-postage = { version = "0.4.1", features = ["futures-traits"] }
 prost = "0.8"
 rand = "0.8"
 rsa = "0.4"
 serde = { version = "1", features = ["derive"] }
 smol-timeout = "0.6"
 zstd = "0.9"
-clock = { path = "../clock" }
-gpui = { path = "../gpui", optional = true }
-util = { path = "../util" }
 
 [build-dependencies]
 prost-build = "0.8"
 
 [dev-dependencies]
+collections = { path = "../collections", features = ["test-support"] }
 gpui = { path = "../gpui", features = ["test-support"] }
 smol = "1.2.5"
 tempdir = "0.3.7"

crates/rpc/src/conn.rs

@@ -35,21 +35,24 @@ impl Connection {
     #[cfg(any(test, feature = "test-support"))]
     pub fn in_memory(
         executor: std::sync::Arc<gpui::executor::Background>,
-    ) -> (Self, Self, postage::barrier::Sender) {
-        use postage::prelude::Stream;
+    ) -> (Self, Self, std::sync::Arc<std::sync::atomic::AtomicBool>) {
+        use std::sync::{
+            atomic::{AtomicBool, Ordering::SeqCst},
+            Arc,
+        };
 
-        let (kill_tx, kill_rx) = postage::barrier::channel();
-        let (a_tx, a_rx) = channel(kill_rx.clone(), executor.clone());
-        let (b_tx, b_rx) = channel(kill_rx, executor);
+        let killed = Arc::new(AtomicBool::new(false));
+        let (a_tx, a_rx) = channel(killed.clone(), executor.clone());
+        let (b_tx, b_rx) = channel(killed.clone(), executor);
         return (
             Self { tx: a_tx, rx: b_rx },
             Self { tx: b_tx, rx: a_rx },
-            kill_tx,
+            killed,
         );
 
         fn channel(
-            kill_rx: postage::barrier::Receiver,
-            executor: std::sync::Arc<gpui::executor::Background>,
+            killed: Arc<AtomicBool>,
+            executor: Arc<gpui::executor::Background>,
         ) -> (
             Box<dyn Send + Unpin + futures::Sink<WebSocketMessage, Error = WebSocketError>>,
             Box<
@@ -57,20 +60,17 @@ impl Connection {
             >,
         ) {
             use futures::channel::mpsc;
-            use std::{
-                io::{Error, ErrorKind},
-                sync::Arc,
-            };
+            use std::io::{Error, ErrorKind};
 
             let (tx, rx) = mpsc::unbounded::<WebSocketMessage>();
 
             let tx = tx
                 .sink_map_err(|e| WebSocketError::from(Error::new(ErrorKind::Other, e)))
                 .with({
-                    let kill_rx = kill_rx.clone();
+                    let killed = killed.clone();
                     let executor = Arc::downgrade(&executor);
                     move |msg| {
-                        let mut kill_rx = kill_rx.clone();
+                        let killed = killed.clone();
                         let executor = executor.clone();
                         Box::pin(async move {
                             if let Some(executor) = executor.upgrade() {
@@ -78,7 +78,7 @@ impl Connection {
                             }
 
                             // Writes to a half-open TCP connection will error.
-                            if kill_rx.try_recv().is_ok() {
+                            if killed.load(SeqCst) {
                                 std::io::Result::Err(
                                     Error::new(ErrorKind::Other, "connection lost").into(),
                                 )?;
@@ -90,10 +90,10 @@ impl Connection {
                 });
 
             let rx = rx.then({
-                let kill_rx = kill_rx.clone();
+                let killed = killed.clone();
                 let executor = Arc::downgrade(&executor);
                 move |msg| {
-                    let mut kill_rx = kill_rx.clone();
+                    let killed = killed.clone();
                     let executor = executor.clone();
                     Box::pin(async move {
                         if let Some(executor) = executor.upgrade() {
@@ -101,7 +101,7 @@ impl Connection {
                         }
 
                         // Reads from a half-open TCP connection will hang.
-                        if kill_rx.try_recv().is_ok() {
+                        if killed.load(SeqCst) {
                             futures::future::pending::<()>().await;
                         }
 
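The postage barrier channel is gone; in_memory now returns an Arc<AtomicBool> that both halves of the fake connection consult before doing I/O, so flipping the flag simulates a half-open TCP connection (writes error, reads hang). A standalone sketch of the flag-sharing idea, not the real Connection type (try_write is a hypothetical stand-in for the sink's write path):

    use std::sync::{
        atomic::{AtomicBool, Ordering::SeqCst},
        Arc,
    };

    fn main() {
        let killed = Arc::new(AtomicBool::new(false));
        let writer_side = killed.clone();

        // Each half of the fake connection checks the shared flag before doing I/O.
        let try_write = move || -> std::io::Result<()> {
            if writer_side.load(SeqCst) {
                Err(std::io::Error::new(
                    std::io::ErrorKind::Other,
                    "connection lost",
                ))
            } else {
                Ok(())
            }
        };

        assert!(try_write().is_ok());
        killed.store(true, SeqCst); // what TestServer::disconnect_client now does
        assert!(try_write().is_err());
    }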

crates/rpc/src/peer.rs

@@ -1,16 +1,18 @@
-use super::proto::{self, AnyTypedEnvelope, EnvelopedMessage, MessageStream, RequestMessage};
-use super::Connection;
+use super::{
+    proto::{self, AnyTypedEnvelope, EnvelopedMessage, MessageStream, RequestMessage},
+    Connection,
+};
 use anyhow::{anyhow, Context, Result};
-use futures::{channel::oneshot, stream::BoxStream, FutureExt as _, StreamExt};
-use parking_lot::{Mutex, RwLock};
-use postage::{
-    barrier, mpsc,
-    prelude::{Sink as _, Stream as _},
+use collections::HashMap;
+use futures::{
+    channel::{mpsc, oneshot},
+    stream::BoxStream,
+    FutureExt, SinkExt, StreamExt,
 };
-use smol_timeout::TimeoutExt as _;
+use parking_lot::{Mutex, RwLock};
+use smol_timeout::TimeoutExt;
 use std::sync::atomic::Ordering::SeqCst;
 use std::{
-    collections::HashMap,
     fmt,
     future::Future,
     marker::PhantomData,
@@ -88,10 +90,10 @@ pub struct Peer {
 
 #[derive(Clone)]
 pub struct ConnectionState {
-    outgoing_tx: futures::channel::mpsc::UnboundedSender<proto::Message>,
+    outgoing_tx: mpsc::UnboundedSender<proto::Message>,
     next_message_id: Arc<AtomicU32>,
     response_channels:
-        Arc<Mutex<Option<HashMap<u32, oneshot::Sender<(proto::Envelope, barrier::Sender)>>>>>,
+        Arc<Mutex<Option<HashMap<u32, oneshot::Sender<(proto::Envelope, oneshot::Sender<()>)>>>>>,
 }
 
 const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);
@@ -125,7 +127,7 @@ impl Peer {
         // bounded channel so that other peers will receive backpressure if they send
         // messages faster than this peer can process them.
         let (mut incoming_tx, incoming_rx) = mpsc::channel(64);
-        let (outgoing_tx, mut outgoing_rx) = futures::channel::mpsc::unbounded();
+        let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded();
 
         let connection_id = ConnectionId(self.next_connection_id.fetch_add(1, SeqCst));
         let connection_state = ConnectionState {
@@ -173,8 +175,10 @@ impl Peer {
                             let incoming = incoming.context("received invalid RPC message")?;
                             receive_timeout.set(create_timer(RECEIVE_TIMEOUT).fuse());
                             if let proto::Message::Envelope(incoming) = incoming {
-                                if incoming_tx.send(incoming).await.is_err() {
-                                    return Ok(());
+                                match incoming_tx.send(incoming).timeout(RECEIVE_TIMEOUT).await {
+                                    Some(Ok(_)) => {},
+                                    Some(Err(_)) => return Ok(()),
+                                    None => Err(anyhow!("timed out processing incoming message"))?,
                                 }
                             }
                             break;
@@ -206,14 +210,14 @@ impl Peer {
                 if let Some(responding_to) = incoming.responding_to {
                     let channel = response_channels.lock().as_mut()?.remove(&responding_to);
                     if let Some(tx) = channel {
-                        let mut requester_resumed = barrier::channel();
+                        let requester_resumed = oneshot::channel();
                         if let Err(error) = tx.send((incoming, requester_resumed.0)) {
                             log::debug!(
                                 "received RPC but request future was dropped {:?}",
                                 error.0
                             );
                         }
-                        requester_resumed.1.recv().await;
+                        let _ = requester_resumed.1.await;
                     } else {
                         log::warn!("received RPC response to unknown request {}", responding_to);
                     }
@@ -719,26 +723,26 @@ mod tests {
             .add_test_connection(client_conn, cx.background())
             .await;
 
-        let (mut io_ended_tx, mut io_ended_rx) = postage::barrier::channel();
+        let (io_ended_tx, io_ended_rx) = oneshot::channel();
         executor
             .spawn(async move {
                 io_handler.await.ok();
-                io_ended_tx.send(()).await.unwrap();
+                io_ended_tx.send(()).unwrap();
             })
             .detach();
 
-        let (mut messages_ended_tx, mut messages_ended_rx) = postage::barrier::channel();
+        let (messages_ended_tx, messages_ended_rx) = oneshot::channel();
         executor
             .spawn(async move {
                 incoming.next().await;
-                messages_ended_tx.send(()).await.unwrap();
+                messages_ended_tx.send(()).unwrap();
             })
             .detach();
 
         client.disconnect(connection_id);
 
-        io_ended_rx.recv().await;
-        messages_ended_rx.recv().await;
+        let _ = io_ended_rx.await;
+        let _ = messages_ended_rx.await;
         assert!(server_conn
             .send(WebSocketMessage::Binary(vec![]))
             .await
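
Besides the oneshot conversions, the incoming-message loop now bounds how long it will wait to enqueue a message, using smol_timeout's TimeoutExt, which wraps a future so that awaiting it yields None once the duration elapses. A minimal sketch of that pattern, assuming the futures, smol, and smol-timeout crates (channel capacity and deadline are illustrative):

    use std::time::Duration;

    use futures::{channel::mpsc, SinkExt, StreamExt};
    use smol_timeout::TimeoutExt;

    fn main() {
        smol::block_on(async {
            let (mut tx, mut rx) = mpsc::channel::<u32>(1);

            // timeout() turns the send future into one that resolves to None if it
            // cannot complete before the deadline.
            match tx.send(42).timeout(Duration::from_secs(1)).await {
                Some(Ok(())) => {}                        // message enqueued
                Some(Err(_)) => panic!("receiver dropped"),
                None => panic!("timed out enqueueing message"),
            }

            assert_eq!(rx.next().await, Some(42));
        });
    }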

crates/server/Cargo.toml

@@ -14,7 +14,6 @@ required-features = ["seed-support"]
 
 [dependencies]
 collections = { path = "../collections" }
-settings = { path = "../settings" }
 rpc = { path = "../rpc" }
 anyhow = "1.0.40"
 async-io = "1.3"
@@ -34,7 +33,6 @@ lipsum = { version = "0.8", optional = true }
 oauth2 = { version = "4.0.0", default_features = false }
 oauth2-surf = "0.1.1"
 parking_lot = "0.11.1"
-postage = { version = "0.4.1", features = ["futures-traits"] }
 rand = "0.8"
 rust-embed = { version = "6.3", features = ["include-exclude"] }
 scrypt = "0.7"
@@ -65,6 +63,7 @@ editor = { path = "../editor", features = ["test-support"] }
 language = { path = "../language", features = ["test-support"] }
 lsp = { path = "../lsp", features = ["test-support"] }
 project = { path = "../project", features = ["test-support"] }
+settings = { path = "../settings", features = ["test-support"] }
 workspace = { path = "../workspace", features = ["test-support"] }
 ctor = "0.1"
 env_logger = "0.8"

crates/server/src/rpc.rs

@@ -1080,7 +1080,7 @@ mod tests {
     use ::rpc::Peer;
     use client::{
         self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
-        EstablishConnectionError, UserStore,
+        EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
     };
     use collections::BTreeMap;
     use editor::{
@@ -1094,7 +1094,6 @@ mod tests {
     };
     use lsp::{self, FakeLanguageServer};
     use parking_lot::Mutex;
-    use postage::barrier;
     use project::{
         fs::{FakeFs, Fs as _},
         search::SearchQuery,
@@ -1118,6 +1117,7 @@ mod tests {
         },
         time::Duration,
     };
+    use util::TryFutureExt;
     use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
 
     #[cfg(test)]
@@ -4995,6 +4995,7 @@ mod tests {
         let operations = Rc::new(Cell::new(0));
         let mut server = TestServer::start(cx.foreground(), cx.background()).await;
         let mut clients = Vec::new();
+        let mut user_ids = Vec::new();
         let files = Arc::new(Mutex::new(Vec::new()));
 
         let mut next_entity_id = 100000;
@@ -5162,6 +5163,8 @@ mod tests {
         });
         host_language_registry.add(Arc::new(language));
 
+        let host_disconnected = Rc::new(AtomicBool::new(false));
+        user_ids.push(host.current_user_id(&host_cx));
         clients.push(cx.foreground().spawn(host.simulate_host(
             host_project,
             files,
@@ -5203,16 +5206,49 @@ mod tests {
                 )
                 .await
                 .unwrap();
+                user_ids.push(guest.current_user_id(&guest_cx));
                 clients.push(cx.foreground().spawn(guest.simulate_guest(
                     guest_id,
                     guest_project,
                     operations.clone(),
                     max_operations,
                     rng.clone(),
+                    host_disconnected.clone(),
                     guest_cx,
                 )));
 
                 log::info!("Guest {} added", guest_id);
+            } else if rng.lock().gen_bool(0.05) {
+                host_disconnected.store(true, SeqCst);
+                server.disconnect_client(user_ids[0]);
+                cx.foreground().advance_clock(RECEIVE_TIMEOUT);
+                let mut clients = futures::future::join_all(clients).await;
+                cx.foreground().run_until_parked();
+
+                let (host, mut host_cx) = clients.remove(0);
+                host.project
+                    .as_ref()
+                    .unwrap()
+                    .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
+                for (guest, mut guest_cx) in clients {
+                    let contacts = server
+                        .store
+                        .read()
+                        .contacts_for_user(guest.current_user_id(&guest_cx));
+                    assert!(!contacts
+                        .iter()
+                        .flat_map(|contact| &contact.projects)
+                        .any(|project| project.id == host_project_id));
+                    guest
+                        .project
+                        .as_ref()
+                        .unwrap()
+                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
+                    guest_cx.update(|_| drop(guest));
+                }
+                host_cx.update(|_| drop(host));
+
+                return;
             }
         }
 
@@ -5325,7 +5361,7 @@ mod tests {
         server: Arc<Server>,
         foreground: Rc<executor::Foreground>,
         notifications: mpsc::UnboundedReceiver<()>,
-        connection_killers: Arc<Mutex<HashMap<UserId, barrier::Sender>>>,
+        connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
         forbid_connections: Arc<AtomicBool>,
         _test_db: TestDb,
     }
@@ -5393,9 +5429,9 @@ mod tests {
                                 "server is forbidding connections"
                             )))
                         } else {
-                            let (client_conn, server_conn, kill_conn) =
+                            let (client_conn, server_conn, killed) =
                                 Connection::in_memory(cx.background());
-                            connection_killers.lock().insert(user_id, kill_conn);
+                            connection_killers.lock().insert(user_id, killed);
                             cx.background()
                                 .spawn(server.handle_connection(
                                     server_conn,
@@ -5437,7 +5473,11 @@ mod tests {
         }
 
         fn disconnect_client(&self, user_id: UserId) {
-            self.connection_killers.lock().remove(&user_id);
+            self.connection_killers
+                .lock()
+                .remove(&user_id)
+                .unwrap()
+                .store(true, SeqCst);
         }
 
         fn forbid_connections(&self) {
@@ -5483,6 +5523,14 @@ mod tests {
         }
     }
 
+    impl Deref for TestServer {
+        type Target = Server;
+
+        fn deref(&self) -> &Self::Target {
+            &self.server
+        }
+    }
+
     impl Drop for TestServer {
         fn drop(&mut self) {
             self.peer.reset();
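
The new Deref impl lets the test reach Server state (for example server.store) directly through the TestServer wrapper. A minimal sketch of the same wrapper pattern with hypothetical types:

    use std::ops::Deref;
    use std::sync::Arc;

    struct Server {
        name: &'static str,
    }

    struct TestServer {
        server: Arc<Server>,
        // ...plus test-only state (connection killers, fake db, and so on)
    }

    impl Deref for TestServer {
        type Target = Server;

        fn deref(&self) -> &Self::Target {
            &self.server
        }
    }

    fn main() {
        let test_server = TestServer {
            server: Arc::new(Server { name: "test" }),
        };
        // Field access auto-derefs through the wrapper to the inner Server.
        assert_eq!(test_server.name, "test");
    }
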
@@ -5604,117 +5652,138 @@ mod tests {
             rng: Arc<Mutex<StdRng>>,
             mut cx: TestAppContext,
         ) -> (Self, TestAppContext) {
-            let fs = project.read_with(&cx, |project, _| project.fs().clone());
-            while operations.get() < max_operations {
-                operations.set(operations.get() + 1);
+            async fn simulate_host_internal(
+                client: &mut TestClient,
+                project: ModelHandle<Project>,
+                files: Arc<Mutex<Vec<PathBuf>>>,
+                operations: Rc<Cell<usize>>,
+                max_operations: usize,
+                rng: Arc<Mutex<StdRng>>,
+                cx: &mut TestAppContext,
+            ) -> anyhow::Result<()> {
+                let fs = project.read_with(cx, |project, _| project.fs().clone());
+                while operations.get() < max_operations {
+                    operations.set(operations.get() + 1);
 
-                let distribution = rng.lock().gen_range::<usize, _>(0..100);
-                match distribution {
-                    0..=20 if !files.lock().is_empty() => {
-                        let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
-                        let mut path = path.as_path();
-                        while let Some(parent_path) = path.parent() {
-                            path = parent_path;
-                            if rng.lock().gen() {
-                                break;
+                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
+                    match distribution {
+                        0..=20 if !files.lock().is_empty() => {
+                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
+                            let mut path = path.as_path();
+                            while let Some(parent_path) = path.parent() {
+                                path = parent_path;
+                                if rng.lock().gen() {
+                                    break;
+                                }
                             }
-                        }
 
-                        log::info!("Host: find/create local worktree {:?}", path);
-                        let find_or_create_worktree = project.update(&mut cx, |project, cx| {
-                            project.find_or_create_local_worktree(path, true, cx)
-                        });
-                        let find_or_create_worktree = async move {
-                            find_or_create_worktree.await.unwrap();
-                        };
-                        if rng.lock().gen() {
-                            cx.background().spawn(find_or_create_worktree).detach();
-                        } else {
-                            find_or_create_worktree.await;
-                        }
-                    }
-                    10..=80 if !files.lock().is_empty() => {
-                        let buffer = if self.buffers.is_empty() || rng.lock().gen() {
-                            let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
-                            let (worktree, path) = project
-                                .update(&mut cx, |project, cx| {
-                                    project.find_or_create_local_worktree(file.clone(), true, cx)
-                                })
-                                .await
-                                .unwrap();
-                            let project_path =
-                                worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
-                            log::info!(
-                                "Host: opening path {:?}, worktree {}, relative_path {:?}",
-                                file,
-                                project_path.0,
-                                project_path.1
-                            );
-                            let buffer = project
-                                .update(&mut cx, |project, cx| {
-                                    project.open_buffer(project_path, cx)
-                                })
-                                .await
-                                .unwrap();
-                            self.buffers.insert(buffer.clone());
-                            buffer
-                        } else {
-                            self.buffers
-                                .iter()
-                                .choose(&mut *rng.lock())
-                                .unwrap()
-                                .clone()
-                        };
-
-                        if rng.lock().gen_bool(0.1) {
-                            cx.update(|cx| {
-                                log::info!(
-                                    "Host: dropping buffer {:?}",
-                                    buffer.read(cx).file().unwrap().full_path(cx)
-                                );
-                                self.buffers.remove(&buffer);
-                                drop(buffer);
+                            log::info!("Host: find/create local worktree {:?}", path);
+                            let find_or_create_worktree = project.update(cx, |project, cx| {
+                                project.find_or_create_local_worktree(path, true, cx)
                             });
-                        } else {
-                            buffer.update(&mut cx, |buffer, cx| {
+                            if rng.lock().gen() {
+                                cx.background().spawn(find_or_create_worktree).detach();
+                            } else {
+                                find_or_create_worktree.await?;
+                            }
+                        }
+                        10..=80 if !files.lock().is_empty() => {
+                            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
+                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
+                                let (worktree, path) = project
+                                    .update(cx, |project, cx| {
+                                        project.find_or_create_local_worktree(
+                                            file.clone(),
+                                            true,
+                                            cx,
+                                        )
+                                    })
+                                    .await?;
+                                let project_path =
+                                    worktree.read_with(cx, |worktree, _| (worktree.id(), path));
                                 log::info!(
-                                    "Host: updating buffer {:?} ({})",
-                                    buffer.file().unwrap().full_path(cx),
-                                    buffer.remote_id()
+                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
+                                    file,
+                                    project_path.0,
+                                    project_path.1
                                 );
-                                buffer.randomly_edit(&mut *rng.lock(), 5, cx)
-                            });
-                        }
-                    }
-                    _ => loop {
-                        let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
-                        let mut path = PathBuf::new();
-                        path.push("/");
-                        for _ in 0..path_component_count {
-                            let letter = rng.lock().gen_range(b'a'..=b'z');
-                            path.push(std::str::from_utf8(&[letter]).unwrap());
+                                let buffer = project
+                                    .update(cx, |project, cx| project.open_buffer(project_path, cx))
+                                    .await
+                                    .unwrap();
+                                client.buffers.insert(buffer.clone());
+                                buffer
+                            } else {
+                                client
+                                    .buffers
+                                    .iter()
+                                    .choose(&mut *rng.lock())
+                                    .unwrap()
+                                    .clone()
+                            };
+
+                            if rng.lock().gen_bool(0.1) {
+                                cx.update(|cx| {
+                                    log::info!(
+                                        "Host: dropping buffer {:?}",
+                                        buffer.read(cx).file().unwrap().full_path(cx)
+                                    );
+                                    client.buffers.remove(&buffer);
+                                    drop(buffer);
+                                });
+                            } else {
+                                buffer.update(cx, |buffer, cx| {
+                                    log::info!(
+                                        "Host: updating buffer {:?} ({})",
+                                        buffer.file().unwrap().full_path(cx),
+                                        buffer.remote_id()
+                                    );
+                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx)
+                                });
+                            }
                         }
-                        path.set_extension("rs");
-                        let parent_path = path.parent().unwrap();
+                        _ => loop {
+                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
+                            let mut path = PathBuf::new();
+                            path.push("/");
+                            for _ in 0..path_component_count {
+                                let letter = rng.lock().gen_range(b'a'..=b'z');
+                                path.push(std::str::from_utf8(&[letter]).unwrap());
+                            }
+                            path.set_extension("rs");
+                            let parent_path = path.parent().unwrap();
 
-                        log::info!("Host: creating file {:?}", path,);
+                            log::info!("Host: creating file {:?}", path,);
 
-                        if fs.create_dir(&parent_path).await.is_ok()
-                            && fs.create_file(&path, Default::default()).await.is_ok()
-                        {
-                            files.lock().push(path);
-                            break;
-                        } else {
-                            log::info!("Host: cannot create file");
-                        }
-                    },
+                            if fs.create_dir(&parent_path).await.is_ok()
+                                && fs.create_file(&path, Default::default()).await.is_ok()
+                            {
+                                files.lock().push(path);
+                                break;
+                            } else {
+                                log::info!("Host: cannot create file");
+                            }
+                        },
+                    }
+
+                    cx.background().simulate_random_delay().await;
                 }
 
-                cx.background().simulate_random_delay().await;
+                Ok(())
             }
 
+            simulate_host_internal(
+                &mut self,
+                project.clone(),
+                files,
+                operations,
+                max_operations,
+                rng,
+                &mut cx,
+            )
+            .log_err()
+            .await;
             log::info!("Host done");
-
             self.project = Some(project);
             (self, cx)
         }
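
simulate_host now wraps its body in an inner async fn returning anyhow::Result<()> and finishes it with util::TryFutureExt::log_err, so requests that fail after the host disconnects end the simulation gracefully instead of panicking the test. A minimal sketch of that shape with a hand-rolled stand-in for log_err (the real helper lives in the util crate; simulate_host_internal's parameter here is illustrative):

    use anyhow::{anyhow, Result};

    async fn simulate_host_internal(host_disconnected: bool) -> Result<()> {
        if host_disconnected {
            // Stands in for any RPC request that starts failing once the peer is gone.
            return Err(anyhow!("connection lost"));
        }
        Ok(())
    }

    async fn simulate_host(host_disconnected: bool) {
        // The outer wrapper logs and swallows the error, mirroring .log_err().await.
        if let Err(error) = simulate_host_internal(host_disconnected).await {
            log::info!("simulation ended early: {:?}", error);
        }
        log::info!("Host done");
    }

    fn main() {
        futures::executor::block_on(async {
            simulate_host(false).await;
            simulate_host(true).await; // logs instead of panicking the test
        });
    }
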
@@ -5726,244 +5795,292 @@ mod tests {
             operations: Rc<Cell<usize>>,
             max_operations: usize,
             rng: Arc<Mutex<StdRng>>,
+            host_disconnected: Rc<AtomicBool>,
             mut cx: TestAppContext,
         ) -> (Self, TestAppContext) {
-            while operations.get() < max_operations {
-                let buffer = if self.buffers.is_empty() || rng.lock().gen() {
-                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
-                        project
-                            .worktrees(&cx)
-                            .filter(|worktree| {
-                                let worktree = worktree.read(cx);
-                                worktree.is_visible()
-                                    && worktree.entries(false).any(|e| e.is_file())
+            async fn simulate_guest_internal(
+                client: &mut TestClient,
+                guest_id: usize,
+                project: ModelHandle<Project>,
+                operations: Rc<Cell<usize>>,
+                max_operations: usize,
+                rng: Arc<Mutex<StdRng>>,
+                cx: &mut TestAppContext,
+            ) -> anyhow::Result<()> {
+                while operations.get() < max_operations {
+                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
+                        let worktree = if let Some(worktree) =
+                            project.read_with(cx, |project, cx| {
+                                project
+                                    .worktrees(&cx)
+                                    .filter(|worktree| {
+                                        let worktree = worktree.read(cx);
+                                        worktree.is_visible()
+                                            && worktree.entries(false).any(|e| e.is_file())
+                                    })
+                                    .choose(&mut *rng.lock())
+                            }) {
+                            worktree
+                        } else {
+                            cx.background().simulate_random_delay().await;
+                            continue;
+                        };
+
+                        operations.set(operations.get() + 1);
+                        let (worktree_root_name, project_path) =
+                            worktree.read_with(cx, |worktree, _| {
+                                let entry = worktree
+                                    .entries(false)
+                                    .filter(|e| e.is_file())
+                                    .choose(&mut *rng.lock())
+                                    .unwrap();
+                                (
+                                    worktree.root_name().to_string(),
+                                    (worktree.id(), entry.path.clone()),
+                                )
+                            });
+                        log::info!(
+                            "Guest {}: opening path {:?} in worktree {} ({})",
+                            guest_id,
+                            project_path.1,
+                            project_path.0,
+                            worktree_root_name,
+                        );
+                        let buffer = project
+                            .update(cx, |project, cx| {
+                                project.open_buffer(project_path.clone(), cx)
                             })
-                            .choose(&mut *rng.lock())
-                    }) {
-                        worktree
+                            .await?;
+                        log::info!(
+                            "Guest {}: opened path {:?} in worktree {} ({}) with buffer id {}",
+                            guest_id,
+                            project_path.1,
+                            project_path.0,
+                            worktree_root_name,
+                            buffer.read_with(cx, |buffer, _| buffer.remote_id())
+                        );
+                        client.buffers.insert(buffer.clone());
+                        buffer
                     } else {
-                        cx.background().simulate_random_delay().await;
-                        continue;
-                    };
-
-                    operations.set(operations.get() + 1);
-                    let (worktree_root_name, project_path) =
-                        worktree.read_with(&cx, |worktree, _| {
-                            let entry = worktree
-                                .entries(false)
-                                .filter(|e| e.is_file())
-                                .choose(&mut *rng.lock())
-                                .unwrap();
-                            (
-                                worktree.root_name().to_string(),
-                                (worktree.id(), entry.path.clone()),
-                            )
-                        });
-                    log::info!(
-                        "Guest {}: opening path {:?} in worktree {} ({})",
-                        guest_id,
-                        project_path.1,
-                        project_path.0,
-                        worktree_root_name,
-                    );
-                    let buffer = project
-                        .update(&mut cx, |project, cx| {
-                            project.open_buffer(project_path.clone(), cx)
-                        })
-                        .await
-                        .unwrap();
-                    log::info!(
-                        "Guest {}: opened path {:?} in worktree {} ({}) with buffer id {}",
-                        guest_id,
-                        project_path.1,
-                        project_path.0,
-                        worktree_root_name,
-                        buffer.read_with(&cx, |buffer, _| buffer.remote_id())
-                    );
-                    self.buffers.insert(buffer.clone());
-                    buffer
-                } else {
-                    operations.set(operations.get() + 1);
+                        operations.set(operations.get() + 1);
 
-                    self.buffers
-                        .iter()
-                        .choose(&mut *rng.lock())
-                        .unwrap()
-                        .clone()
-                };
+                        client
+                            .buffers
+                            .iter()
+                            .choose(&mut *rng.lock())
+                            .unwrap()
+                            .clone()
+                    };
 
-                let choice = rng.lock().gen_range(0..100);
-                match choice {
-                    0..=9 => {
-                        cx.update(|cx| {
-                            log::info!(
-                                "Guest {}: dropping buffer {:?}",
-                                guest_id,
-                                buffer.read(cx).file().unwrap().full_path(cx)
-                            );
-                            self.buffers.remove(&buffer);
-                            drop(buffer);
-                        });
-                    }
-                    10..=19 => {
-                        let completions = project.update(&mut cx, |project, cx| {
-                            log::info!(
-                                "Guest {}: requesting completions for buffer {} ({:?})",
-                                guest_id,
-                                buffer.read(cx).remote_id(),
-                                buffer.read(cx).file().unwrap().full_path(cx)
-                            );
-                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
-                            project.completions(&buffer, offset, cx)
-                        });
-                        let completions = cx.background().spawn(async move {
-                            completions.await.expect("completions request failed");
-                        });
-                        if rng.lock().gen_bool(0.3) {
-                            log::info!("Guest {}: detaching completions request", guest_id);
-                            completions.detach();
-                        } else {
-                            completions.await;
+                    let choice = rng.lock().gen_range(0..100);
+                    match choice {
+                        0..=9 => {
+                            cx.update(|cx| {
+                                log::info!(
+                                    "Guest {}: dropping buffer {:?}",
+                                    guest_id,
+                                    buffer.read(cx).file().unwrap().full_path(cx)
+                                );
+                                client.buffers.remove(&buffer);
+                                drop(buffer);
+                            });
                         }
-                    }
-                    20..=29 => {
-                        let code_actions = project.update(&mut cx, |project, cx| {
-                            log::info!(
-                                "Guest {}: requesting code actions for buffer {} ({:?})",
-                                guest_id,
-                                buffer.read(cx).remote_id(),
-                                buffer.read(cx).file().unwrap().full_path(cx)
-                            );
-                            let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
-                            project.code_actions(&buffer, range, cx)
-                        });
-                        let code_actions = cx.background().spawn(async move {
-                            code_actions.await.expect("code actions request failed");
-                        });
-                        if rng.lock().gen_bool(0.3) {
-                            log::info!("Guest {}: detaching code actions request", guest_id);
-                            code_actions.detach();
-                        } else {
-                            code_actions.await;
+                        10..=19 => {
+                            let completions = project.update(cx, |project, cx| {
+                                log::info!(
+                                    "Guest {}: requesting completions for buffer {} ({:?})",
+                                    guest_id,
+                                    buffer.read(cx).remote_id(),
+                                    buffer.read(cx).file().unwrap().full_path(cx)
+                                );
+                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
+                                project.completions(&buffer, offset, cx)
+                            });
+                            let completions = cx.background().spawn(async move {
+                                completions
+                                    .await
+                                    .map_err(|err| anyhow!("completions request failed: {:?}", err))
+                            });
+                            if rng.lock().gen_bool(0.3) {
+                                log::info!("Guest {}: detaching completions request", guest_id);
+                                cx.update(|cx| completions.detach_and_log_err(cx));
+                            } else {
+                                completions.await?;
+                            }
                         }
-                    }
-                    30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
-                        let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
-                            log::info!(
-                                "Guest {}: saving buffer {} ({:?})",
-                                guest_id,
-                                buffer.remote_id(),
-                                buffer.file().unwrap().full_path(cx)
-                            );
-                            (buffer.version(), buffer.save(cx))
-                        });
-                        let save = cx.background().spawn(async move {
-                            let (saved_version, _) = save.await.expect("save request failed");
-                            assert!(saved_version.observed_all(&requested_version));
-                        });
-                        if rng.lock().gen_bool(0.3) {
-                            log::info!("Guest {}: detaching save request", guest_id);
-                            save.detach();
-                        } else {
-                            save.await;
+                        20..=29 => {
+                            let code_actions = project.update(cx, |project, cx| {
+                                log::info!(
+                                    "Guest {}: requesting code actions for buffer {} ({:?})",
+                                    guest_id,
+                                    buffer.read(cx).remote_id(),
+                                    buffer.read(cx).file().unwrap().full_path(cx)
+                                );
+                                let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
+                                project.code_actions(&buffer, range, cx)
+                            });
+                            let code_actions = cx.background().spawn(async move {
+                                code_actions.await.map_err(|err| {
+                                    anyhow!("code actions request failed: {:?}", err)
+                                })
+                            });
+                            if rng.lock().gen_bool(0.3) {
+                                log::info!("Guest {}: detaching code actions request", guest_id);
+                                cx.update(|cx| code_actions.detach_and_log_err(cx));
+                            } else {
+                                code_actions.await?;
+                            }
                         }
-                    }
-                    40..=44 => {
-                        let prepare_rename = project.update(&mut cx, |project, cx| {
-                            log::info!(
-                                "Guest {}: preparing rename for buffer {} ({:?})",
-                                guest_id,
-                                buffer.read(cx).remote_id(),
-                                buffer.read(cx).file().unwrap().full_path(cx)
-                            );
-                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
-                            project.prepare_rename(buffer, offset, cx)
-                        });
-                        let prepare_rename = cx.background().spawn(async move {
-                            prepare_rename.await.expect("prepare rename request failed");
-                        });
-                        if rng.lock().gen_bool(0.3) {
-                            log::info!("Guest {}: detaching prepare rename request", guest_id);
-                            prepare_rename.detach();
-                        } else {
-                            prepare_rename.await;
+                        30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
+                            let (requested_version, save) = buffer.update(cx, |buffer, cx| {
+                                log::info!(
+                                    "Guest {}: saving buffer {} ({:?})",
+                                    guest_id,
+                                    buffer.remote_id(),
+                                    buffer.file().unwrap().full_path(cx)
+                                );
+                                (buffer.version(), buffer.save(cx))
+                            });
+                            let save = cx.background().spawn(async move {
+                                let (saved_version, _) = save
+                                    .await
+                                    .map_err(|err| anyhow!("save request failed: {:?}", err))?;
+                                assert!(saved_version.observed_all(&requested_version));
+                                Ok::<_, anyhow::Error>(())
+                            });
+                            if rng.lock().gen_bool(0.3) {
+                                log::info!("Guest {}: detaching save request", guest_id);
+                                cx.update(|cx| save.detach_and_log_err(cx));
+                            } else {
+                                save.await?;
+                            }
                         }
-                    }
-                    45..=49 => {
-                        let definitions = project.update(&mut cx, |project, cx| {
-                            log::info!(
-                                "Guest {}: requesting definitions for buffer {} ({:?})",
-                                guest_id,
-                                buffer.read(cx).remote_id(),
-                                buffer.read(cx).file().unwrap().full_path(cx)
-                            );
-                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
-                            project.definition(&buffer, offset, cx)
-                        });
-                        let definitions = cx.background().spawn(async move {
-                            definitions.await.expect("definitions request failed")
-                        });
-                        if rng.lock().gen_bool(0.3) {
-                            log::info!("Guest {}: detaching definitions request", guest_id);
-                            definitions.detach();
-                        } else {
-                            self.buffers
-                                .extend(definitions.await.into_iter().map(|loc| loc.buffer));
+                        40..=44 => {
+                            let prepare_rename = project.update(cx, |project, cx| {
+                                log::info!(
+                                    "Guest {}: preparing rename for buffer {} ({:?})",
+                                    guest_id,
+                                    buffer.read(cx).remote_id(),
+                                    buffer.read(cx).file().unwrap().full_path(cx)
+                                );
+                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
+                                project.prepare_rename(buffer, offset, cx)
+                            });
+                            let prepare_rename = cx.background().spawn(async move {
+                                prepare_rename.await.map_err(|err| {
+                                    anyhow!("prepare rename request failed: {:?}", err)
+                                })
+                            });
+                            if rng.lock().gen_bool(0.3) {
+                                log::info!("Guest {}: detaching prepare rename request", guest_id);
+                                cx.update(|cx| prepare_rename.detach_and_log_err(cx));
+                            } else {
+                                prepare_rename.await?;
+                            }
                         }
-                    }
-                    50..=54 => {
-                        let highlights = project.update(&mut cx, |project, cx| {
-                            log::info!(
-                                "Guest {}: requesting highlights for buffer {} ({:?})",
-                                guest_id,
-                                buffer.read(cx).remote_id(),
-                                buffer.read(cx).file().unwrap().full_path(cx)
-                            );
-                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
-                            project.document_highlights(&buffer, offset, cx)
-                        });
-                        let highlights = cx.background().spawn(async move {
-                            highlights.await.expect("highlights request failed");
-                        });
-                        if rng.lock().gen_bool(0.3) {
-                            log::info!("Guest {}: detaching highlights request", guest_id);
-                            highlights.detach();
-                        } else {
-                            highlights.await;
+                        45..=49 => {
+                            let definitions = project.update(cx, |project, cx| {
+                                log::info!(
+                                    "Guest {}: requesting definitions for buffer {} ({:?})",
+                                    guest_id,
+                                    buffer.read(cx).remote_id(),
+                                    buffer.read(cx).file().unwrap().full_path(cx)
+                                );
+                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
+                                project.definition(&buffer, offset, cx)
+                            });
+                            let definitions = cx.background().spawn(async move {
+                                definitions
+                                    .await
+                                    .map_err(|err| anyhow!("definitions request failed: {:?}", err))
+                            });
+                            if rng.lock().gen_bool(0.3) {
+                                log::info!("Guest {}: detaching definitions request", guest_id);
+                                cx.update(|cx| definitions.detach_and_log_err(cx));
+                            } else {
+                                client
+                                    .buffers
+                                    .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
+                            }
                         }
-                    }
-                    55..=59 => {
-                        let search = project.update(&mut cx, |project, cx| {
-                            let query = rng.lock().gen_range('a'..='z');
-                            log::info!("Guest {}: project-wide search {:?}", guest_id, query);
-                            project.search(SearchQuery::text(query, false, false), cx)
-                        });
-                        let search = cx
-                            .background()
-                            .spawn(async move { search.await.expect("search request failed") });
-                        if rng.lock().gen_bool(0.3) {
-                            log::info!("Guest {}: detaching search request", guest_id);
-                            search.detach();
-                        } else {
-                            self.buffers.extend(search.await.into_keys());
+                        50..=54 => {
+                            let highlights = project.update(cx, |project, cx| {
+                                log::info!(
+                                    "Guest {}: requesting highlights for buffer {} ({:?})",
+                                    guest_id,
+                                    buffer.read(cx).remote_id(),
+                                    buffer.read(cx).file().unwrap().full_path(cx)
+                                );
+                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
+                                project.document_highlights(&buffer, offset, cx)
+                            });
+                            let highlights = cx.background().spawn(async move {
+                                highlights
+                                    .await
+                                    .map_err(|err| anyhow!("highlights request failed: {:?}", err))
+                            });
+                            if rng.lock().gen_bool(0.3) {
+                                log::info!("Guest {}: detaching highlights request", guest_id);
+                                cx.update(|cx| highlights.detach_and_log_err(cx));
+                            } else {
+                                highlights.await?;
+                            }
+                        }
+                        55..=59 => {
+                            let search = project.update(cx, |project, cx| {
+                                let query = rng.lock().gen_range('a'..='z');
+                                log::info!("Guest {}: project-wide search {:?}", guest_id, query);
+                                project.search(SearchQuery::text(query, false, false), cx)
+                            });
+                            let search = cx.background().spawn(async move {
+                                search
+                                    .await
+                                    .map_err(|err| anyhow!("search request failed: {:?}", err))
+                            });
+                            if rng.lock().gen_bool(0.3) {
+                                log::info!("Guest {}: detaching search request", guest_id);
+                                cx.update(|cx| search.detach_and_log_err(cx));
+                            } else {
+                                client.buffers.extend(search.await?.into_keys());
+                            }
+                        }
+                        _ => {
+                            buffer.update(cx, |buffer, cx| {
+                                log::info!(
+                                    "Guest {}: updating buffer {} ({:?})",
+                                    guest_id,
+                                    buffer.remote_id(),
+                                    buffer.file().unwrap().full_path(cx)
+                                );
+                                buffer.randomly_edit(&mut *rng.lock(), 5, cx)
+                            });
                         }
                     }
-                    _ => {
-                        buffer.update(&mut cx, |buffer, cx| {
-                            log::info!(
-                                "Guest {}: updating buffer {} ({:?})",
-                                guest_id,
-                                buffer.remote_id(),
-                                buffer.file().unwrap().full_path(cx)
-                            );
-                            buffer.randomly_edit(&mut *rng.lock(), 5, cx)
-                        });
-                    }
+                    cx.background().simulate_random_delay().await;
                 }
-                cx.background().simulate_random_delay().await;
+                Ok(())
             }
 
-            log::info!("Guest {} done", guest_id);
+            match simulate_guest_internal(
+                &mut self,
+                guest_id,
+                project.clone(),
+                operations,
+                max_operations,
+                rng,
+                &mut cx,
+            )
+            .await
+            {
+                Ok(()) => log::info!("guest {} done", guest_id),
+                Err(err) => {
+                    if host_disconnected.load(SeqCst) {
+                        log::error!("guest {} simulation error - {:?}", guest_id, err);
+                    } else {
+                        panic!("guest {} simulation error - {:?}", guest_id, err);
+                    }
+                }
+            }
 
             self.project = Some(project);
             (self, cx)
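
The hunk above moves the guest's randomized operations into an inner async function returning anyhow::Result<()>, so a failed request propagates with `?` instead of panicking inside the loop; the caller then treats errors as expected (and only logs them) once the host has disconnected, and panics otherwise. A minimal, self-contained sketch of that pattern follows; run_guest, simulate_guest, and should_fail are illustrative names only, not part of the actual test code.

    // Sketch only: the fallible loop lives in an inner function returning
    // anyhow::Result<()>, and the caller decides whether a failure is
    // tolerable (the host is already gone) or a real test failure (panic).
    use anyhow::{anyhow, Result};
    use std::sync::{
        atomic::{AtomicBool, Ordering::SeqCst},
        Arc,
    };

    async fn run_guest(guest_id: usize, should_fail: bool) -> Result<()> {
        // Stand-in for the randomized operations; any `?` inside the real
        // loop propagates out of here instead of panicking in place.
        if should_fail {
            return Err(anyhow!("request failed for guest {}", guest_id));
        }
        Ok(())
    }

    async fn simulate_guest(guest_id: usize, host_disconnected: Arc<AtomicBool>, should_fail: bool) {
        match run_guest(guest_id, should_fail).await {
            Ok(()) => log::info!("guest {} done", guest_id),
            Err(err) => {
                if host_disconnected.load(SeqCst) {
                    // Errors are expected once the host is gone; just log them.
                    log::error!("guest {} simulation error - {:?}", guest_id, err);
                } else {
                    panic!("guest {} simulation error - {:?}", guest_id, err);
                }
            }
        }
    }

(The sketch assumes the anyhow and log crates; the real test additionally threads the project handle, rng, and gpui test context through the inner function.)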

crates/server/src/rpc/store.rs 🔗

@@ -244,6 +244,9 @@ impl Store {
                 language_servers: Default::default(),
             },
         );
+        if let Some(connection) = self.connections.get_mut(&host_connection_id) {
+            connection.projects.insert(project_id);
+        }
         self.next_project_id += 1;
         project_id
     }
@@ -266,9 +269,7 @@ impl Store {
                     .or_default()
                     .insert(project_id);
             }
-            if let Some(connection) = self.connections.get_mut(&project.host_connection_id) {
-                connection.projects.insert(project_id);
-            }
+
             project.worktrees.insert(worktree_id, worktree);
             if let Ok(share) = project.share_mut() {
                 share.worktrees.insert(worktree_id, Default::default());
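
The two store.rs hunks above move the connection-to-project bookkeeping from worktree registration into project creation, presumably so that even a project with no worktrees yet is attributed to its host connection. A simplified sketch of that bookkeeping, using illustrative types rather than the real Store:

    use std::collections::{HashMap, HashSet};

    #[derive(Default)]
    struct ConnectionState {
        projects: HashSet<u64>,
    }

    #[derive(Default)]
    struct Store {
        connections: HashMap<u32, ConnectionState>,
        next_project_id: u64,
    }

    impl Store {
        fn register_project(&mut self, host_connection_id: u32) -> u64 {
            let project_id = self.next_project_id;
            // Record ownership as soon as the project exists, even if no
            // worktree is ever registered for it.
            if let Some(connection) = self.connections.get_mut(&host_connection_id) {
                connection.projects.insert(project_id);
            }
            self.next_project_id += 1;
            project_id
        }
    }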