From bb8dc6120bd2f83242b300995d403f343f8844c6 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Mon, 11 Apr 2022 10:58:57 +0200 Subject: [PATCH 01/11] Allow taking an `Arc` in `[gpui::test]`-decorated tests --- crates/gpui_macros/src/gpui_macros.rs | 99 +++++++++++++-------------- 1 file changed, 48 insertions(+), 51 deletions(-) diff --git a/crates/gpui_macros/src/gpui_macros.rs b/crates/gpui_macros/src/gpui_macros.rs index c107175dca71fcec76e32a08dd8ff4e6579dadaf..7ec8b4fb7c6c17508fe028b213e1bd9f471ce19f 100644 --- a/crates/gpui_macros/src/gpui_macros.rs +++ b/crates/gpui_macros/src/gpui_macros.rs @@ -75,68 +75,65 @@ pub fn test(args: TokenStream, function: TokenStream) -> TokenStream { match last_segment.map(|s| s.ident.to_string()).as_deref() { Some("StdRng") => { inner_fn_args.extend(quote!(rand::SeedableRng::seed_from_u64(seed),)); + continue; } Some("bool") => { inner_fn_args.extend(quote!(is_last_iteration,)); + continue; } - _ => { - return TokenStream::from( - syn::Error::new_spanned(arg, "invalid argument") - .into_compile_error(), - ) - } - } - } else if let Type::Reference(ty) = &*arg.ty { - match &*ty.elem { - Type::Path(ty) => { - let last_segment = ty.path.segments.last(); - match last_segment.map(|s| s.ident.to_string()).as_deref() { - Some("TestAppContext") => { - let first_entity_id = ix * 100_000; - let cx_varname = format_ident!("cx_{}", ix); - cx_vars.extend(quote!( - let mut #cx_varname = #namespace::TestAppContext::new( - foreground_platform.clone(), - cx.platform().clone(), - deterministic.build_foreground(#ix), - deterministic.build_background(), - cx.font_cache().clone(), - cx.leak_detector(), - #first_entity_id, - ); - )); - cx_teardowns.extend(quote!( - #cx_varname.update(|cx| cx.remove_all_windows()); - deterministic.run_until_parked(); - #cx_varname.update(|_| {}); // flush effects - )); - inner_fn_args.extend(quote!(&mut #cx_varname,)); - } - _ => { - return TokenStream::from( - syn::Error::new_spanned(arg, "invalid argument") - .into_compile_error(), - ) + Some("Arc") => { + if let syn::PathArguments::AngleBracketed(args) = + &last_segment.unwrap().arguments + { + if let Some(syn::GenericArgument::Type(syn::Type::Path(ty))) = + args.args.last() + { + let last_segment = ty.path.segments.last(); + if let Some("Deterministic") = + last_segment.map(|s| s.ident.to_string()).as_deref() + { + inner_fn_args.extend(quote!(deterministic.clone(),)); + continue; + } } } } - _ => { - return TokenStream::from( - syn::Error::new_spanned(arg, "invalid argument") - .into_compile_error(), - ) + _ => {} + } + } else if let Type::Reference(ty) = &*arg.ty { + if let Type::Path(ty) = &*ty.elem { + let last_segment = ty.path.segments.last(); + if let Some("TestAppContext") = + last_segment.map(|s| s.ident.to_string()).as_deref() + { + let first_entity_id = ix * 100_000; + let cx_varname = format_ident!("cx_{}", ix); + cx_vars.extend(quote!( + let mut #cx_varname = #namespace::TestAppContext::new( + foreground_platform.clone(), + cx.platform().clone(), + deterministic.build_foreground(#ix), + deterministic.build_background(), + cx.font_cache().clone(), + cx.leak_detector(), + #first_entity_id, + ); + )); + cx_teardowns.extend(quote!( + #cx_varname.update(|cx| cx.remove_all_windows()); + deterministic.run_until_parked(); + #cx_varname.update(|_| {}); // flush effects + )); + inner_fn_args.extend(quote!(&mut #cx_varname,)); + continue; } } - } else { - return TokenStream::from( - syn::Error::new_spanned(arg, "invalid argument").into_compile_error(), - ); } - } else { - return 
TokenStream::from(
- syn::Error::new_spanned(arg, "invalid argument").into_compile_error(),
- );
 }
+
+ return TokenStream::from(
+ syn::Error::new_spanned(arg, "invalid argument").into_compile_error(),
+ );
 }

 parse_quote! {

From 02f96c6defa5dc40039731d8c950150cec2b666d Mon Sep 17 00:00:00 2001
From: Antonio Scandurra
Date: Mon, 11 Apr 2022 11:00:31 +0200
Subject: [PATCH 02/11] Simulate parallelism among peers correctly in randomized collab test

Previously they were all using the same foreground executor, which was
not properly simulating concurrency among tasks from different peers.
---
 crates/server/src/rpc.rs | 24 ++++++++++++++++--------
 1 file changed, 16 insertions(+), 8 deletions(-)

diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs
index ce35401b980ca529975705b777b190bedefb4789..d249c43398f60adbca01b88360d04226674c348b 100644
--- a/crates/server/src/rpc.rs
+++ b/crates/server/src/rpc.rs
@@ -1087,7 +1087,11 @@ mod tests {
 self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo,
 Rename, ToOffset, ToggleCodeActions, Undo,
 };
- use gpui::{executor, geometry::vector::vec2f, ModelHandle, TestAppContext, ViewHandle};
+ use gpui::{
+ executor::{self, Deterministic},
+ geometry::vector::vec2f,
+ ModelHandle, TestAppContext, ViewHandle,
+ };
 use language::{
 range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter,
 Language, LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
@@ -4969,7 +4973,11 @@ mod tests {
 }

 #[gpui::test(iterations = 100)]
- async fn test_random_collaboration(cx: &mut TestAppContext, rng: StdRng) {
+ async fn test_random_collaboration(
+ cx: &mut TestAppContext,
+ deterministic: Arc<Deterministic>,
+ rng: StdRng,
+ ) {
 cx.foreground().forbid_parking();
 let max_peers = env::var("MAX_PEERS")
 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
 .unwrap_or(5);
@@ -5002,8 +5010,8 @@ mod tests {
 let mut host_cx = TestAppContext::new(
 cx.foreground_platform(),
 cx.platform(),
- cx.foreground(),
- cx.background(),
+ deterministic.build_foreground(next_entity_id),
+ deterministic.build_background(),
 cx.font_cache(),
 cx.leak_detector(),
 next_entity_id,
@@ -5165,7 +5173,7 @@ mod tests {
 let host_disconnected = Rc::new(AtomicBool::new(false));
 user_ids.push(host.current_user_id(&host_cx));
- clients.push(cx.foreground().spawn(host.simulate_host(
+ clients.push(host_cx.foreground().spawn(host.simulate_host(
 host_project,
 files,
 operations.clone(),
@@ -5187,8 +5195,8 @@ mod tests {
 let mut guest_cx = TestAppContext::new(
 cx.foreground_platform(),
 cx.platform(),
- cx.foreground(),
- cx.background(),
+ deterministic.build_foreground(next_entity_id),
+ deterministic.build_background(),
 cx.font_cache(),
 cx.leak_detector(),
 next_entity_id,
@@ -5207,7 +5215,7 @@ mod tests {
 .await
 .unwrap();
 user_ids.push(guest.current_user_id(&guest_cx));
- clients.push(cx.foreground().spawn(guest.simulate_guest(
+ clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
 guest_id,
 guest_project,
 operations.clone(),

From f99a1437cd614df74a093580760c4f5301eb6e78 Mon Sep 17 00:00:00 2001
From: Antonio Scandurra
Date: Mon, 11 Apr 2022 14:36:36 +0200
Subject: [PATCH 03/11] Distribute operation workload evenly across peers in randomized test

Co-Authored-By: Nathan Sobo
---
 crates/server/src/rpc.rs | 148 +++++++++++++++++++++------------------
 1 file changed, 81 insertions(+), 67 deletions(-)

diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs
index d249c43398f60adbca01b88360d04226674c348b..e825f98bdd8250dd98f4948781ecabe121a504d3 100644
--- 
a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -1110,7 +1110,6 @@ mod tests { use settings::Settings; use sqlx::types::time::OffsetDateTime; use std::{ - cell::Cell, env, ops::Deref, path::{Path, PathBuf}, @@ -5000,10 +4999,10 @@ mod tests { ) .await; - let operations = Rc::new(Cell::new(0)); let mut server = TestServer::start(cx.foreground(), cx.background()).await; let mut clients = Vec::new(); let mut user_ids = Vec::new(); + let mut op_start_signals = Vec::new(); let files = Arc::new(Mutex::new(Vec::new())); let mut next_entity_id = 100000; @@ -5172,64 +5171,29 @@ mod tests { host_language_registry.add(Arc::new(language)); let host_disconnected = Rc::new(AtomicBool::new(false)); + let op_start_signal = futures::channel::mpsc::unbounded(); user_ids.push(host.current_user_id(&host_cx)); + op_start_signals.push(op_start_signal.0); clients.push(host_cx.foreground().spawn(host.simulate_host( host_project, files, - operations.clone(), - max_operations, + op_start_signal.1, rng.clone(), host_cx, ))); - while operations.get() < max_operations { - cx.background().simulate_random_delay().await; - if clients.len() >= max_peers { - break; - } else if rng.lock().gen_bool(0.05) { - operations.set(operations.get() + 1); - - let guest_id = clients.len(); - log::info!("Adding guest {}", guest_id); - next_entity_id += 100000; - let mut guest_cx = TestAppContext::new( - cx.foreground_platform(), - cx.platform(), - deterministic.build_foreground(next_entity_id), - deterministic.build_background(), - cx.font_cache(), - cx.leak_detector(), - next_entity_id, - ); - let guest = server - .create_client(&mut guest_cx, &format!("guest-{}", guest_id)) - .await; - let guest_project = Project::remote( - host_project_id, - guest.client.clone(), - guest.user_store.clone(), - guest_lang_registry.clone(), - FakeFs::new(cx.background()), - &mut guest_cx.to_async(), - ) - .await - .unwrap(); - user_ids.push(guest.current_user_id(&guest_cx)); - clients.push(guest_cx.foreground().spawn(guest.simulate_guest( - guest_id, - guest_project, - operations.clone(), - max_operations, - rng.clone(), - host_disconnected.clone(), - guest_cx, - ))); - - log::info!("Guest {} added", guest_id); - } else if rng.lock().gen_bool(0.05) { + let disconnect_host_at = if rng.lock().gen_bool(0.2) { + rng.lock().gen_range(0..max_operations) + } else { + max_operations + }; + let mut operations = 0; + while operations < max_operations { + if operations == disconnect_host_at { host_disconnected.store(true, SeqCst); server.disconnect_client(user_ids[0]); cx.foreground().advance_clock(RECEIVE_TIMEOUT); + drop(op_start_signals); let mut clients = futures::future::join_all(clients).await; cx.foreground().run_until_parked(); @@ -5258,8 +5222,68 @@ mod tests { return; } + + let distribution = rng.lock().gen_range(0..100); + match distribution { + 0..=19 if clients.len() < max_peers => { + let guest_id = clients.len(); + log::info!("Adding guest {}", guest_id); + next_entity_id += 100000; + let mut guest_cx = TestAppContext::new( + cx.foreground_platform(), + cx.platform(), + deterministic.build_foreground(next_entity_id), + deterministic.build_background(), + cx.font_cache(), + cx.leak_detector(), + next_entity_id, + ); + let guest = server + .create_client(&mut guest_cx, &format!("guest-{}", guest_id)) + .await; + let guest_project = Project::remote( + host_project_id, + guest.client.clone(), + guest.user_store.clone(), + guest_lang_registry.clone(), + FakeFs::new(cx.background()), + &mut guest_cx.to_async(), + ) + .await + .unwrap(); + let 
op_start_signal = futures::channel::mpsc::unbounded();
+ user_ids.push(guest.current_user_id(&guest_cx));
+ op_start_signals.push(op_start_signal.0);
+ clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
+ guest_id,
+ guest_project,
+ op_start_signal.1,
+ rng.clone(),
+ host_disconnected.clone(),
+ guest_cx,
+ )));
+
+ log::info!("Guest {} added", guest_id);
+ operations += 1;
+ }
+ _ => {
+ while operations < max_operations && rng.lock().gen_bool(0.7) {
+ op_start_signals
+ .choose(&mut *rng.lock())
+ .unwrap()
+ .unbounded_send(())
+ .unwrap();
+ operations += 1;
+ }
+
+ if rng.lock().gen_bool(0.8) {
+ cx.foreground().run_until_parked();
+ }
+ }
+ }
 }

+ drop(op_start_signals);
 let mut clients = futures::future::join_all(clients).await;
 cx.foreground().run_until_parked();

@@ -5655,8 +5679,7 @@ mod tests {
 mut self,
 project: ModelHandle<Project>,
 files: Arc<Mutex<Vec<PathBuf>>>,
- operations: Rc<Cell<usize>>,
- max_operations: usize,
+ op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
 rng: Arc<Mutex<StdRng>>,
 mut cx: TestAppContext,
 ) -> (Self, TestAppContext) {
@@ -5664,15 +5687,13 @@ mod tests {
 client: &mut TestClient,
 project: ModelHandle<Project>,
 files: Arc<Mutex<Vec<PathBuf>>>,
- operations: Rc<Cell<usize>>,
- max_operations: usize,
+ mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
 rng: Arc<Mutex<StdRng>>,
 cx: &mut TestAppContext,
 ) -> anyhow::Result<()> {
 let fs = project.read_with(cx, |project, _| project.fs().clone());

- while operations.get() < max_operations {
- operations.set(operations.get() + 1);
+ while op_start_signal.next().await.is_some() {
 let distribution = rng.lock().gen_range::<usize, _>(0..100);
 match distribution {
 0..=20 if !files.lock().is_empty() => {
@@ -5784,8 +5805,7 @@ mod tests {
 &mut self,
 project.clone(),
 files,
- operations,
- max_operations,
+ op_start_signal,
 rng,
 &mut cx,
 )
@@ -5800,8 +5820,7 @@ mod tests {
 mut self,
 guest_id: usize,
 project: ModelHandle<Project>,
- operations: Rc<Cell<usize>>,
- max_operations: usize,
+ op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
 rng: Arc<Mutex<StdRng>>,
 host_disconnected: Rc<AtomicBool>,
 mut cx: TestAppContext,
 ) -> (Self, TestAppContext) {
@@ -5810,12 +5829,11 @@ mod tests {
 client: &mut TestClient,
 guest_id: usize,
 project: ModelHandle<Project>,
- operations: Rc<Cell<usize>>,
- max_operations: usize,
+ mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
 rng: Arc<Mutex<StdRng>>,
 cx: &mut TestAppContext,
 ) -> anyhow::Result<()> {
- while operations.get() < max_operations {
+ while op_start_signal.next().await.is_some() {
 let buffer = if client.buffers.is_empty() || rng.lock().gen() {
 let worktree = if let Some(worktree) =
 project.read_with(cx, |project, cx| {
@@ -5834,7 +5852,6 @@
 continue;
 };

- operations.set(operations.get() + 1);
 let (worktree_root_name, project_path) =
 worktree.read_with(cx, |worktree, _| {
 let entry = worktree
@@ -5870,8 +5887,6 @@
 client.buffers.insert(buffer.clone());
 buffer
 } else {
- operations.set(operations.get() + 1);
-
 client
 .buffers
 .iter()
@@ -6073,8 +6088,7 @@
 &mut self,
 guest_id,
 project.clone(),
- operations,
- max_operations,
+ op_start_signal,
 rng,
 &mut cx,
 )

From c3927c541f612c45eb90decabd98b273925b38a0 Mon Sep 17 00:00:00 2001
From: Antonio Scandurra
Date: Mon, 11 Apr 2022 16:14:29 +0200
Subject: [PATCH 04/11] Simulate random guest disconnection and reconnection

---
 crates/server/src/rpc.rs | 216 ++++++++++++++++++++++++---------------
 1 file changed, 135 insertions(+), 81 deletions(-)

diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs
index e825f98bdd8250dd98f4948781ecabe121a504d3..c63f57f42c1735d2273d3ab0fc494c1e3e31d5e6 100644
--- a/crates/server/src/rpc.rs
+++ 
b/crates/server/src/rpc.rs @@ -1120,7 +1120,6 @@ mod tests { }, time::Duration, }; - use util::TryFutureExt; use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams}; #[cfg(test)] @@ -4981,6 +4980,8 @@ mod tests { let max_peers = env::var("MAX_PEERS") .map(|i| i.parse().expect("invalid `MAX_PEERS` variable")) .unwrap_or(5); + assert!(max_peers <= 5); + let max_operations = env::var("OPERATIONS") .map(|i| i.parse().expect("invalid `OPERATIONS` variable")) .unwrap_or(10); @@ -4994,7 +4995,7 @@ mod tests { fs.insert_tree( "/_collab", json!({ - ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"# + ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4"]"# }), ) .await; @@ -5170,7 +5171,6 @@ mod tests { }); host_language_registry.add(Arc::new(language)); - let host_disconnected = Rc::new(AtomicBool::new(false)); let op_start_signal = futures::channel::mpsc::unbounded(); user_ids.push(host.current_user_id(&host_cx)); op_start_signals.push(op_start_signal.0); @@ -5187,22 +5187,33 @@ mod tests { } else { max_operations }; + let mut available_guests = vec![ + "guest-1".to_string(), + "guest-2".to_string(), + "guest-3".to_string(), + "guest-4".to_string(), + ]; let mut operations = 0; while operations < max_operations { if operations == disconnect_host_at { - host_disconnected.store(true, SeqCst); server.disconnect_client(user_ids[0]); cx.foreground().advance_clock(RECEIVE_TIMEOUT); drop(op_start_signals); let mut clients = futures::future::join_all(clients).await; cx.foreground().run_until_parked(); - let (host, mut host_cx) = clients.remove(0); + let (host, mut host_cx, host_err) = clients.remove(0); + if let Some(host_err) = host_err { + log::error!("host error - {}", host_err); + } host.project .as_ref() .unwrap() .read_with(&host_cx, |project, _| assert!(!project.is_shared())); - for (guest, mut guest_cx) in clients { + for (guest, mut guest_cx, guest_err) in clients { + if let Some(guest_err) = guest_err { + log::error!("{} error - {}", guest.username, guest_err); + } let contacts = server .store .read() @@ -5225,9 +5236,10 @@ mod tests { let distribution = rng.lock().gen_range(0..100); match distribution { - 0..=19 if clients.len() < max_peers => { - let guest_id = clients.len(); - log::info!("Adding guest {}", guest_id); + 0..=19 if !available_guests.is_empty() => { + let guest_ix = rng.lock().gen_range(0..available_guests.len()); + let guest_username = available_guests.remove(guest_ix); + log::info!("Adding new connection for {}", guest_username); next_entity_id += 100000; let mut guest_cx = TestAppContext::new( cx.foreground_platform(), @@ -5238,9 +5250,7 @@ mod tests { cx.leak_detector(), next_entity_id, ); - let guest = server - .create_client(&mut guest_cx, &format!("guest-{}", guest_id)) - .await; + let guest = server.create_client(&mut guest_cx, &guest_username).await; let guest_project = Project::remote( host_project_id, guest.client.clone(), @@ -5255,15 +5265,54 @@ mod tests { user_ids.push(guest.current_user_id(&guest_cx)); op_start_signals.push(op_start_signal.0); clients.push(guest_cx.foreground().spawn(guest.simulate_guest( - guest_id, + guest_username.clone(), guest_project, op_start_signal.1, rng.clone(), - host_disconnected.clone(), guest_cx, ))); - log::info!("Guest {} added", guest_id); + log::info!("Added connection for {}", guest_username); + operations += 1; + } + 20..=30 if clients.len() > 1 => { + log::info!("Removing guest"); + let guest_ix = rng.lock().gen_range(1..clients.len()); + let 
removed_guest_id = user_ids.remove(guest_ix); + let guest = clients.remove(guest_ix); + op_start_signals.remove(guest_ix); + server.disconnect_client(removed_guest_id); + cx.foreground().advance_clock(RECEIVE_TIMEOUT); + let (guest, mut guest_cx, guest_err) = guest.await; + if let Some(guest_err) = guest_err { + log::error!("{} error - {}", guest.username, guest_err); + } + guest + .project + .as_ref() + .unwrap() + .read_with(&guest_cx, |project, _| assert!(project.is_read_only())); + for user_id in &user_ids { + for contact in server.store.read().contacts_for_user(*user_id) { + assert_ne!( + contact.user_id, removed_guest_id.0 as u64, + "removed guest is still a contact of another peer" + ); + for project in contact.projects { + for project_guest_id in project.guests { + assert_ne!( + project_guest_id, removed_guest_id.0 as u64, + "removed guest appears as still participating on a project" + ); + } + } + } + } + + log::info!("{} removed", guest.username); + available_guests.push(guest.username.clone()); + guest_cx.update(|_| drop(guest)); + operations += 1; } _ => { @@ -5287,7 +5336,10 @@ mod tests { let mut clients = futures::future::join_all(clients).await; cx.foreground().run_until_parked(); - let (host_client, mut host_cx) = clients.remove(0); + let (host_client, mut host_cx, host_err) = clients.remove(0); + if let Some(host_err) = host_err { + panic!("host error - {}", host_err); + } let host_project = host_client.project.as_ref().unwrap(); let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| { project @@ -5305,8 +5357,10 @@ mod tests { .unwrap() .read_with(&host_cx, |project, cx| project.check_invariants(cx)); - for (guest_client, mut guest_cx) in clients.into_iter() { - let guest_id = guest_client.client.id(); + for (guest_client, mut guest_cx, guest_err) in clients.into_iter() { + if let Some(guest_err) = guest_err { + panic!("{} error - {}", guest_client.username, guest_err); + } let worktree_snapshots = guest_client .project @@ -5325,23 +5379,23 @@ mod tests { assert_eq!( worktree_snapshots.keys().collect::>(), host_worktree_snapshots.keys().collect::>(), - "guest {} has different worktrees than the host", - guest_id + "{} has different worktrees than the host", + guest_client.username ); for (id, host_snapshot) in &host_worktree_snapshots { let guest_snapshot = &worktree_snapshots[id]; assert_eq!( guest_snapshot.root_name(), host_snapshot.root_name(), - "guest {} has different root name than the host for worktree {}", - guest_id, + "{} has different root name than the host for worktree {}", + guest_client.username, id ); assert_eq!( guest_snapshot.entries(false).collect::>(), host_snapshot.entries(false).collect::>(), - "guest {} has different snapshot than the host for worktree {}", - guest_id, + "{} has different snapshot than the host for worktree {}", + guest_client.username, id ); } @@ -5357,7 +5411,7 @@ mod tests { let host_buffer = host_project.read_with(&host_cx, |project, cx| { project.buffer_for_id(buffer_id, cx).expect(&format!( "host does not have buffer for guest:{}, peer:{}, id:{}", - guest_id, guest_client.peer_id, buffer_id + guest_client.username, guest_client.peer_id, buffer_id )) }); let path = host_buffer @@ -5366,16 +5420,16 @@ mod tests { assert_eq!( guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()), 0, - "guest {}, buffer {}, path {:?} has deferred operations", - guest_id, + "{}, buffer {}, path {:?} has deferred operations", + guest_client.username, buffer_id, path, ); assert_eq!( 
guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()), host_buffer.read_with(&host_cx, |buffer, _| buffer.text()), - "guest {}, buffer {}, path {:?}, differs from the host's buffer", - guest_id, + "{}, buffer {}, path {:?}, differs from the host's buffer", + guest_client.username, buffer_id, path ); @@ -5495,6 +5549,7 @@ mod tests { let client = TestClient { client, peer_id, + username: name.to_string(), user_store, language_registry: Arc::new(LanguageRegistry::test()), project: Default::default(), @@ -5571,6 +5626,7 @@ mod tests { struct TestClient { client: Arc, + username: String, pub peer_id: PeerId, pub user_store: ModelHandle, language_registry: Arc, @@ -5682,7 +5738,7 @@ mod tests { op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>, rng: Arc>, mut cx: TestAppContext, - ) -> (Self, TestAppContext) { + ) -> (Self, TestAppContext, Option) { async fn simulate_host_internal( client: &mut TestClient, project: ModelHandle, @@ -5767,7 +5823,12 @@ mod tests { buffer.file().unwrap().full_path(cx), buffer.remote_id() ); - buffer.randomly_edit(&mut *rng.lock(), 5, cx) + + if rng.lock().gen_bool(0.7) { + buffer.randomly_edit(&mut *rng.lock(), 5, cx); + } else { + buffer.randomly_undo_redo(&mut *rng.lock(), cx); + } }); } } @@ -5801,7 +5862,7 @@ mod tests { Ok(()) } - simulate_host_internal( + let result = simulate_host_internal( &mut self, project.clone(), files, @@ -5809,25 +5870,23 @@ mod tests { rng, &mut cx, ) - .log_err() .await; log::info!("Host done"); self.project = Some(project); - (self, cx) + (self, cx, result.err()) } pub async fn simulate_guest( mut self, - guest_id: usize, + guest_username: String, project: ModelHandle, op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>, rng: Arc>, - host_disconnected: Rc, mut cx: TestAppContext, - ) -> (Self, TestAppContext) { + ) -> (Self, TestAppContext, Option) { async fn simulate_guest_internal( client: &mut TestClient, - guest_id: usize, + guest_username: &str, project: ModelHandle, mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>, rng: Arc>, @@ -5865,8 +5924,8 @@ mod tests { ) }); log::info!( - "Guest {}: opening path {:?} in worktree {} ({})", - guest_id, + "{}: opening path {:?} in worktree {} ({})", + guest_username, project_path.1, project_path.0, worktree_root_name, @@ -5877,8 +5936,8 @@ mod tests { }) .await?; log::info!( - "Guest {}: opened path {:?} in worktree {} ({}) with buffer id {}", - guest_id, + "{}: opened path {:?} in worktree {} ({}) with buffer id {}", + guest_username, project_path.1, project_path.0, worktree_root_name, @@ -5900,8 +5959,8 @@ mod tests { 0..=9 => { cx.update(|cx| { log::info!( - "Guest {}: dropping buffer {:?}", - guest_id, + "{}: dropping buffer {:?}", + guest_username, buffer.read(cx).file().unwrap().full_path(cx) ); client.buffers.remove(&buffer); @@ -5911,8 +5970,8 @@ mod tests { 10..=19 => { let completions = project.update(cx, |project, cx| { log::info!( - "Guest {}: requesting completions for buffer {} ({:?})", - guest_id, + "{}: requesting completions for buffer {} ({:?})", + guest_username, buffer.read(cx).remote_id(), buffer.read(cx).file().unwrap().full_path(cx) ); @@ -5925,7 +5984,7 @@ mod tests { .map_err(|err| anyhow!("completions request failed: {:?}", err)) }); if rng.lock().gen_bool(0.3) { - log::info!("Guest {}: detaching completions request", guest_id); + log::info!("{}: detaching completions request", guest_username); cx.update(|cx| completions.detach_and_log_err(cx)); } else { completions.await?; @@ -5934,8 +5993,8 @@ mod tests { 
20..=29 => { let code_actions = project.update(cx, |project, cx| { log::info!( - "Guest {}: requesting code actions for buffer {} ({:?})", - guest_id, + "{}: requesting code actions for buffer {} ({:?})", + guest_username, buffer.read(cx).remote_id(), buffer.read(cx).file().unwrap().full_path(cx) ); @@ -5948,7 +6007,7 @@ mod tests { }) }); if rng.lock().gen_bool(0.3) { - log::info!("Guest {}: detaching code actions request", guest_id); + log::info!("{}: detaching code actions request", guest_username); cx.update(|cx| code_actions.detach_and_log_err(cx)); } else { code_actions.await?; @@ -5957,8 +6016,8 @@ mod tests { 30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => { let (requested_version, save) = buffer.update(cx, |buffer, cx| { log::info!( - "Guest {}: saving buffer {} ({:?})", - guest_id, + "{}: saving buffer {} ({:?})", + guest_username, buffer.remote_id(), buffer.file().unwrap().full_path(cx) ); @@ -5972,7 +6031,7 @@ mod tests { Ok::<_, anyhow::Error>(()) }); if rng.lock().gen_bool(0.3) { - log::info!("Guest {}: detaching save request", guest_id); + log::info!("{}: detaching save request", guest_username); cx.update(|cx| save.detach_and_log_err(cx)); } else { save.await?; @@ -5981,8 +6040,8 @@ mod tests { 40..=44 => { let prepare_rename = project.update(cx, |project, cx| { log::info!( - "Guest {}: preparing rename for buffer {} ({:?})", - guest_id, + "{}: preparing rename for buffer {} ({:?})", + guest_username, buffer.read(cx).remote_id(), buffer.read(cx).file().unwrap().full_path(cx) ); @@ -5995,7 +6054,7 @@ mod tests { }) }); if rng.lock().gen_bool(0.3) { - log::info!("Guest {}: detaching prepare rename request", guest_id); + log::info!("{}: detaching prepare rename request", guest_username); cx.update(|cx| prepare_rename.detach_and_log_err(cx)); } else { prepare_rename.await?; @@ -6004,8 +6063,8 @@ mod tests { 45..=49 => { let definitions = project.update(cx, |project, cx| { log::info!( - "Guest {}: requesting definitions for buffer {} ({:?})", - guest_id, + "{}: requesting definitions for buffer {} ({:?})", + guest_username, buffer.read(cx).remote_id(), buffer.read(cx).file().unwrap().full_path(cx) ); @@ -6018,7 +6077,7 @@ mod tests { .map_err(|err| anyhow!("definitions request failed: {:?}", err)) }); if rng.lock().gen_bool(0.3) { - log::info!("Guest {}: detaching definitions request", guest_id); + log::info!("{}: detaching definitions request", guest_username); cx.update(|cx| definitions.detach_and_log_err(cx)); } else { client @@ -6029,8 +6088,8 @@ mod tests { 50..=54 => { let highlights = project.update(cx, |project, cx| { log::info!( - "Guest {}: requesting highlights for buffer {} ({:?})", - guest_id, + "{}: requesting highlights for buffer {} ({:?})", + guest_username, buffer.read(cx).remote_id(), buffer.read(cx).file().unwrap().full_path(cx) ); @@ -6043,7 +6102,7 @@ mod tests { .map_err(|err| anyhow!("highlights request failed: {:?}", err)) }); if rng.lock().gen_bool(0.3) { - log::info!("Guest {}: detaching highlights request", guest_id); + log::info!("{}: detaching highlights request", guest_username); cx.update(|cx| highlights.detach_and_log_err(cx)); } else { highlights.await?; @@ -6052,7 +6111,7 @@ mod tests { 55..=59 => { let search = project.update(cx, |project, cx| { let query = rng.lock().gen_range('a'..='z'); - log::info!("Guest {}: project-wide search {:?}", guest_id, query); + log::info!("{}: project-wide search {:?}", guest_username, query); project.search(SearchQuery::text(query, false, false), cx) }); let search = 
cx.background().spawn(async move {
 search
 .await
 .map_err(|err| anyhow!("search request failed: {:?}", err))
 });
 if rng.lock().gen_bool(0.3) {
- log::info!("Guest {}: detaching search request", guest_id);
+ log::info!("{}: detaching search request", guest_username);
 cx.update(|cx| search.detach_and_log_err(cx));
 } else {
 client.buffers.extend(search.await?.into_keys());
 }
 }
 _ => {
 buffer.update(cx, |buffer, cx| {
 log::info!(
- "Guest {}: updating buffer {} ({:?})",
- guest_id,
+ "{}: updating buffer {} ({:?})",
+ guest_username,
 buffer.remote_id(),
 buffer.file().unwrap().full_path(cx)
 );
- buffer.randomly_edit(&mut *rng.lock(), 5, cx)
+ if rng.lock().gen_bool(0.7) {
+ buffer.randomly_edit(&mut *rng.lock(), 5, cx);
+ } else {
+ buffer.randomly_undo_redo(&mut *rng.lock(), cx);
+ }
 });
 }
 }
 }
 Ok(())
 }

- match simulate_guest_internal(
+ let result = simulate_guest_internal(
 &mut self,
- guest_id,
+ &guest_username,
 project.clone(),
 op_start_signal,
 rng,
 &mut cx,
 )
- .await
- {
- Ok(()) => log::info!("guest {} done", guest_id),
- Err(err) => {
- if host_disconnected.load(SeqCst) {
- log::error!("guest {} simulation error - {:?}", guest_id, err);
- } else {
- panic!("guest {} simulation error - {:?}", guest_id, err);
- }
- }
- }
+ .await;
+ log::info!("{}: done", guest_username);

 self.project = Some(project);
- (self, cx)
+ (self, cx, result.err())
 }
 }

From 273ee0ae58032d400b336d8e4f1344c27527a923 Mon Sep 17 00:00:00 2001
From: Antonio Scandurra
Date: Mon, 11 Apr 2022 16:14:41 +0200
Subject: [PATCH 05/11] Acquire guest connection ids after save request has been forwarded

This fixes a bug that would cause the server to broadcast the save
message to guests that have potentially left the project.
---
 crates/server/src/rpc.rs | 17 ++++++++---------
 1 file changed, 8 insertions(+), 9 deletions(-)

diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs
index c63f57f42c1735d2273d3ab0fc494c1e3e31d5e6..2fe0931c4ca8c6e97bfe1704e763a7f6d643e786 100644
--- a/crates/server/src/rpc.rs
+++ b/crates/server/src/rpc.rs
@@ -596,20 +596,19 @@ impl Server {
 self: Arc<Server>,
 request: TypedEnvelope<proto::SaveBuffer>,
 ) -> tide::Result<proto::BufferSaved> {
- let host;
- let mut guests;
- {
- let state = self.state();
- let project = state.read_project(request.payload.project_id, request.sender_id)?;
- host = project.host_connection_id;
- guests = project.guest_connection_ids()
- }
-
+ let host = self
+ .state()
+ .read_project(request.payload.project_id, request.sender_id)?
+ .host_connection_id;
 let response = self
 .peer
 .forward_request(request.sender_id, host, request.payload.clone())
 .await?;

+ let mut guests = self
+ .state()
+ .read_project(request.payload.project_id, request.sender_id)?
+ .connection_ids();
 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 broadcast(host, guests, |conn_id| {
 self.peer.forward_send(host, conn_id, response.clone())

From 9a8b0388fa0dd2f1f0bfd37685dccfed35f97012 Mon Sep 17 00:00:00 2001
From: Antonio Scandurra
Date: Mon, 11 Apr 2022 17:38:17 +0200
Subject: [PATCH 06/11] Replace synchronous `Store` lock with an async lock

This also fixes some failures due to `broadcast` and
`update_contacts_for_users` being fallible. As part of this commit,
these two functions don't return `Result` anymore: the reason for this
change is that we don't want a request to fail only because a peer
disconnected while we were trying to broadcast a message to them.
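
Concretely, `broadcast` now logs and drops per-receiver send failures instead
of propagating them. This is the shape introduced further down in this patch,
quoted here with explanatory comments added:

    fn broadcast<F>(sender_id: ConnectionId, receiver_ids: Vec<ConnectionId>, mut f: F)
    where
        F: FnMut(ConnectionId) -> anyhow::Result<()>,
    {
        for receiver_id in receiver_ids {
            if receiver_id != sender_id {
                // A failed send means this receiver disconnected; that is not the
                // sender's fault, so log the error and keep broadcasting.
                f(receiver_id).log_err();
            }
        }
    }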
--- crates/server/Cargo.toml | 1 + crates/server/src/rpc.rs | 256 ++++++++++++++++++++++----------- crates/server/src/rpc/store.rs | 25 +--- 3 files changed, 178 insertions(+), 104 deletions(-) diff --git a/crates/server/Cargo.toml b/crates/server/Cargo.toml index e0834b76c5ecb1c518e918a70af78824685e6f42..7c9bb8078597f1bbf37dcb2312d19eb594d94cf2 100644 --- a/crates/server/Cargo.toml +++ b/crates/server/Cargo.toml @@ -15,6 +15,7 @@ required-features = ["seed-support"] [dependencies] collections = { path = "../collections" } rpc = { path = "../rpc" } +util = { path = "../util" } anyhow = "1.0.40" async-io = "1.3" async-std = { version = "1.8.0", features = ["attributes"] } diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index 2fe0931c4ca8c6e97bfe1704e763a7f6d643e786..0ffcde9176890ad2b541772f9a828f049823b15c 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -7,12 +7,14 @@ use super::{ }; use anyhow::anyhow; use async_io::Timer; -use async_std::task; +use async_std::{ + sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}, + task, +}; use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream}; use collections::{HashMap, HashSet}; use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt}; use log::{as_debug, as_display}; -use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use rpc::{ proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage}, Connection, ConnectionId, Peer, TypedEnvelope, @@ -21,6 +23,9 @@ use sha1::{Digest as _, Sha1}; use std::{ any::TypeId, future::Future, + marker::PhantomData, + ops::{Deref, DerefMut}, + rc::Rc, sync::Arc, time::{Duration, Instant}, }; @@ -31,6 +36,7 @@ use tide::{ Request, Response, }; use time::OffsetDateTime; +use util::ResultExt; type MessageHandler = Box< dyn Send @@ -58,6 +64,16 @@ pub struct RealExecutor; const MESSAGE_COUNT_PER_PAGE: usize = 100; const MAX_MESSAGE_LEN: usize = 1024; +struct StoreReadGuard<'a> { + guard: RwLockReadGuard<'a, Store>, + _not_send: PhantomData>, +} + +struct StoreWriteGuard<'a> { + guard: RwLockWriteGuard<'a, Store>, + _not_send: PhantomData>, +} + impl Server { pub fn new( app_state: Arc, @@ -197,10 +213,10 @@ impl Server { let _ = send_connection_id.send(connection_id).await; } - this.state_mut().add_connection(connection_id, user_id); - if let Err(err) = this.update_contacts_for_users(&[user_id]) { - log::error!("error updating contacts for {:?}: {}", user_id, err); - } + this.state_mut() + .await + .add_connection(connection_id, user_id); + this.update_contacts_for_users(&[user_id]).await; let handle_io = handle_io.fuse(); futures::pin_mut!(handle_io); @@ -257,7 +273,7 @@ impl Server { async fn sign_out(self: &mut Arc, connection_id: ConnectionId) -> tide::Result<()> { self.peer.disconnect(connection_id); - let removed_connection = self.state_mut().remove_connection(connection_id)?; + let removed_connection = self.state_mut().await.remove_connection(connection_id)?; for (project_id, project) in removed_connection.hosted_projects { if let Some(share) = project.share { @@ -268,7 +284,7 @@ impl Server { self.peer .send(conn_id, proto::UnshareProject { project_id }) }, - )?; + ); } } @@ -281,10 +297,11 @@ impl Server { peer_id: connection_id.0, }, ) - })?; + }); } - self.update_contacts_for_users(removed_connection.contact_ids.iter())?; + self.update_contacts_for_users(removed_connection.contact_ids.iter()) + .await; Ok(()) } @@ -297,7 +314,7 @@ impl Server { request: TypedEnvelope, ) -> tide::Result { let project_id = { 
- let mut state = self.state_mut(); + let mut state = self.state_mut().await; let user_id = state.user_id_for_connection(request.sender_id)?; state.register_project(request.sender_id, user_id) }; @@ -310,8 +327,10 @@ impl Server { ) -> tide::Result<()> { let project = self .state_mut() + .await .unregister_project(request.payload.project_id, request.sender_id)?; - self.update_contacts_for_users(project.authorized_user_ids().iter())?; + self.update_contacts_for_users(project.authorized_user_ids().iter()) + .await; Ok(()) } @@ -320,6 +339,7 @@ impl Server { request: TypedEnvelope, ) -> tide::Result { self.state_mut() + .await .share_project(request.payload.project_id, request.sender_id); Ok(proto::Ack {}) } @@ -331,13 +351,15 @@ impl Server { let project_id = request.payload.project_id; let project = self .state_mut() + .await .unshare_project(project_id, request.sender_id)?; broadcast(request.sender_id, project.connection_ids, |conn_id| { self.peer .send(conn_id, proto::UnshareProject { project_id }) - })?; - self.update_contacts_for_users(&project.authorized_user_ids)?; + }); + self.update_contacts_for_users(&project.authorized_user_ids) + .await; Ok(()) } @@ -347,9 +369,13 @@ impl Server { ) -> tide::Result { let project_id = request.payload.project_id; - let user_id = self.state().user_id_for_connection(request.sender_id)?; + let user_id = self + .state() + .await + .user_id_for_connection(request.sender_id)?; let (response, connection_ids, contact_user_ids) = self .state_mut() + .await .join_project(request.sender_id, user_id, project_id) .and_then(|joined| { let share = joined.project.share()?; @@ -410,8 +436,8 @@ impl Server { }), }, ) - })?; - self.update_contacts_for_users(&contact_user_ids)?; + }); + self.update_contacts_for_users(&contact_user_ids).await; Ok(response) } @@ -421,7 +447,10 @@ impl Server { ) -> tide::Result<()> { let sender_id = request.sender_id; let project_id = request.payload.project_id; - let worktree = self.state_mut().leave_project(sender_id, project_id)?; + let worktree = self + .state_mut() + .await + .leave_project(sender_id, project_id)?; broadcast(sender_id, worktree.connection_ids, |conn_id| { self.peer.send( @@ -431,8 +460,9 @@ impl Server { peer_id: sender_id.0, }, ) - })?; - self.update_contacts_for_users(&worktree.authorized_user_ids)?; + }); + self.update_contacts_for_users(&worktree.authorized_user_ids) + .await; Ok(()) } @@ -441,7 +471,10 @@ impl Server { mut self: Arc, request: TypedEnvelope, ) -> tide::Result { - let host_user_id = self.state().user_id_for_connection(request.sender_id)?; + let host_user_id = self + .state() + .await + .user_id_for_connection(request.sender_id)?; let mut contact_user_ids = HashSet::default(); contact_user_ids.insert(host_user_id); @@ -453,7 +486,7 @@ impl Server { let contact_user_ids = contact_user_ids.into_iter().collect::>(); let guest_connection_ids; { - let mut state = self.state_mut(); + let mut state = self.state_mut().await; guest_connection_ids = state .read_project(request.payload.project_id, request.sender_id)? 
.guest_connection_ids(); @@ -471,8 +504,8 @@ impl Server { broadcast(request.sender_id, guest_connection_ids, |connection_id| { self.peer .forward_send(request.sender_id, connection_id, request.payload.clone()) - })?; - self.update_contacts_for_users(&contact_user_ids)?; + }); + self.update_contacts_for_users(&contact_user_ids).await; Ok(proto::Ack {}) } @@ -482,9 +515,11 @@ impl Server { ) -> tide::Result<()> { let project_id = request.payload.project_id; let worktree_id = request.payload.worktree_id; - let (worktree, guest_connection_ids) = - self.state_mut() - .unregister_worktree(project_id, worktree_id, request.sender_id)?; + let (worktree, guest_connection_ids) = self.state_mut().await.unregister_worktree( + project_id, + worktree_id, + request.sender_id, + )?; broadcast(request.sender_id, guest_connection_ids, |conn_id| { self.peer.send( conn_id, @@ -493,8 +528,9 @@ impl Server { worktree_id, }, ) - })?; - self.update_contacts_for_users(&worktree.authorized_user_ids)?; + }); + self.update_contacts_for_users(&worktree.authorized_user_ids) + .await; Ok(()) } @@ -502,7 +538,7 @@ impl Server { mut self: Arc, request: TypedEnvelope, ) -> tide::Result { - let connection_ids = self.state_mut().update_worktree( + let connection_ids = self.state_mut().await.update_worktree( request.sender_id, request.payload.project_id, request.payload.worktree_id, @@ -513,7 +549,7 @@ impl Server { broadcast(request.sender_id, connection_ids, |connection_id| { self.peer .forward_send(request.sender_id, connection_id, request.payload.clone()) - })?; + }); Ok(proto::Ack {}) } @@ -527,7 +563,7 @@ impl Server { .summary .clone() .ok_or_else(|| anyhow!("invalid summary"))?; - let receiver_ids = self.state_mut().update_diagnostic_summary( + let receiver_ids = self.state_mut().await.update_diagnostic_summary( request.payload.project_id, request.payload.worktree_id, request.sender_id, @@ -537,7 +573,7 @@ impl Server { broadcast(request.sender_id, receiver_ids, |connection_id| { self.peer .forward_send(request.sender_id, connection_id, request.payload.clone()) - })?; + }); Ok(()) } @@ -545,7 +581,7 @@ impl Server { mut self: Arc, request: TypedEnvelope, ) -> tide::Result<()> { - let receiver_ids = self.state_mut().start_language_server( + let receiver_ids = self.state_mut().await.start_language_server( request.payload.project_id, request.sender_id, request @@ -557,7 +593,7 @@ impl Server { broadcast(request.sender_id, receiver_ids, |connection_id| { self.peer .forward_send(request.sender_id, connection_id, request.payload.clone()) - })?; + }); Ok(()) } @@ -567,11 +603,12 @@ impl Server { ) -> tide::Result<()> { let receiver_ids = self .state() + .await .project_connection_ids(request.payload.project_id, request.sender_id)?; broadcast(request.sender_id, receiver_ids, |connection_id| { self.peer .forward_send(request.sender_id, connection_id, request.payload.clone()) - })?; + }); Ok(()) } @@ -584,6 +621,7 @@ impl Server { { let host_connection_id = self .state() + .await .read_project(request.payload.remote_entity_id(), request.sender_id)? .host_connection_id; Ok(self @@ -598,6 +636,7 @@ impl Server { ) -> tide::Result { let host = self .state() + .await .read_project(request.payload.project_id, request.sender_id)? .host_connection_id; let response = self @@ -607,12 +646,13 @@ impl Server { let mut guests = self .state() + .await .read_project(request.payload.project_id, request.sender_id)? 
.connection_ids(); guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id); broadcast(host, guests, |conn_id| { self.peer.forward_send(host, conn_id, response.clone()) - })?; + }); Ok(response) } @@ -623,11 +663,12 @@ impl Server { ) -> tide::Result { let receiver_ids = self .state() + .await .project_connection_ids(request.payload.project_id, request.sender_id)?; broadcast(request.sender_id, receiver_ids, |connection_id| { self.peer .forward_send(request.sender_id, connection_id, request.payload.clone()) - })?; + }); Ok(proto::Ack {}) } @@ -637,11 +678,12 @@ impl Server { ) -> tide::Result<()> { let receiver_ids = self .state() + .await .project_connection_ids(request.payload.project_id, request.sender_id)?; broadcast(request.sender_id, receiver_ids, |connection_id| { self.peer .forward_send(request.sender_id, connection_id, request.payload.clone()) - })?; + }); Ok(()) } @@ -651,11 +693,12 @@ impl Server { ) -> tide::Result<()> { let receiver_ids = self .state() + .await .project_connection_ids(request.payload.project_id, request.sender_id)?; broadcast(request.sender_id, receiver_ids, |connection_id| { self.peer .forward_send(request.sender_id, connection_id, request.payload.clone()) - })?; + }); Ok(()) } @@ -665,11 +708,12 @@ impl Server { ) -> tide::Result<()> { let receiver_ids = self .state() + .await .project_connection_ids(request.payload.project_id, request.sender_id)?; broadcast(request.sender_id, receiver_ids, |connection_id| { self.peer .forward_send(request.sender_id, connection_id, request.payload.clone()) - })?; + }); Ok(()) } @@ -681,6 +725,7 @@ impl Server { let follower_id = request.sender_id; if !self .state() + .await .project_connection_ids(request.payload.project_id, follower_id)? .contains(&leader_id) { @@ -703,6 +748,7 @@ impl Server { let leader_id = ConnectionId(request.payload.leader_id); if !self .state() + .await .project_connection_ids(request.payload.project_id, request.sender_id)? 
.contains(&leader_id) { @@ -719,6 +765,7 @@ impl Server { ) -> tide::Result<()> { let connection_ids = self .state() + .await .project_connection_ids(request.payload.project_id, request.sender_id)?; let leader_id = request .payload @@ -743,7 +790,10 @@ impl Server { self: Arc, request: TypedEnvelope, ) -> tide::Result { - let user_id = self.state().user_id_for_connection(request.sender_id)?; + let user_id = self + .state() + .await + .user_id_for_connection(request.sender_id)?; let channels = self.app_state.db.get_accessible_channels(user_id).await?; Ok(proto::GetChannelsResponse { channels: channels @@ -781,33 +831,34 @@ impl Server { Ok(proto::GetUsersResponse { users }) } - fn update_contacts_for_users<'a>( + async fn update_contacts_for_users<'a>( self: &Arc, user_ids: impl IntoIterator, - ) -> anyhow::Result<()> { - let mut result = Ok(()); - let state = self.state(); + ) { + let state = self.state().await; for user_id in user_ids { let contacts = state.contacts_for_user(*user_id); for connection_id in state.connection_ids_for_user(*user_id) { - if let Err(error) = self.peer.send( - connection_id, - proto::UpdateContacts { - contacts: contacts.clone(), - }, - ) { - result = Err(error); - } + self.peer + .send( + connection_id, + proto::UpdateContacts { + contacts: contacts.clone(), + }, + ) + .log_err(); } } - result } async fn join_channel( mut self: Arc, request: TypedEnvelope, ) -> tide::Result { - let user_id = self.state().user_id_for_connection(request.sender_id)?; + let user_id = self + .state() + .await + .user_id_for_connection(request.sender_id)?; let channel_id = ChannelId::from_proto(request.payload.channel_id); if !self .app_state @@ -818,7 +869,9 @@ impl Server { Err(anyhow!("access denied"))?; } - self.state_mut().join_channel(request.sender_id, channel_id); + self.state_mut() + .await + .join_channel(request.sender_id, channel_id); let messages = self .app_state .db @@ -843,7 +896,10 @@ impl Server { mut self: Arc, request: TypedEnvelope, ) -> tide::Result<()> { - let user_id = self.state().user_id_for_connection(request.sender_id)?; + let user_id = self + .state() + .await + .user_id_for_connection(request.sender_id)?; let channel_id = ChannelId::from_proto(request.payload.channel_id); if !self .app_state @@ -855,6 +911,7 @@ impl Server { } self.state_mut() + .await .leave_channel(request.sender_id, channel_id); Ok(()) @@ -868,7 +925,7 @@ impl Server { let user_id; let connection_ids; { - let state = self.state(); + let state = self.state().await; user_id = state.user_id_for_connection(request.sender_id)?; connection_ids = state.channel_connection_ids(channel_id)?; } @@ -909,7 +966,7 @@ impl Server { message: Some(message.clone()), }, ) - })?; + }); Ok(proto::SendChannelMessageResponse { message: Some(message), }) @@ -919,7 +976,10 @@ impl Server { self: Arc, request: TypedEnvelope, ) -> tide::Result { - let user_id = self.state().user_id_for_connection(request.sender_id)?; + let user_id = self + .state() + .await + .user_id_for_connection(request.sender_id)?; let channel_id = ChannelId::from_proto(request.payload.channel_id); if !self .app_state @@ -955,12 +1015,57 @@ impl Server { }) } - fn state<'a>(self: &'a Arc) -> RwLockReadGuard<'a, Store> { - self.store.read() + async fn state<'a>(self: &'a Arc) -> StoreReadGuard<'a> { + #[cfg(test)] + async_std::task::yield_now().await; + let guard = self.store.read().await; + #[cfg(test)] + async_std::task::yield_now().await; + StoreReadGuard { + guard, + _not_send: PhantomData, + } + } + + async fn state_mut<'a>(self: &'a 
mut Arc) -> StoreWriteGuard<'a> { + #[cfg(test)] + async_std::task::yield_now().await; + let guard = self.store.write().await; + #[cfg(test)] + async_std::task::yield_now().await; + StoreWriteGuard { + guard, + _not_send: PhantomData, + } + } +} + +impl<'a> Deref for StoreReadGuard<'a> { + type Target = Store; + + fn deref(&self) -> &Self::Target { + &*self.guard + } +} + +impl<'a> Deref for StoreWriteGuard<'a> { + type Target = Store; + + fn deref(&self) -> &Self::Target { + &*self.guard } +} - fn state_mut<'a>(self: &'a mut Arc) -> RwLockWriteGuard<'a, Store> { - self.store.write() +impl<'a> DerefMut for StoreWriteGuard<'a> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut *self.guard + } +} + +impl<'a> Drop for StoreWriteGuard<'a> { + fn drop(&mut self) { + #[cfg(test)] + self.check_invariants(); } } @@ -976,25 +1081,15 @@ impl Executor for RealExecutor { } } -fn broadcast( - sender_id: ConnectionId, - receiver_ids: Vec, - mut f: F, -) -> anyhow::Result<()> +fn broadcast(sender_id: ConnectionId, receiver_ids: Vec, mut f: F) where F: FnMut(ConnectionId) -> anyhow::Result<()>, { - let mut result = Ok(()); for receiver_id in receiver_ids { if receiver_id != sender_id { - if let Err(error) = f(receiver_id) { - if result.is_ok() { - result = Err(error); - } - } + f(receiver_id).log_err(); } } - result } pub fn add_routes(app: &mut tide::Server>, rpc: &Arc) { @@ -5216,6 +5311,7 @@ mod tests { let contacts = server .store .read() + .await .contacts_for_user(guest.current_user_id(&guest_cx)); assert!(!contacts .iter() @@ -5292,7 +5388,7 @@ mod tests { .unwrap() .read_with(&guest_cx, |project, _| assert!(project.is_read_only())); for user_id in &user_ids { - for contact in server.store.read().contacts_for_user(*user_id) { + for contact in server.store.read().await.contacts_for_user(*user_id) { assert_ne!( contact.user_id, removed_guest_id.0 as u64, "removed guest is still a contact of another peer" @@ -5590,7 +5686,7 @@ mod tests { } async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> { - self.server.store.read() + self.server.store.read().await } async fn condition(&mut self, mut predicate: F) @@ -5598,7 +5694,7 @@ mod tests { F: FnMut(&Store) -> bool, { async_std::future::timeout(Duration::from_millis(500), async { - while !(predicate)(&*self.server.store.read()) { + while !(predicate)(&*self.server.store.read().await) { self.foreground.start_waiting(); self.notifications.next().await; self.foreground.finish_waiting(); diff --git a/crates/server/src/rpc/store.rs b/crates/server/src/rpc/store.rs index 6c330c9c8bae3e3558280ea940fc180207ce5c70..33d2a399816ad90f6e809373e8ece3ca67c8046e 100644 --- a/crates/server/src/rpc/store.rs +++ b/crates/server/src/rpc/store.rs @@ -130,9 +130,6 @@ impl Store { } } - #[cfg(test)] - self.check_invariants(); - Ok(result) } @@ -275,8 +272,6 @@ impl Store { share.worktrees.insert(worktree_id, Default::default()); } - #[cfg(test)] - self.check_invariants(); Ok(()) } else { Err(anyhow!("no such project"))? @@ -313,8 +308,6 @@ impl Store { } } - #[cfg(test)] - self.check_invariants(); Ok(project) } else { Err(anyhow!("no such project"))? 
@@ -359,9 +352,6 @@ impl Store { } } - #[cfg(test)] - self.check_invariants(); - Ok((worktree, guest_connection_ids)) } @@ -403,9 +393,6 @@ impl Store { } } - #[cfg(test)] - self.check_invariants(); - Ok(UnsharedProject { connection_ids, authorized_user_ids, @@ -491,9 +478,6 @@ impl Store { share.active_replica_ids.insert(replica_id); share.guests.insert(connection_id, (replica_id, user_id)); - #[cfg(test)] - self.check_invariants(); - Ok(JoinedProject { replica_id, project: &self.projects[&project_id], @@ -526,9 +510,6 @@ impl Store { let connection_ids = project.connection_ids(); let authorized_user_ids = project.authorized_user_ids(); - #[cfg(test)] - self.check_invariants(); - Ok(LeftProject { connection_ids, authorized_user_ids, @@ -556,10 +537,6 @@ impl Store { worktree.entries.insert(entry.id, entry.clone()); } let connection_ids = project.connection_ids(); - - #[cfg(test)] - self.check_invariants(); - Ok(connection_ids) } @@ -633,7 +610,7 @@ impl Store { } #[cfg(test)] - fn check_invariants(&self) { + pub fn check_invariants(&self) { for (connection_id, connection) in &self.connections { for project_id in &connection.projects { let project = &self.projects.get(&project_id).unwrap(); From 3844634765a64fc2655ea89dec625fba436f9978 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Mon, 11 Apr 2022 19:30:52 +0200 Subject: [PATCH 07/11] Hold the state lock while responding to guest joining a project Co-Authored-By: Nathan Sobo --- crates/server/src/rpc.rs | 74 ++++++++++++++++++++++++++++++++++------ 1 file changed, 63 insertions(+), 11 deletions(-) diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index 0ffcde9176890ad2b541772f9a828f049823b15c..1c4c76349da952cf6a55e9f21c4405dc260a1423 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -94,7 +94,7 @@ impl Server { .add_message_handler(Server::unregister_project) .add_request_handler(Server::share_project) .add_message_handler(Server::unshare_project) - .add_request_handler(Server::join_project) + .add_sync_request_handler(Server::join_project) .add_message_handler(Server::leave_project) .add_request_handler(Server::register_worktree) .add_message_handler(Server::unregister_worktree) @@ -186,6 +186,42 @@ impl Server { }) } + /// Handle a request while holding a lock to the store. This is useful when we're registering + /// a connection but we want to respond on the connection before anybody else can send on it. 
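+ /// Concretely, the handler below receives `&mut Store` while the write lock is held,
+ /// and the response is written to the peer before that guard is dropped, so no other
+ /// connection can observe or race with the newly-registered state before the
+ /// requester has received its reply.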
+ fn add_sync_request_handler(&mut self, handler: F) -> &mut Self + where + F: 'static + + Send + + Sync + + Fn(Arc, &mut Store, TypedEnvelope) -> tide::Result, + M: RequestMessage, + { + let handler = Arc::new(handler); + self.add_message_handler(move |server, envelope| { + let receipt = envelope.receipt(); + let handler = handler.clone(); + async move { + let mut store = server.store.write().await; + let response = (handler)(server.clone(), &mut *store, envelope); + match response { + Ok(response) => { + server.peer.respond(receipt, response)?; + Ok(()) + } + Err(error) => { + server.peer.respond_with_error( + receipt, + proto::Error { + message: error.to_string(), + }, + )?; + Err(error) + } + } + } + }) + } + pub fn handle_connection( self: &Arc, connection: Connection, @@ -363,19 +399,15 @@ impl Server { Ok(()) } - async fn join_project( - mut self: Arc, + fn join_project( + self: Arc, + state: &mut Store, request: TypedEnvelope, ) -> tide::Result { let project_id = request.payload.project_id; - let user_id = self - .state() - .await - .user_id_for_connection(request.sender_id)?; - let (response, connection_ids, contact_user_ids) = self - .state_mut() - .await + let user_id = state.user_id_for_connection(request.sender_id)?; + let (response, connection_ids, contact_user_ids) = state .join_project(request.sender_id, user_id, project_id) .and_then(|joined| { let share = joined.project.share()?; @@ -437,7 +469,7 @@ impl Server { }, ) }); - self.update_contacts_for_users(&contact_user_ids).await; + self.update_contacts_for_users_sync(state, &contact_user_ids); Ok(response) } @@ -851,6 +883,26 @@ impl Server { } } + fn update_contacts_for_users_sync<'a>( + self: &Arc, + state: &Store, + user_ids: impl IntoIterator, + ) { + for user_id in user_ids { + let contacts = state.contacts_for_user(*user_id); + for connection_id in state.connection_ids_for_user(*user_id) { + self.peer + .send( + connection_id, + proto::UpdateContacts { + contacts: contacts.clone(), + }, + ) + .log_err(); + } + } + } + async fn join_channel( mut self: Arc, request: TypedEnvelope, From c06e5f3d1e6d23af23b826af9ea9b411d6b60967 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Tue, 12 Apr 2022 09:43:36 +0200 Subject: [PATCH 08/11] Limit incoming size to 1 in tests to more easily simulate backpressure --- crates/rpc/src/peer.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/crates/rpc/src/peer.rs b/crates/rpc/src/peer.rs index 76fd6aac185fa0d941c02285b377f4c93d8693ad..a2b88f795c69396770e1775473aef68d3e61f339 100644 --- a/crates/rpc/src/peer.rs +++ b/crates/rpc/src/peer.rs @@ -126,7 +126,11 @@ impl Peer { // can always send messages without yielding. For incoming messages, use a // bounded channel so that other peers will receive backpressure if they send // messages faster than this peer can process them. 
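// (In tests, the incoming buffer is shrunk below to a single message, so that this
// backpressure path is exercised immediately and deterministically.)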
- let (mut incoming_tx, incoming_rx) = mpsc::channel(64); + #[cfg(any(test, feature = "test-support"))] + const INCOMING_BUFFER_SIZE: usize = 1; + #[cfg(not(any(test, feature = "test-support")))] + const INCOMING_BUFFER_SIZE: usize = 64; + let (mut incoming_tx, incoming_rx) = mpsc::channel(INCOMING_BUFFER_SIZE); let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded(); let connection_id = ConnectionId(self.next_connection_id.fetch_add(1, SeqCst)); From 56b9e5b0a0a13ec7db65737cc2fff5a5e691cfb1 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Tue, 12 Apr 2022 09:44:23 +0200 Subject: [PATCH 09/11] Make `Server::update_contacts_for_users` always synchronous --- crates/server/src/rpc.rs | 149 +++++++++++++++------------------------ 1 file changed, 55 insertions(+), 94 deletions(-) diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index 1c4c76349da952cf6a55e9f21c4405dc260a1423..0b4551001196e0b2549cfbcdb1c718f20f67f8d1 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -249,10 +249,11 @@ impl Server { let _ = send_connection_id.send(connection_id).await; } - this.state_mut() - .await - .add_connection(connection_id, user_id); - this.update_contacts_for_users(&[user_id]).await; + { + let mut state = this.state_mut().await; + state.add_connection(connection_id, user_id); + this.update_contacts_for_users(&*state, &[user_id]); + } let handle_io = handle_io.fuse(); futures::pin_mut!(handle_io); @@ -309,7 +310,8 @@ impl Server { async fn sign_out(self: &mut Arc, connection_id: ConnectionId) -> tide::Result<()> { self.peer.disconnect(connection_id); - let removed_connection = self.state_mut().await.remove_connection(connection_id)?; + let mut state = self.state_mut().await; + let removed_connection = state.remove_connection(connection_id)?; for (project_id, project) in removed_connection.hosted_projects { if let Some(share) = project.share { @@ -336,8 +338,7 @@ impl Server { }); } - self.update_contacts_for_users(removed_connection.contact_ids.iter()) - .await; + self.update_contacts_for_users(&*state, removed_connection.contact_ids.iter()); Ok(()) } @@ -346,7 +347,7 @@ impl Server { } async fn register_project( - mut self: Arc, + self: Arc, request: TypedEnvelope, ) -> tide::Result { let project_id = { @@ -358,20 +359,17 @@ impl Server { } async fn unregister_project( - mut self: Arc, + self: Arc, request: TypedEnvelope, ) -> tide::Result<()> { - let project = self - .state_mut() - .await - .unregister_project(request.payload.project_id, request.sender_id)?; - self.update_contacts_for_users(project.authorized_user_ids().iter()) - .await; + let mut state = self.state_mut().await; + let project = state.unregister_project(request.payload.project_id, request.sender_id)?; + self.update_contacts_for_users(&*state, &project.authorized_user_ids()); Ok(()) } async fn share_project( - mut self: Arc, + self: Arc, request: TypedEnvelope, ) -> tide::Result { self.state_mut() @@ -381,21 +379,17 @@ impl Server { } async fn unshare_project( - mut self: Arc, + self: Arc, request: TypedEnvelope, ) -> tide::Result<()> { let project_id = request.payload.project_id; - let project = self - .state_mut() - .await - .unshare_project(project_id, request.sender_id)?; - + let mut state = self.state_mut().await; + let project = state.unshare_project(project_id, request.sender_id)?; broadcast(request.sender_id, project.connection_ids, |conn_id| { self.peer .send(conn_id, proto::UnshareProject { project_id }) }); - self.update_contacts_for_users(&project.authorized_user_ids) - .await; + 
+        self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
         Ok(())
     }

@@ -469,21 +463,18 @@ impl Server {
                 },
             )
         });
-        self.update_contacts_for_users_sync(state, &contact_user_ids);
+        self.update_contacts_for_users(state, &contact_user_ids);
         Ok(response)
     }

     async fn leave_project(
-        mut self: Arc<Self>,
+        self: Arc<Self>,
         request: TypedEnvelope<proto::LeaveProject>,
     ) -> tide::Result<()> {
         let sender_id = request.sender_id;
         let project_id = request.payload.project_id;
-        let worktree = self
-            .state_mut()
-            .await
-            .leave_project(sender_id, project_id)?;
-
+        let mut state = self.state_mut().await;
+        let worktree = state.leave_project(sender_id, project_id)?;
         broadcast(sender_id, worktree.connection_ids, |conn_id| {
             self.peer.send(
                 conn_id,
@@ -493,65 +484,56 @@ impl Server {
                 },
             )
         });
-        self.update_contacts_for_users(&worktree.authorized_user_ids)
-            .await;
-
+        self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
         Ok(())
     }

     async fn register_worktree(
-        mut self: Arc<Self>,
+        self: Arc<Self>,
         request: TypedEnvelope<proto::RegisterWorktree>,
     ) -> tide::Result<proto::Ack> {
-        let host_user_id = self
-            .state()
-            .await
-            .user_id_for_connection(request.sender_id)?;
-
         let mut contact_user_ids = HashSet::default();
-        contact_user_ids.insert(host_user_id);
         for github_login in &request.payload.authorized_logins {
             let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
             contact_user_ids.insert(contact_user_id);
         }

+        let mut state = self.state_mut().await;
+        let host_user_id = state.user_id_for_connection(request.sender_id)?;
+        contact_user_ids.insert(host_user_id);
+
         let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
-        let guest_connection_ids;
-        {
-            let mut state = self.state_mut().await;
-            guest_connection_ids = state
-                .read_project(request.payload.project_id, request.sender_id)?
-                .guest_connection_ids();
-            state.register_worktree(
-                request.payload.project_id,
-                request.payload.worktree_id,
-                request.sender_id,
-                Worktree {
-                    authorized_user_ids: contact_user_ids.clone(),
-                    root_name: request.payload.root_name.clone(),
-                    visible: request.payload.visible,
-                },
-            )?;
-        }
+        let guest_connection_ids = state
+            .read_project(request.payload.project_id, request.sender_id)?
+            .guest_connection_ids();
+        state.register_worktree(
+            request.payload.project_id,
+            request.payload.worktree_id,
+            request.sender_id,
+            Worktree {
+                authorized_user_ids: contact_user_ids.clone(),
+                root_name: request.payload.root_name.clone(),
+                visible: request.payload.visible,
+            },
+        )?;
+
         broadcast(request.sender_id, guest_connection_ids, |connection_id| {
             self.peer
                 .forward_send(request.sender_id, connection_id, request.payload.clone())
         });
-        self.update_contacts_for_users(&contact_user_ids).await;
+        self.update_contacts_for_users(&*state, &contact_user_ids);
         Ok(proto::Ack {})
     }

     async fn unregister_worktree(
-        mut self: Arc<Self>,
+        self: Arc<Self>,
         request: TypedEnvelope<proto::UnregisterWorktree>,
     ) -> tide::Result<()> {
         let project_id = request.payload.project_id;
         let worktree_id = request.payload.worktree_id;
-        let (worktree, guest_connection_ids) = self.state_mut().await.unregister_worktree(
-            project_id,
-            worktree_id,
-            request.sender_id,
-        )?;
+        let mut state = self.state_mut().await;
+        let (worktree, guest_connection_ids) =
+            state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
         broadcast(request.sender_id, guest_connection_ids, |conn_id| {
             self.peer.send(
                 conn_id,
@@ -561,13 +543,12 @@ impl Server {
                 },
             )
         });
-        self.update_contacts_for_users(&worktree.authorized_user_ids)
-            .await;
+        self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
         Ok(())
     }

     async fn update_worktree(
-        mut self: Arc<Self>,
+        self: Arc<Self>,
         request: TypedEnvelope<proto::UpdateWorktree>,
     ) -> tide::Result<proto::Ack> {
         let connection_ids = self.state_mut().await.update_worktree(
@@ -587,7 +568,7 @@ impl Server {
     }

     async fn update_diagnostic_summary(
-        mut self: Arc<Self>,
+        self: Arc<Self>,
         request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
     ) -> tide::Result<()> {
         let summary = request
@@ -610,7 +591,7 @@ impl Server {
     }

     async fn start_language_server(
-        mut self: Arc<Self>,
+        self: Arc<Self>,
         request: TypedEnvelope<proto::StartLanguageServer>,
     ) -> tide::Result<()> {
         let receiver_ids = self.state_mut().await.start_language_server(
@@ -863,27 +844,7 @@ impl Server {
         Ok(proto::GetUsersResponse { users })
     }

-    async fn update_contacts_for_users<'a>(
-        self: &Arc<Self>,
-        user_ids: impl IntoIterator<Item = &'a UserId>,
-    ) {
-        let state = self.state().await;
-        for user_id in user_ids {
-            let contacts = state.contacts_for_user(*user_id);
-            for connection_id in state.connection_ids_for_user(*user_id) {
-                self.peer
-                    .send(
-                        connection_id,
-                        proto::UpdateContacts {
-                            contacts: contacts.clone(),
-                        },
-                    )
-                    .log_err();
-            }
-        }
-    }
-
-    fn update_contacts_for_users_sync<'a>(
+    fn update_contacts_for_users<'a>(
         self: &Arc<Self>,
         state: &Store,
         user_ids: impl IntoIterator<Item = &'a UserId>,
@@ -904,7 +865,7 @@ impl Server {
     }

     async fn join_channel(
-        mut self: Arc<Self>,
+        self: Arc<Self>,
         request: TypedEnvelope<proto::JoinChannel>,
     ) -> tide::Result<proto::JoinChannelResponse> {
         let user_id = self
@@ -945,7 +906,7 @@ impl Server {
     }

     async fn leave_channel(
-        mut self: Arc<Self>,
+        self: Arc<Self>,
         request: TypedEnvelope<proto::LeaveChannel>,
     ) -> tide::Result<()> {
         let user_id = self
@@ -1079,7 +1040,7 @@ impl Server {
         }
     }

-    async fn state_mut<'a>(self: &'a mut Arc<Self>) -> StoreWriteGuard<'a> {
+    async fn state_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
         #[cfg(test)]
         async_std::task::yield_now().await;
         let guard = self.store.write().await;
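
The shape of this refactor is worth spelling out: each handler now acquires the store's write guard once, mutates through it, and calls the now-synchronous `update_contacts_for_users` while still holding the guard, so no other connection's handler can run between a state change and the contact updates derived from it. A minimal sketch of the pattern outside the server; the `Store` and `Server` types here are simplified stand-ins, and `futures::lock::Mutex` stands in for the store's async lock:

    use futures::{executor::block_on, lock::Mutex};

    struct Store {
        contacts: Vec<String>,
    }

    struct Server {
        store: Mutex<Store>,
    }

    impl Server {
        // Mutate the store and send the notifications derived from it while
        // still holding the same guard: no other task can observe the store
        // between the mutation and the updates it implies.
        async fn add_contact(&self, name: &str) {
            let mut store = self.store.lock().await;
            store.contacts.push(name.to_string());
            self.update_contacts(&*store); // synchronous; guard still held
        }

        fn update_contacts(&self, store: &Store) {
            for contact in &store.contacts {
                println!("UpdateContacts -> {contact}");
            }
        }
    }

    fn main() {
        let server = Server {
            store: Mutex::new(Store { contacts: Vec::new() }),
        };
        block_on(server.add_contact("alice"));
    }
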
From 1d84876cfd88c7eb7a25edca75c2cf4885eed025 Mon Sep 17 00:00:00 2001
From: Antonio Scandurra
Date: Tue, 12 Apr 2022 10:15:38 +0200
Subject: [PATCH 10/11] Adjust distribution in randomized test

---
 crates/collab/src/rpc.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs
index 23c3e80e6b7ef4745b238ff23d6abfc4c286529a..be6ba0750f1e7f52d601dcb6726bfed9cbc278d7 100644
--- a/crates/collab/src/rpc.rs
+++ b/crates/collab/src/rpc.rs
@@ -5390,7 +5390,7 @@ mod tests {
                     log::info!("Added connection for {}", guest_username);
                     operations += 1;
                 }
-                20..=30 if clients.len() > 1 => {
+                20..=29 if clients.len() > 1 => {
                     log::info!("Removing guest");
                     let guest_ix = rng.lock().gen_range(1..clients.len());
                     let removed_guest_id = user_ids.remove(guest_ix);

From 71beebc913875247e93ae0d8b4399876f504eb9b Mon Sep 17 00:00:00 2001
From: Antonio Scandurra
Date: Tue, 12 Apr 2022 10:52:16 +0200
Subject: [PATCH 11/11] Fix unused import warning

---
 crates/collab/src/rpc.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs
index be6ba0750f1e7f52d601dcb6726bfed9cbc278d7..65f4d4ba4e72ceb53c3fd9538a3d86458ed50eb9 100644
--- a/crates/collab/src/rpc.rs
+++ b/crates/collab/src/rpc.rs
@@ -1228,7 +1228,6 @@ mod tests {
         time::Duration,
     };
     use theme::ThemeRegistry;
-    use util::TryFutureExt;
     use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};

     #[cfg(test)]
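
One last note on the distribution tweak in patch 10: Rust range patterns written with `..=` are inclusive on both ends, so `20..=30` matches eleven values and gives that match arm an 11% weight under a uniform roll over `0..100`, while `20..=29` gives it an even 10%. A toy illustration of arm weights following range widths; the actions and percentages here are illustrative only, not the test's actual distribution:

    use rand::Rng;

    fn main() {
        let mut rng = rand::thread_rng();
        for _ in 0..5 {
            // Arms are tried top to bottom; each inclusive range's weight is
            // its width: 0..=19 is 20%, 20..=29 is 10%, the rest 70%.
            match rng.gen_range(0..100) {
                0..=19 => println!("add a connection"),
                20..=29 => println!("remove a guest"),
                _ => println!("perform a random operation"),
            }
        }
    }
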