From 9e173564e9e72ad5de8e9aa3f3df61112d51259e Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Wed, 23 Feb 2022 15:45:12 +0100 Subject: [PATCH 01/14] Pass an `AsyncAppContext` to fake language server request handlers --- crates/editor/src/editor.rs | 24 ++++---- crates/language/src/language.rs | 6 +- crates/language/src/tests.rs | 4 +- crates/lsp/src/lsp.rs | 105 +++++++++++++++++--------------- crates/project/src/project.rs | 6 +- crates/server/src/rpc.rs | 85 ++++++++++++++------------ 6 files changed, 123 insertions(+), 107 deletions(-) diff --git a/crates/editor/src/editor.rs b/crates/editor/src/editor.rs index 254f2ee4150609fb9a0faeae6b02031ae78d6582..d3c6beaa9751fefd23b865345caebaf927f0a86e 100644 --- a/crates/editor/src/editor.rs +++ b/crates/editor/src/editor.rs @@ -8143,16 +8143,18 @@ mod tests { #[gpui::test] async fn test_completion(mut cx: gpui::TestAppContext) { let settings = cx.read(EditorSettings::test); - let (language_server, mut fake) = lsp::LanguageServer::fake_with_capabilities( - lsp::ServerCapabilities { - completion_provider: Some(lsp::CompletionOptions { - trigger_characters: Some(vec![".".to_string(), ":".to_string()]), + let (language_server, mut fake) = cx.update(|cx| { + lsp::LanguageServer::fake_with_capabilities( + lsp::ServerCapabilities { + completion_provider: Some(lsp::CompletionOptions { + trigger_characters: Some(vec![".".to_string(), ":".to_string()]), + ..Default::default() + }), ..Default::default() - }), - ..Default::default() - }, - cx.background(), - ); + }, + cx, + ) + }); let text = " one @@ -8318,7 +8320,7 @@ mod tests { position: Point, completions: Vec<(Range, &'static str)>, ) { - fake.handle_request::(move |params| { + fake.handle_request::(move |params, _| { assert_eq!( params.text_document_position.text_document.uri, lsp::Url::from_file_path(path).unwrap() @@ -8352,7 +8354,7 @@ mod tests { fake: &mut FakeLanguageServer, edit: Option<(Range, &'static str)>, ) { - fake.handle_request::(move |_| { + fake.handle_request::(move |_, _| { lsp::CompletionItem { additional_text_edits: edit.clone().map(|(range, new_text)| { vec![lsp::TextEdit::new( diff --git a/crates/language/src/language.rs b/crates/language/src/language.rs index 57a4fe001a7228767e29f261b064451bcc6f8ceb..31edba7131c61776ae9bc4fac7bec3aa00211a65 100644 --- a/crates/language/src/language.rs +++ b/crates/language/src/language.rs @@ -13,7 +13,7 @@ use futures::{ future::{BoxFuture, Shared}, FutureExt, TryFutureExt, }; -use gpui::{AppContext, Task}; +use gpui::{MutableAppContext, Task}; use highlight_map::HighlightMap; use lazy_static::lazy_static; use parking_lot::{Mutex, RwLock}; @@ -225,7 +225,7 @@ impl LanguageRegistry { language: &Arc, root_path: Arc, http_client: Arc, - cx: &AppContext, + cx: &mut MutableAppContext, ) -> Option>>> { #[cfg(any(test, feature = "test-support"))] if let Some(config) = &language.config.language_server { @@ -234,7 +234,7 @@ impl LanguageRegistry { let (server, mut fake_server) = lsp::LanguageServer::fake_with_capabilities( fake_config.capabilities.clone(), - cx.background().clone(), + cx, ); if let Some(initalizer) = &fake_config.initializer { diff --git a/crates/language/src/tests.rs b/crates/language/src/tests.rs index 9fd5d693ff65de0de342d6c2b3ac86d70d5d188c..eac6c6b5d6be0741ae750768ec315d16790a9a5f 100644 --- a/crates/language/src/tests.rs +++ b/crates/language/src/tests.rs @@ -554,7 +554,7 @@ fn test_autoindent_adjusts_lines_when_only_text_changes(cx: &mut MutableAppConte #[gpui::test] async fn test_diagnostics(mut cx: gpui::TestAppContext) { - let (language_server, mut fake) = lsp::LanguageServer::fake(cx.background()); + let (language_server, mut fake) = cx.update(lsp::LanguageServer::fake); let mut rust_lang = rust_lang(); rust_lang.config.language_server = Some(LanguageServerConfig { disk_based_diagnostic_sources: HashSet::from_iter(["disk".to_string()]), @@ -837,7 +837,7 @@ async fn test_diagnostics(mut cx: gpui::TestAppContext) { #[gpui::test] async fn test_edits_from_lsp_with_past_version(mut cx: gpui::TestAppContext) { - let (language_server, mut fake) = lsp::LanguageServer::fake(cx.background()); + let (language_server, mut fake) = cx.update(lsp::LanguageServer::fake); let text = " fn a() { diff --git a/crates/lsp/src/lsp.rs b/crates/lsp/src/lsp.rs index 93df14f89dd9b08c849b074d6ddbd00f068de5b0..179a87e43a825afba9bbde96a5979c44b9d80d7f 100644 --- a/crates/lsp/src/lsp.rs +++ b/crates/lsp/src/lsp.rs @@ -1,6 +1,6 @@ use anyhow::{anyhow, Context, Result}; use futures::{io::BufWriter, AsyncRead, AsyncWrite}; -use gpui::{executor, Task}; +use gpui::{executor, AsyncAppContext, Task}; use parking_lot::{Mutex, RwLock}; use postage::{barrier, oneshot, prelude::Stream, sink::Sink, watch}; use serde::{Deserialize, Serialize}; @@ -483,36 +483,44 @@ impl Drop for Subscription { #[cfg(any(test, feature = "test-support"))] pub struct FakeLanguageServer { - handlers: - Arc Vec>>>>, + handlers: Arc< + Mutex< + HashMap<&'static str, Box Vec>>, + >, + >, outgoing_tx: futures::channel::mpsc::UnboundedSender>, incoming_rx: futures::channel::mpsc::UnboundedReceiver>, } #[cfg(any(test, feature = "test-support"))] impl LanguageServer { - pub fn fake(executor: Arc) -> (Arc, FakeLanguageServer) { - Self::fake_with_capabilities(Default::default(), executor) + pub fn fake(cx: &mut gpui::MutableAppContext) -> (Arc, FakeLanguageServer) { + Self::fake_with_capabilities(Default::default(), cx) } pub fn fake_with_capabilities( capabilities: ServerCapabilities, - executor: Arc, + cx: &mut gpui::MutableAppContext, ) -> (Arc, FakeLanguageServer) { let (stdin_writer, stdin_reader) = async_pipe::pipe(); let (stdout_writer, stdout_reader) = async_pipe::pipe(); - let mut fake = FakeLanguageServer::new(executor.clone(), stdin_reader, stdout_writer); + let mut fake = FakeLanguageServer::new(stdin_reader, stdout_writer, cx); fake.handle_request::({ let capabilities = capabilities.clone(); - move |_| InitializeResult { + move |_, _| InitializeResult { capabilities: capabilities.clone(), ..Default::default() } }); - let server = - Self::new_internal(stdin_writer, stdout_reader, Path::new("/"), executor).unwrap(); + let server = Self::new_internal( + stdin_writer, + stdout_reader, + Path::new("/"), + cx.background().clone(), + ) + .unwrap(); (server, fake) } @@ -521,9 +529,9 @@ impl LanguageServer { #[cfg(any(test, feature = "test-support"))] impl FakeLanguageServer { fn new( - background: Arc, stdin: async_pipe::PipeReader, stdout: async_pipe::PipeWriter, + cx: &mut gpui::MutableAppContext, ) -> Self { use futures::StreamExt as _; @@ -537,43 +545,42 @@ impl FakeLanguageServer { // Receive incoming messages let handlers = this.handlers.clone(); - let executor = background.clone(); - background - .spawn(async move { - let mut buffer = Vec::new(); - let mut stdin = smol::io::BufReader::new(stdin); - while Self::receive(&mut stdin, &mut buffer).await.is_ok() { - executor.simulate_random_delay().await; - if let Ok(request) = serde_json::from_slice::(&buffer) { - assert_eq!(request.jsonrpc, JSON_RPC_VERSION); - - if let Some(handler) = handlers.lock().get_mut(request.method) { - let response = handler(request.id, request.params.get().as_bytes()); - log::debug!("handled lsp request. method:{}", request.method); - outgoing_tx.unbounded_send(response)?; - } else { - log::debug!("unhandled lsp request. method:{}", request.method); - outgoing_tx.unbounded_send( - serde_json::to_vec(&AnyResponse { - id: request.id, - error: Some(Error { - message: "no handler".to_string(), - }), - result: None, - }) - .unwrap(), - )?; - } + cx.spawn(|cx| async move { + let mut buffer = Vec::new(); + let mut stdin = smol::io::BufReader::new(stdin); + while Self::receive(&mut stdin, &mut buffer).await.is_ok() { + cx.background().simulate_random_delay().await; + if let Ok(request) = serde_json::from_slice::(&buffer) { + assert_eq!(request.jsonrpc, JSON_RPC_VERSION); + + if let Some(handler) = handlers.lock().get_mut(request.method) { + let response = + handler(request.id, request.params.get().as_bytes(), cx.clone()); + log::debug!("handled lsp request. method:{}", request.method); + outgoing_tx.unbounded_send(response)?; } else { - incoming_tx.unbounded_send(buffer.clone())?; + log::debug!("unhandled lsp request. method:{}", request.method); + outgoing_tx.unbounded_send( + serde_json::to_vec(&AnyResponse { + id: request.id, + error: Some(Error { + message: "no handler".to_string(), + }), + result: None, + }) + .unwrap(), + )?; } + } else { + incoming_tx.unbounded_send(buffer.clone())?; } - Ok::<_, anyhow::Error>(()) - }) - .detach(); + } + Ok::<_, anyhow::Error>(()) + }) + .detach(); // Send outgoing messages - background + cx.background() .spawn(async move { let mut stdout = smol::io::BufWriter::new(stdout); while let Some(notification) = outgoing_rx.next().await { @@ -618,13 +625,13 @@ impl FakeLanguageServer { ) -> futures::channel::mpsc::UnboundedReceiver<()> where T: 'static + request::Request, - F: 'static + Send + Sync + FnMut(T::Params) -> T::Result, + F: 'static + Send + FnMut(T::Params, AsyncAppContext) -> T::Result, { let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded(); self.handlers.lock().insert( T::METHOD, - Box::new(move |id, params| { - let result = handler(serde_json::from_slice::(params).unwrap()); + Box::new(move |id, params, cx| { + let result = handler(serde_json::from_slice::(params).unwrap(), cx); let result = serde_json::to_string(&result).unwrap(); let result = serde_json::from_str::<&RawValue>(&result).unwrap(); let response = AnyResponse { @@ -709,8 +716,8 @@ mod tests { } #[gpui::test] - async fn test_fake(cx: TestAppContext) { - let (server, mut fake) = LanguageServer::fake(cx.background()); + async fn test_fake(mut cx: TestAppContext) { + let (server, mut fake) = cx.update(LanguageServer::fake); let (message_tx, message_rx) = channel::unbounded(); let (diagnostics_tx, diagnostics_rx) = channel::unbounded(); @@ -762,7 +769,7 @@ mod tests { "file://b/c" ); - fake.handle_request::(|_| ()); + fake.handle_request::(|_, _| ()); drop(server); fake.receive_notification::().await; diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 4e67b11a59f5dec31c266a0b0527dae128a02d71..ee19d4a023544561c76f76f96502e44fc98f2d7a 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -3612,7 +3612,7 @@ mod tests { .unwrap(); let mut fake_server = fake_servers.next().await.unwrap(); - fake_server.handle_request::(move |params| { + fake_server.handle_request::(move |params, _| { let params = params.text_document_position_params; assert_eq!( params.text_document.uri.to_file_path().unwrap(), @@ -4504,7 +4504,7 @@ mod tests { project.prepare_rename(buffer.clone(), 7, cx) }); fake_server - .handle_request::(|params| { + .handle_request::(|params, _| { assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs"); assert_eq!(params.position, lsp::Position::new(0, 7)); Some(lsp::PrepareRenameResponse::Range(lsp::Range::new( @@ -4523,7 +4523,7 @@ mod tests { project.perform_rename(buffer.clone(), 7, "THREE".to_string(), true, cx) }); fake_server - .handle_request::(|params| { + .handle_request::(|params, _| { assert_eq!( params.text_document_position.text_document.uri.as_str(), "file:///dir/one.rs" diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index a767676fad553b4c5d42ebc439553afc2b6f4e2a..2e0b0892a17d0c4c5488aac3b708cec7ebf961f8 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -2389,7 +2389,7 @@ mod tests { // Return some completions from the host's language server. cx_a.foreground().start_waiting(); fake_language_server - .handle_request::(|params| { + .handle_request::(|params, _| { assert_eq!( params.text_document_position.text_document.uri, lsp::Url::from_file_path("/a/main.rs").unwrap(), @@ -2455,23 +2455,28 @@ mod tests { // Return a resolved completion from the host's language server. // The resolved completion has an additional text edit. - fake_language_server.handle_request::(|params| { - assert_eq!(params.label, "first_method(…)"); - lsp::CompletionItem { - label: "first_method(…)".into(), - detail: Some("fn(&mut self, B) -> C".into()), - text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit { - new_text: "first_method($1)".to_string(), - range: lsp::Range::new(lsp::Position::new(0, 14), lsp::Position::new(0, 14)), - })), - additional_text_edits: Some(vec![lsp::TextEdit { - new_text: "use d::SomeTrait;\n".to_string(), - range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)), - }]), - insert_text_format: Some(lsp::InsertTextFormat::SNIPPET), - ..Default::default() - } - }); + fake_language_server.handle_request::( + |params, _| { + assert_eq!(params.label, "first_method(…)"); + lsp::CompletionItem { + label: "first_method(…)".into(), + detail: Some("fn(&mut self, B) -> C".into()), + text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit { + new_text: "first_method($1)".to_string(), + range: lsp::Range::new( + lsp::Position::new(0, 14), + lsp::Position::new(0, 14), + ), + })), + additional_text_edits: Some(vec![lsp::TextEdit { + new_text: "use d::SomeTrait;\n".to_string(), + range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)), + }]), + insert_text_format: Some(lsp::InsertTextFormat::SNIPPET), + ..Default::default() + } + }, + ); // The additional edit is applied. buffer_a @@ -2568,7 +2573,7 @@ mod tests { }); let mut fake_language_server = fake_language_servers.next().await.unwrap(); - fake_language_server.handle_request::(|_| { + fake_language_server.handle_request::(|_, _| { Some(vec![ lsp::TextEdit { range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)), @@ -2677,7 +2682,7 @@ mod tests { let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx)); let mut fake_language_server = fake_language_servers.next().await.unwrap(); - fake_language_server.handle_request::(|_| { + fake_language_server.handle_request::(|_, _| { Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new( lsp::Url::from_file_path("/root-2/b.rs").unwrap(), lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)), @@ -2702,7 +2707,7 @@ mod tests { // Try getting more definitions for the same buffer, ensuring the buffer gets reused from // the previous call to `definition`. let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx)); - fake_language_server.handle_request::(|_| { + fake_language_server.handle_request::(|_, _| { Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new( lsp::Url::from_file_path("/root-2/b.rs").unwrap(), lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)), @@ -2826,7 +2831,7 @@ mod tests { let references = project_b.update(&mut cx_b, |p, cx| p.references(&buffer_b, 7, cx)); let mut fake_language_server = fake_language_servers.next().await.unwrap(); - fake_language_server.handle_request::(|params| { + fake_language_server.handle_request::(|params, _| { assert_eq!( params.text_document_position.text_document.uri.as_str(), "file:///root-1/one.rs" @@ -2954,7 +2959,7 @@ mod tests { let mut fake_language_server = fake_language_servers.next().await.unwrap(); fake_language_server.handle_request::( - |params| { + |params, _| { assert_eq!( params .text_document_position_params @@ -3103,7 +3108,7 @@ mod tests { // Request the definition of a symbol as the guest. let symbols = project_b.update(&mut cx_b, |p, cx| p.symbols("two", cx)); let mut fake_language_server = fake_language_servers.next().await.unwrap(); - fake_language_server.handle_request::(|_| { + fake_language_server.handle_request::(|_, _| { #[allow(deprecated)] Some(vec![lsp::SymbolInformation { name: "TWO".into(), @@ -3245,7 +3250,7 @@ mod tests { } let mut fake_language_server = fake_language_servers.next().await.unwrap(); - fake_language_server.handle_request::(|_| { + fake_language_server.handle_request::(|_, _| { Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new( lsp::Url::from_file_path("/root/b.rs").unwrap(), lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)), @@ -3353,7 +3358,7 @@ mod tests { let mut fake_language_server = fake_language_servers.next().await.unwrap(); fake_language_server - .handle_request::(|params| { + .handle_request::(|params, _| { assert_eq!( params.text_document.uri, lsp::Url::from_file_path("/a/main.rs").unwrap(), @@ -3372,7 +3377,7 @@ mod tests { }); fake_language_server - .handle_request::(|params| { + .handle_request::(|params, _| { assert_eq!( params.text_document.uri, lsp::Url::from_file_path("/a/main.rs").unwrap(), @@ -3443,7 +3448,7 @@ mod tests { Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx) }) .unwrap(); - fake_language_server.handle_request::(|_| { + fake_language_server.handle_request::(|_, _| { lsp::CodeAction { title: "Inline into all callers".to_string(), edit: Some(lsp::WorkspaceEdit { @@ -3598,7 +3603,7 @@ mod tests { }); fake_language_server - .handle_request::(|params| { + .handle_request::(|params, _| { assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs"); assert_eq!(params.position, lsp::Position::new(0, 7)); Some(lsp::PrepareRenameResponse::Range(lsp::Range::new( @@ -3628,7 +3633,7 @@ mod tests { Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap() }); fake_language_server - .handle_request::(|params| { + .handle_request::(|params, _| { assert_eq!( params.text_document_position.text_document.uri.as_str(), "file:///dir/one.rs" @@ -4684,7 +4689,7 @@ mod tests { let rng = rng.clone(); let files = files.clone(); move |fake_server| { - fake_server.handle_request::(|_| { + fake_server.handle_request::(|_, _| { Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem { text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit { range: lsp::Range::new( @@ -4697,7 +4702,7 @@ mod tests { }])) }); - fake_server.handle_request::(|_| { + fake_server.handle_request::(|_, _| { Some(vec![lsp::CodeActionOrCommand::CodeAction( lsp::CodeAction { title: "the-code-action".to_string(), @@ -4706,17 +4711,19 @@ mod tests { )]) }); - fake_server.handle_request::(|params| { - Some(lsp::PrepareRenameResponse::Range(lsp::Range::new( - params.position, - params.position, - ))) - }); + fake_server.handle_request::( + |params, _| { + Some(lsp::PrepareRenameResponse::Range(lsp::Range::new( + params.position, + params.position, + ))) + }, + ); fake_server.handle_request::({ let files = files.clone(); let rng = rng.clone(); - move |_| { + move |_, _| { let files = files.lock(); let mut rng = rng.lock(); let count = rng.gen_range::(1..3); From f3c6320eeb010387f1b45ad33b9174b09fd45ff6 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Wed, 23 Feb 2022 16:16:02 +0100 Subject: [PATCH 02/14] Move document highlights RPC message to the background --- crates/project/src/project.rs | 4 +-- crates/rpc/src/proto.rs | 4 +-- crates/server/src/rpc.rs | 61 ++++++++++++++++++++++++++++++++++- 3 files changed, 64 insertions(+), 5 deletions(-) diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index ee19d4a023544561c76f76f96502e44fc98f2d7a..8d11dcf4905139f0b89dfd707cdb4cc17d64bb32 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -776,7 +776,7 @@ impl Project { } } - fn get_open_buffer( + pub fn get_open_buffer( &mut self, path: &ProjectPath, cx: &mut ModelContext, @@ -2047,7 +2047,7 @@ impl Project { } } - fn find_local_worktree( + pub fn find_local_worktree( &self, abs_path: &Path, cx: &AppContext, diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index b6d5c4b43815c0b053f3ff142d8b2a563655d3ce..018f4aed0b829db99c2b382cc49eee6554e03b6d 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -157,8 +157,8 @@ messages!( (GetCompletionsResponse, Foreground), (GetDefinition, Foreground), (GetDefinitionResponse, Foreground), - (GetDocumentHighlights, Foreground), - (GetDocumentHighlightsResponse, Foreground), + (GetDocumentHighlights, Background), + (GetDocumentHighlightsResponse, Background), (GetReferences, Foreground), (GetReferencesResponse, Foreground), (GetProjectSymbols, Background), diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index 2e0b0892a17d0c4c5488aac3b708cec7ebf961f8..287f94bac43bc5c52c679febe32f5677be19436a 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -1218,7 +1218,7 @@ mod tests { fs::{FakeFs, Fs as _}, language::{ tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language, - LanguageConfig, LanguageRegistry, LanguageServerConfig, Point, + LanguageConfig, LanguageRegistry, LanguageServerConfig, Point, ToLspPosition, }, lsp, project::{DiagnosticSummary, Project, ProjectPath}, @@ -4688,6 +4688,7 @@ mod tests { language_server_config.set_fake_initializer({ let rng = rng.clone(); let files = files.clone(); + let project = project.clone(); move |fake_server| { fake_server.handle_request::(|_, _| { Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem { @@ -4740,6 +4741,44 @@ mod tests { )) } }); + + fake_server.handle_request::({ + let rng = rng.clone(); + let project = project.clone(); + move |params, mut cx| { + project.update(&mut cx, |project, cx| { + let path = params + .text_document_position_params + .text_document + .uri + .to_file_path() + .unwrap(); + let (worktree, relative_path) = + project.find_local_worktree(&path, cx)?; + let project_path = + ProjectPath::from((worktree.read(cx).id(), relative_path)); + let buffer = project.get_open_buffer(&project_path, cx)?.read(cx); + + let mut highlights = Vec::new(); + let highlight_count = rng.lock().gen_range(1..=5); + let mut prev_end = 0; + for _ in 0..highlight_count { + let range = + buffer.random_byte_range(prev_end, &mut *rng.lock()); + let start = + buffer.offset_to_point_utf16(range.start).to_lsp_position(); + let end = + buffer.offset_to_point_utf16(range.end).to_lsp_position(); + highlights.push(lsp::DocumentHighlight { + range: lsp::Range::new(start, end), + kind: Some(lsp::DocumentHighlightKind::READ), + }); + prev_end = range.end; + } + Some(highlights) + }) + } + }); } }); @@ -5026,6 +5065,26 @@ mod tests { definitions.await; } } + 50..=55 => { + let highlights = project.update(&mut cx, |project, cx| { + log::info!( + "Guest {}: requesting highlights for buffer {:?}", + guest_id, + buffer.read(cx).file().unwrap().full_path(cx) + ); + let offset = rng.lock().gen_range(0..=buffer.read(cx).len()); + project.document_highlights(&buffer, offset, cx) + }); + let highlights = cx.background().spawn(async move { + highlights.await.expect("highlights request failed"); + }); + if rng.lock().gen_bool(0.3) { + log::info!("Guest {}: detaching highlights request", guest_id); + highlights.detach(); + } else { + highlights.await; + } + } _ => { buffer.update(&mut cx, |buffer, cx| { log::info!( From 8440644dc9ca9a3ffe663a8c3e8e6b8cb75d73f7 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Wed, 23 Feb 2022 18:35:25 +0100 Subject: [PATCH 03/14] Remove update_id from worktree update messages We don't need this anymore because worktree updates are foreground messages. Co-Authored-By: Nathan Sobo Co-Authored-By: Max Brunsfeld --- crates/project/src/project.rs | 1 - crates/project/src/worktree.rs | 56 ++++++---------------------------- crates/rpc/proto/zed.proto | 12 +++----- crates/server/src/rpc.rs | 3 -- crates/server/src/rpc/store.rs | 9 ------ 5 files changed, 14 insertions(+), 67 deletions(-) diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 8d11dcf4905139f0b89dfd707cdb4cc17d64bb32..0d655ed87a345e49db3c82dcfe27a87730d9b680 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -3885,7 +3885,6 @@ mod tests { &initial_snapshot, 1, 1, - 0, true, ); remote diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index 84170115890e747dd270709d29848d6ba1663694..23dd1895f4287ca12476ee9f22214a53bdfd980d 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -84,7 +84,6 @@ pub struct RemoteWorktree { queued_operations: Vec<(u64, Operation)>, diagnostic_summaries: TreeMap, weak: bool, - next_update_id: u64, pending_updates: VecDeque, } @@ -239,7 +238,6 @@ impl Worktree { }), ), weak, - next_update_id: worktree.next_update_id, pending_updates: Default::default(), }) }); @@ -792,20 +790,13 @@ impl LocalWorktree { let _ = share_tx.try_send(Ok(())); } - let mut update_id = 0; let mut prev_snapshot = snapshot; while let Ok(snapshot) = snapshots_to_send_rx.recv().await { - let message = snapshot.build_update( - &prev_snapshot, - project_id, - worktree_id, - update_id, - false, - ); + let message = + snapshot.build_update(&prev_snapshot, project_id, worktree_id, false); match rpc.request(message).await { Ok(_) => { prev_snapshot = snapshot; - update_id += 1; } Err(err) => log::error!("error sending snapshot diff {}", err), } @@ -844,30 +835,9 @@ impl RemoteWorktree { &mut self, envelope: TypedEnvelope, ) -> Result<()> { - let update = envelope.payload; - if update.id > self.next_update_id { - let ix = match self - .pending_updates - .binary_search_by_key(&update.id, |pending| pending.id) - { - Ok(ix) | Err(ix) => ix, - }; - self.pending_updates.insert(ix, update); - } else { - let tx = self.updates_tx.clone(); - self.next_update_id += 1; - tx.unbounded_send(update) - .expect("consumer runs to completion"); - while let Some(update) = self.pending_updates.front() { - if update.id == self.next_update_id { - self.next_update_id += 1; - tx.unbounded_send(self.pending_updates.pop_front().unwrap()) - .expect("consumer runs to completion"); - } else { - break; - } - } - } + self.updates_tx + .unbounded_send(envelope.payload) + .expect("consumer runs to completion"); Ok(()) } @@ -1058,7 +1028,6 @@ impl LocalSnapshot { .map(|(path, summary)| summary.to_proto(path.0.clone())) .collect(), weak, - next_update_id: 0, } } @@ -1067,7 +1036,6 @@ impl LocalSnapshot { other: &Self, project_id: u64, worktree_id: u64, - update_id: u64, include_ignored: bool, ) -> proto::UpdateWorktree { let mut updated_entries = Vec::new(); @@ -1120,7 +1088,6 @@ impl LocalSnapshot { } proto::UpdateWorktree { - id: update_id as u64, project_id, worktree_id, root_name: self.root_name().to_string(), @@ -2461,7 +2428,7 @@ mod tests { fmt::Write, time::{SystemTime, UNIX_EPOCH}, }; - use util::{post_inc, test::temp_tree}; + use util::test::temp_tree; #[gpui::test] async fn test_traversal(cx: gpui::TestAppContext) { @@ -2646,7 +2613,6 @@ mod tests { new_scanner.snapshot().to_vec(true) ); - let mut update_id = 0; for mut prev_snapshot in snapshots { let include_ignored = rng.gen::(); if !include_ignored { @@ -2667,13 +2633,9 @@ mod tests { prev_snapshot.entries_by_id.edit(entries_by_id_edits, &()); } - let update = scanner.snapshot().build_update( - &prev_snapshot, - 0, - 0, - post_inc(&mut update_id), - include_ignored, - ); + let update = scanner + .snapshot() + .build_update(&prev_snapshot, 0, 0, include_ignored); prev_snapshot.apply_remote_update(update).unwrap(); assert_eq!( prev_snapshot.to_vec(true), diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index 8e83b099631617462983560d08ffe49a169408b9..cbc2fa6d51cfc72307be491bb34630f1fd9c080c 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -145,12 +145,11 @@ message ShareWorktree { } message UpdateWorktree { - uint64 id = 1; - uint64 project_id = 2; - uint64 worktree_id = 3; - string root_name = 4; - repeated Entry updated_entries = 5; - repeated uint64 removed_entries = 6; + uint64 project_id = 1; + uint64 worktree_id = 2; + string root_name = 3; + repeated Entry updated_entries = 4; + repeated uint64 removed_entries = 5; } message AddProjectCollaborator { @@ -494,7 +493,6 @@ message Worktree { repeated Entry entries = 3; repeated DiagnosticSummary diagnostic_summaries = 4; bool weak = 5; - uint64 next_update_id = 6; } message File { diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index 287f94bac43bc5c52c679febe32f5677be19436a..3e4645b55bda6d83551a034545cce2855e6f3dce 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -350,7 +350,6 @@ impl Server { .cloned() .collect(), weak: worktree.weak, - next_update_id: share.next_update_id as u64, }) }) .collect(); @@ -489,7 +488,6 @@ impl Server { request.sender_id, entries, diagnostic_summaries, - worktree.next_update_id, )?; broadcast( @@ -513,7 +511,6 @@ impl Server { request.sender_id, request.payload.project_id, request.payload.worktree_id, - request.payload.id, &request.payload.removed_entries, &request.payload.updated_entries, )?; diff --git a/crates/server/src/rpc/store.rs b/crates/server/src/rpc/store.rs index 41c611a0972229261c7a80884ccceb309ee97ff4..5cb0a0e1db028631c468b1ab3519114483a12f14 100644 --- a/crates/server/src/rpc/store.rs +++ b/crates/server/src/rpc/store.rs @@ -43,7 +43,6 @@ pub struct ProjectShare { pub struct WorktreeShare { pub entries: HashMap, pub diagnostic_summaries: BTreeMap, - pub next_update_id: u64, } #[derive(Default)] @@ -404,7 +403,6 @@ impl Store { connection_id: ConnectionId, entries: HashMap, diagnostic_summaries: BTreeMap, - next_update_id: u64, ) -> tide::Result { let project = self .projects @@ -418,7 +416,6 @@ impl Store { worktree.share = Some(WorktreeShare { entries, diagnostic_summaries, - next_update_id, }); Ok(SharedWorktree { authorized_user_ids: project.authorized_user_ids(), @@ -537,7 +534,6 @@ impl Store { connection_id: ConnectionId, project_id: u64, worktree_id: u64, - update_id: u64, removed_entries: &[u64], updated_entries: &[proto::Entry], ) -> tide::Result> { @@ -549,11 +545,6 @@ impl Store { .share .as_mut() .ok_or_else(|| anyhow!("worktree is not shared"))?; - if share.next_update_id != update_id { - return Err(anyhow!("received worktree updates out-of-order"))?; - } - - share.next_update_id = update_id + 1; for entry_id in removed_entries { share.entries.remove(&entry_id); } From d1b4384f80765bf5cd434b6f56c134b5aa69ecba Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Wed, 23 Feb 2022 19:04:22 +0100 Subject: [PATCH 04/14] WIP --- crates/project/src/worktree.rs | 110 +++++++++++++++++---------------- crates/rpc/proto/zed.proto | 7 +-- crates/rpc/src/proto.rs | 3 - crates/server/src/rpc.rs | 81 ++++++++---------------- crates/server/src/rpc/store.rs | 30 --------- 5 files changed, 84 insertions(+), 147 deletions(-) diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index 23dd1895f4287ca12476ee9f22214a53bdfd980d..5fd7551c6d218d1dcbc03f75b2448b4cf0b7fcd0 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -43,7 +43,7 @@ use std::{ time::{Duration, SystemTime}, }; use sum_tree::{Bias, Edit, SeekTarget, SumTree, TreeMap}; -use util::ResultExt; +use util::{ResultExt, TryFutureExt}; lazy_static! { static ref GITIGNORE: &'static OsStr = OsStr::new(".gitignore"); @@ -137,7 +137,7 @@ enum Registration { struct ShareState { project_id: u64, snapshots_tx: Sender, - _maintain_remote_snapshot: Option>, + _maintain_remote_snapshot: Option>>, } #[derive(Default, Deserialize)] @@ -737,6 +737,7 @@ impl LocalWorktree { worktree_id: self.id().to_proto(), root_name: self.root_name().to_string(), authorized_logins: self.authorized_logins(), + weak: self.weak, }; cx.spawn(|this, mut cx| async move { let response = client.request(register_message).await; @@ -760,61 +761,66 @@ impl LocalWorktree { &mut self, project_id: u64, cx: &mut ModelContext, - ) -> Task> { - if self.share.is_some() { - return Task::ready(Ok(())); - } - - let snapshot = self.snapshot(); - let rpc = self.client.clone(); - let worktree_id = cx.model_id() as u64; - let (snapshots_to_send_tx, snapshots_to_send_rx) = - smol::channel::unbounded::(); + ) -> impl Future> { let (mut share_tx, mut share_rx) = oneshot::channel(); - let maintain_remote_snapshot = cx.background().spawn({ - let rpc = rpc.clone(); - let snapshot = snapshot.clone(); - let diagnostic_summaries = self.diagnostic_summaries.clone(); - let weak = self.weak; - async move { - if let Err(error) = rpc - .request(proto::ShareWorktree { - project_id, - worktree: Some(snapshot.to_proto(&diagnostic_summaries, weak)), - }) - .await - { - let _ = share_tx.try_send(Err(error)); - return; - } else { - let _ = share_tx.try_send(Ok(())); - } + if self.share.is_some() { + let _ = share_tx.try_send(Ok(())); + } else { + let snapshot = self.snapshot(); + let rpc = self.client.clone(); + let worktree_id = cx.model_id() as u64; + let (snapshots_to_send_tx, snapshots_to_send_rx) = + smol::channel::unbounded::(); + let (mut share_tx, mut share_rx) = oneshot::channel(); + let maintain_remote_snapshot = cx.background().spawn({ + let rpc = rpc.clone(); + let snapshot = snapshot.clone(); + let diagnostic_summaries = self.diagnostic_summaries.clone(); + let weak = self.weak; + async move { + if let Err(error) = rpc + .request(proto::UpdateWorktree { + project_id, + worktree_id, + root_name: snapshot.root_name().to_string(), + updated_entries: snapshot + .entries_by_path + .iter() + .filter(|e| !e.is_ignored) + .map(Into::into) + .collect(), + removed_entries: Default::default(), + }) + .await + { + let _ = share_tx.try_send(Err(error)); + return Err(anyhow!("failed to send initial update worktree")); + } else { + let _ = share_tx.try_send(Ok(())); + } - let mut prev_snapshot = snapshot; - while let Ok(snapshot) = snapshots_to_send_rx.recv().await { - let message = - snapshot.build_update(&prev_snapshot, project_id, worktree_id, false); - match rpc.request(message).await { - Ok(_) => { - prev_snapshot = snapshot; - } - Err(err) => log::error!("error sending snapshot diff {}", err), + let mut prev_snapshot = snapshot; + while let Ok(snapshot) = snapshots_to_send_rx.recv().await { + let message = + snapshot.build_update(&prev_snapshot, project_id, worktree_id, false); + rpc.request(message).await?; + prev_snapshot = snapshot; } + + Ok::<_, anyhow::Error>(()) } - } - }); - self.share = Some(ShareState { - project_id, - snapshots_tx: snapshots_to_send_tx, - _maintain_remote_snapshot: Some(maintain_remote_snapshot), - }); + .log_err() + }); + self.share = Some(ShareState { + project_id, + snapshots_tx: snapshots_to_send_tx, + _maintain_remote_snapshot: Some(maintain_remote_snapshot), + }); + } - cx.foreground().spawn(async move { - match share_rx.next().await { - Some(result) => result, - None => Err(anyhow!("unshared before sharing completed")), - } - }) + async move { + share_rx.next().await; + } } pub fn unshare(&mut self) { diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index cbc2fa6d51cfc72307be491bb34630f1fd9c080c..ebdb39942ddfa4968ba5ffcaa4acf8bd8a794140 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -34,7 +34,6 @@ message Envelope { RegisterWorktree register_worktree = 28; UnregisterWorktree unregister_worktree = 29; - ShareWorktree share_worktree = 30; UpdateWorktree update_worktree = 31; UpdateDiagnosticSummary update_diagnostic_summary = 32; DiskBasedDiagnosticsUpdating disk_based_diagnostics_updating = 33; @@ -132,6 +131,7 @@ message RegisterWorktree { uint64 worktree_id = 2; string root_name = 3; repeated string authorized_logins = 4; + bool weak = 5; } message UnregisterWorktree { @@ -139,11 +139,6 @@ message UnregisterWorktree { uint64 worktree_id = 2; } -message ShareWorktree { - uint64 project_id = 1; - Worktree worktree = 2; -} - message UpdateWorktree { uint64 project_id = 1; uint64 worktree_id = 2; diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index 018f4aed0b829db99c2b382cc49eee6554e03b6d..4ac61377c4511abd2d3712fb7b4984dc4e7d6273 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -188,7 +188,6 @@ messages!( (SendChannelMessage, Foreground), (SendChannelMessageResponse, Foreground), (ShareProject, Foreground), - (ShareWorktree, Foreground), (Test, Foreground), (UnregisterProject, Foreground), (UnregisterWorktree, Foreground), @@ -228,7 +227,6 @@ request_messages!( (SaveBuffer, BufferSaved), (SendChannelMessage, SendChannelMessageResponse), (ShareProject, Ack), - (ShareWorktree, Ack), (Test, Test), (UpdateBuffer, Ack), (UpdateWorktree, Ack), @@ -259,7 +257,6 @@ entity_messages!( PrepareRename, RemoveProjectCollaborator, SaveBuffer, - ShareWorktree, UnregisterWorktree, UnshareProject, UpdateBuffer, diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index 3e4645b55bda6d83551a034545cce2855e6f3dce..2dc56138ff3a2716852947b71b39e31bdc446cf4 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -16,7 +16,7 @@ use rpc::{ Connection, ConnectionId, Peer, TypedEnvelope, }; use sha1::{Digest as _, Sha1}; -use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant}; +use std::{any::TypeId, future::Future, sync::Arc, time::Instant}; use store::{Store, Worktree}; use surf::StatusCode; use tide::log; @@ -73,7 +73,6 @@ impl Server { .add_message_handler(Server::leave_project) .add_request_handler(Server::register_worktree) .add_message_handler(Server::unregister_worktree) - .add_request_handler(Server::share_worktree) .add_request_handler(Server::update_worktree) .add_message_handler(Server::update_diagnostic_summary) .add_message_handler(Server::disk_based_diagnostics_updating) @@ -419,23 +418,34 @@ impl Server { let mut contact_user_ids = HashSet::default(); contact_user_ids.insert(host_user_id); - for github_login in request.payload.authorized_logins { - let contact_user_id = self.app_state.db.create_user(&github_login, false).await?; + for github_login in &request.payload.authorized_logins { + let contact_user_id = self.app_state.db.create_user(github_login, false).await?; contact_user_ids.insert(contact_user_id); } let contact_user_ids = contact_user_ids.into_iter().collect::>(); - self.state_mut().register_worktree( - request.payload.project_id, - request.payload.worktree_id, - request.sender_id, - Worktree { - authorized_user_ids: contact_user_ids.clone(), - root_name: request.payload.root_name, - share: None, - weak: false, - }, - )?; + let guest_connection_ids; + { + let mut state = self.state_mut(); + guest_connection_ids = state + .read_project(request.payload.project_id, request.sender_id)? + .guest_connection_ids(); + state.register_worktree( + request.payload.project_id, + request.payload.worktree_id, + request.sender_id, + Worktree { + authorized_user_ids: contact_user_ids.clone(), + root_name: request.payload.root_name.clone(), + share: None, + weak: request.payload.weak, + }, + )?; + } + broadcast(request.sender_id, guest_connection_ids, |connection_id| { + self.peer + .forward_send(request.sender_id, connection_id, request.payload.clone()) + })?; self.update_contacts_for_users(&contact_user_ids)?; Ok(proto::Ack {}) } @@ -462,47 +472,6 @@ impl Server { Ok(()) } - async fn share_worktree( - mut self: Arc, - mut request: TypedEnvelope, - ) -> tide::Result { - let worktree = request - .payload - .worktree - .as_mut() - .ok_or_else(|| anyhow!("missing worktree"))?; - let entries = worktree - .entries - .iter() - .map(|entry| (entry.id, entry.clone())) - .collect(); - let diagnostic_summaries = worktree - .diagnostic_summaries - .iter() - .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone())) - .collect(); - - let shared_worktree = self.state_mut().share_worktree( - request.payload.project_id, - worktree.id, - request.sender_id, - entries, - diagnostic_summaries, - )?; - - broadcast( - request.sender_id, - shared_worktree.connection_ids, - |connection_id| { - self.peer - .forward_send(request.sender_id, connection_id, request.payload.clone()) - }, - )?; - self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?; - - Ok(proto::Ack {}) - } - async fn update_worktree( mut self: Arc, request: TypedEnvelope, diff --git a/crates/server/src/rpc/store.rs b/crates/server/src/rpc/store.rs index 5cb0a0e1db028631c468b1ab3519114483a12f14..ec23c06992e3d0f3eb4a55376f46643ff8413c0a 100644 --- a/crates/server/src/rpc/store.rs +++ b/crates/server/src/rpc/store.rs @@ -396,36 +396,6 @@ impl Store { } } - pub fn share_worktree( - &mut self, - project_id: u64, - worktree_id: u64, - connection_id: ConnectionId, - entries: HashMap, - diagnostic_summaries: BTreeMap, - ) -> tide::Result { - let project = self - .projects - .get_mut(&project_id) - .ok_or_else(|| anyhow!("no such project"))?; - let worktree = project - .worktrees - .get_mut(&worktree_id) - .ok_or_else(|| anyhow!("no such worktree"))?; - if project.host_connection_id == connection_id && project.share.is_some() { - worktree.share = Some(WorktreeShare { - entries, - diagnostic_summaries, - }); - Ok(SharedWorktree { - authorized_user_ids: project.authorized_user_ids(), - connection_ids: project.guest_connection_ids(), - }) - } else { - Err(anyhow!("no such worktree"))? - } - } - pub fn update_diagnostic_summary( &mut self, project_id: u64, From 17c9aa181936b55f2953e5c44b9a375a027f6838 Mon Sep 17 00:00:00 2001 From: Nathan Sobo Date: Wed, 23 Feb 2022 11:56:09 -0700 Subject: [PATCH 05/14] Remove ShareWorktree message Instead, create an empty worktree on guests when a worktree is first *registered*, then update it via an initial UpdateWorktree message. This prevents the host from referencing a worktree in definition RPC responses that hasn't yet been observed by the guest. We could have waited until the entire worktree was shared, but this could take a long time, so instead we create an empty one on guests and proceed from there. We still have randomized test failures as of this commit: SEED=9519 MAX_PEERS=2 ITERATIONS=10000 OPERATIONS=7 ct -p zed-server test_random_collaboration Co-Authored-By: Max Brunsfeld Co-Authored-By: Antonio Scandurra --- crates/project/src/project.rs | 19 +++++++----- crates/project/src/worktree.rs | 18 ++++++++--- crates/rpc/src/proto.rs | 1 + crates/server/src/rpc.rs | 25 +++++++--------- crates/server/src/rpc/store.rs | 55 ++++++++++++++++------------------ 5 files changed, 63 insertions(+), 55 deletions(-) diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 0d655ed87a345e49db3c82dcfe27a87730d9b680..1de6b7b13151c82c07802f111462a428fc338e16 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -169,7 +169,7 @@ impl DiagnosticSummary { this } - pub fn to_proto(&self, path: Arc) -> proto::DiagnosticSummary { + pub fn to_proto(&self, path: &Path) -> proto::DiagnosticSummary { proto::DiagnosticSummary { path: path.to_string_lossy().to_string(), error_count: self.error_count as u32, @@ -195,7 +195,7 @@ impl Project { client.add_entity_message_handler(Self::handle_disk_based_diagnostics_updated); client.add_entity_message_handler(Self::handle_disk_based_diagnostics_updating); client.add_entity_message_handler(Self::handle_remove_collaborator); - client.add_entity_message_handler(Self::handle_share_worktree); + client.add_entity_message_handler(Self::handle_register_worktree); client.add_entity_message_handler(Self::handle_unregister_worktree); client.add_entity_message_handler(Self::handle_unshare_project); client.add_entity_message_handler(Self::handle_update_buffer_file); @@ -2347,19 +2347,22 @@ impl Project { }) } - async fn handle_share_worktree( + async fn handle_register_worktree( this: ModelHandle, - envelope: TypedEnvelope, + envelope: TypedEnvelope, client: Arc, mut cx: AsyncAppContext, ) -> Result<()> { this.update(&mut cx, |this, cx| { let remote_id = this.remote_id().ok_or_else(|| anyhow!("invalid project"))?; let replica_id = this.replica_id(); - let worktree = envelope - .payload - .worktree - .ok_or_else(|| anyhow!("invalid worktree"))?; + let worktree = proto::Worktree { + id: envelope.payload.worktree_id, + root_name: envelope.payload.root_name, + entries: Default::default(), + diagnostic_summaries: Default::default(), + weak: envelope.payload.weak, + }; let (worktree, load_task) = Worktree::remote(remote_id, replica_id, worktree, client, cx); this.add_worktree(&worktree, cx); diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index 5fd7551c6d218d1dcbc03f75b2448b4cf0b7fcd0..367db436c7daeb6fc6765a22f7146c6496b1fd36 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -771,12 +771,10 @@ impl LocalWorktree { let worktree_id = cx.model_id() as u64; let (snapshots_to_send_tx, snapshots_to_send_rx) = smol::channel::unbounded::(); - let (mut share_tx, mut share_rx) = oneshot::channel(); let maintain_remote_snapshot = cx.background().spawn({ let rpc = rpc.clone(); let snapshot = snapshot.clone(); let diagnostic_summaries = self.diagnostic_summaries.clone(); - let weak = self.weak; async move { if let Err(error) = rpc .request(proto::UpdateWorktree { @@ -799,6 +797,14 @@ impl LocalWorktree { let _ = share_tx.try_send(Ok(())); } + for (path, summary) in diagnostic_summaries.iter() { + rpc.send(proto::UpdateDiagnosticSummary { + project_id, + worktree_id, + summary: Some(summary.to_proto(&path.0)), + })?; + } + let mut prev_snapshot = snapshot; while let Ok(snapshot) = snapshots_to_send_rx.recv().await { let message = @@ -819,7 +825,10 @@ impl LocalWorktree { } async move { - share_rx.next().await; + share_rx + .next() + .await + .unwrap_or_else(|| Err(anyhow!("share ended"))) } } @@ -1014,6 +1023,7 @@ impl Snapshot { } impl LocalSnapshot { + #[cfg(test)] pub(crate) fn to_proto( &self, diagnostic_summaries: &TreeMap, @@ -1031,7 +1041,7 @@ impl LocalSnapshot { .collect(), diagnostic_summaries: diagnostic_summaries .iter() - .map(|(path, summary)| summary.to_proto(path.0.clone())) + .map(|(path, summary)| summary.to_proto(&path.0)) .collect(), weak, } diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index 4ac61377c4511abd2d3712fb7b4984dc4e7d6273..333bbd400ef1bfbb0a48590d719922a6682fcdf1 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -262,6 +262,7 @@ entity_messages!( UpdateBuffer, UpdateBufferFile, UpdateDiagnosticSummary, + RegisterWorktree, UpdateWorktree, ); diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index 2dc56138ff3a2716852947b71b39e31bdc446cf4..791796d494022834b43b18f8c89f9b04b857e781 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -334,16 +334,16 @@ impl Server { replica_id: 0, user_id: joined.project.host_user_id.to_proto(), }); - let worktrees = joined - .project + let worktrees = share .worktrees .iter() - .filter_map(|(id, worktree)| { - worktree.share.as_ref().map(|share| proto::Worktree { + .filter_map(|(id, shared_worktree)| { + let worktree = joined.project.worktrees.get(&id)?; + Some(proto::Worktree { id: *id, root_name: worktree.root_name.clone(), - entries: share.entries.values().cloned().collect(), - diagnostic_summaries: share + entries: shared_worktree.entries.values().cloned().collect(), + diagnostic_summaries: shared_worktree .diagnostic_summaries .values() .cloned() @@ -437,7 +437,6 @@ impl Server { Worktree { authorized_user_ids: contact_user_ids.clone(), root_name: request.payload.root_name.clone(), - share: None, weak: request.payload.weak, }, )?; @@ -1164,7 +1163,7 @@ mod tests { cell::Cell, env, ops::Deref, - path::Path, + path::{Path, PathBuf}, rc::Rc, sync::{ atomic::{AtomicBool, Ordering::SeqCst}, @@ -2115,16 +2114,14 @@ mod tests { let worktree = store .project(project_id) .unwrap() + .share + .as_ref() + .unwrap() .worktrees .get(&worktree_id.to_proto()) .unwrap(); - !worktree - .share - .as_ref() - .unwrap() - .diagnostic_summaries - .is_empty() + !worktree.diagnostic_summaries.is_empty() }) .await; diff --git a/crates/server/src/rpc/store.rs b/crates/server/src/rpc/store.rs index ec23c06992e3d0f3eb4a55376f46643ff8413c0a..e6c4429b6960a749175f4ee9fac25a792e715fbc 100644 --- a/crates/server/src/rpc/store.rs +++ b/crates/server/src/rpc/store.rs @@ -30,7 +30,6 @@ pub struct Project { pub struct Worktree { pub authorized_user_ids: Vec, pub root_name: String, - pub share: Option, pub weak: bool, } @@ -38,8 +37,10 @@ pub struct Worktree { pub struct ProjectShare { pub guests: HashMap, pub active_replica_ids: HashSet, + pub worktrees: HashMap, } +#[derive(Default)] pub struct WorktreeShare { pub entries: HashMap, pub diagnostic_summaries: BTreeMap, @@ -74,11 +75,6 @@ pub struct LeftProject { pub authorized_user_ids: Vec, } -pub struct SharedWorktree { - pub authorized_user_ids: Vec, - pub connection_ids: Vec, -} - impl Store { pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId) { self.connections.insert( @@ -272,6 +268,9 @@ impl Store { connection.projects.insert(project_id); } project.worktrees.insert(worktree_id, worktree); + if let Ok(share) = project.share_mut() { + share.worktrees.insert(worktree_id, Default::default()); + } #[cfg(test)] self.check_invariants(); @@ -326,8 +325,9 @@ impl Store { .ok_or_else(|| anyhow!("no such worktree"))?; let mut guest_connection_ids = Vec::new(); - if let Some(share) = &project.share { + if let Ok(share) = project.share_mut() { guest_connection_ids.extend(share.guests.keys()); + share.worktrees.remove(&worktree_id); } for authorized_user_id in &worktree.authorized_user_ids { @@ -349,7 +349,11 @@ impl Store { pub fn share_project(&mut self, project_id: u64, connection_id: ConnectionId) -> bool { if let Some(project) = self.projects.get_mut(&project_id) { if project.host_connection_id == connection_id { - project.share = Some(ProjectShare::default()); + let mut share = ProjectShare::default(); + for worktree_id in project.worktrees.keys() { + share.worktrees.insert(*worktree_id, Default::default()); + } + project.share = Some(share); return true; } } @@ -380,10 +384,6 @@ impl Store { } } - for worktree in project.worktrees.values_mut() { - worktree.share.take(); - } - #[cfg(test)] self.check_invariants(); @@ -407,17 +407,16 @@ impl Store { .projects .get_mut(&project_id) .ok_or_else(|| anyhow!("no such project"))?; - let worktree = project - .worktrees - .get_mut(&worktree_id) - .ok_or_else(|| anyhow!("no such worktree"))?; if project.host_connection_id == connection_id { - if let Some(share) = worktree.share.as_mut() { - share - .diagnostic_summaries - .insert(summary.path.clone().into(), summary); - return Ok(project.connection_ids()); - } + let worktree = project + .share_mut()? + .worktrees + .get_mut(&worktree_id) + .ok_or_else(|| anyhow!("no such worktree"))?; + worktree + .diagnostic_summaries + .insert(summary.path.clone().into(), summary); + return Ok(project.connection_ids()); } Err(anyhow!("no such worktree"))? @@ -508,18 +507,16 @@ impl Store { updated_entries: &[proto::Entry], ) -> tide::Result> { let project = self.write_project(project_id, connection_id)?; - let share = project + let worktree = project + .share_mut()? .worktrees .get_mut(&worktree_id) - .ok_or_else(|| anyhow!("no such worktree"))? - .share - .as_mut() - .ok_or_else(|| anyhow!("worktree is not shared"))?; + .ok_or_else(|| anyhow!("no such worktree"))?; for entry_id in removed_entries { - share.entries.remove(&entry_id); + worktree.entries.remove(&entry_id); } for entry in updated_entries { - share.entries.insert(entry.id, entry.clone()); + worktree.entries.insert(entry.id, entry.clone()); } Ok(project.connection_ids()) } From 170487a5283b3d7783aa24e5485c0974600165e3 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Wed, 23 Feb 2022 15:25:58 -0800 Subject: [PATCH 06/14] Fix race conditions with LSP requests that return buffers * Avoid panic when registering a buffer that was previously open, and whose weak handle was still present in the open_buffers map. * Avoid releasing any buffers while a request is outstanding which could return a reference to a buffer. Co-Authored-By: Nathan Sobo --- crates/project/src/lsp_command.rs | 23 +++++- crates/project/src/project.rs | 117 ++++++++++++++++++++++++++---- crates/server/src/rpc.rs | 32 +++++--- 3 files changed, 145 insertions(+), 27 deletions(-) diff --git a/crates/project/src/lsp_command.rs b/crates/project/src/lsp_command.rs index 3b502fc8fafc5accfc977eee572c853a68701b48..b091fe0bc390bf95f36d51596e81696790c01fa0 100644 --- a/crates/project/src/lsp_command.rs +++ b/crates/project/src/lsp_command.rs @@ -1,4 +1,4 @@ -use crate::{DocumentHighlight, Location, Project, ProjectTransaction}; +use crate::{BufferRequestHandle, DocumentHighlight, Location, Project, ProjectTransaction}; use anyhow::{anyhow, Result}; use async_trait::async_trait; use client::{proto, PeerId}; @@ -48,6 +48,7 @@ pub(crate) trait LspCommand: 'static + Sized { message: ::Response, project: ModelHandle, buffer: ModelHandle, + request_handle: BufferRequestHandle, cx: AsyncAppContext, ) -> Result; fn buffer_id_from_proto(message: &Self::ProtoRequest) -> u64; @@ -161,6 +162,7 @@ impl LspCommand for PrepareRename { message: proto::PrepareRenameResponse, _: ModelHandle, buffer: ModelHandle, + _: BufferRequestHandle, mut cx: AsyncAppContext, ) -> Result>> { if message.can_rename { @@ -277,6 +279,7 @@ impl LspCommand for PerformRename { message: proto::PerformRenameResponse, project: ModelHandle, _: ModelHandle, + request_handle: BufferRequestHandle, mut cx: AsyncAppContext, ) -> Result { let message = message @@ -284,7 +287,12 @@ impl LspCommand for PerformRename { .ok_or_else(|| anyhow!("missing transaction"))?; project .update(&mut cx, |project, cx| { - project.deserialize_project_transaction(message, self.push_to_history, cx) + project.deserialize_project_transaction( + message, + self.push_to_history, + request_handle, + cx, + ) }) .await } @@ -427,13 +435,16 @@ impl LspCommand for GetDefinition { message: proto::GetDefinitionResponse, project: ModelHandle, _: ModelHandle, + request_handle: BufferRequestHandle, mut cx: AsyncAppContext, ) -> Result> { let mut locations = Vec::new(); for location in message.locations { let buffer = location.buffer.ok_or_else(|| anyhow!("missing buffer"))?; let buffer = project - .update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx)) + .update(&mut cx, |this, cx| { + this.deserialize_buffer(buffer, request_handle.clone(), cx) + }) .await?; let start = location .start @@ -575,13 +586,16 @@ impl LspCommand for GetReferences { message: proto::GetReferencesResponse, project: ModelHandle, _: ModelHandle, + request_handle: BufferRequestHandle, mut cx: AsyncAppContext, ) -> Result> { let mut locations = Vec::new(); for location in message.locations { let buffer = location.buffer.ok_or_else(|| anyhow!("missing buffer"))?; let target_buffer = project - .update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx)) + .update(&mut cx, |this, cx| { + this.deserialize_buffer(buffer, request_handle.clone(), cx) + }) .await?; let start = location .start @@ -706,6 +720,7 @@ impl LspCommand for GetDocumentHighlights { message: proto::GetDocumentHighlightsResponse, _: ModelHandle, _: ModelHandle, + _: BufferRequestHandle, _: AsyncAppContext, ) -> Result> { Ok(message diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 1de6b7b13151c82c07802f111462a428fc338e16..7b8d6d17dd2a92e85aca9fb7332314a9dd0fbdc3 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -25,12 +25,17 @@ use rand::prelude::*; use sha2::{Digest, Sha256}; use smol::block_on; use std::{ + cell::RefCell, convert::TryInto, hash::Hash, mem, ops::Range, path::{Component, Path, PathBuf}, - sync::{atomic::AtomicBool, Arc}, + rc::Rc, + sync::{ + atomic::{AtomicBool, AtomicUsize, Ordering}, + Arc, + }, time::Instant, }; use util::{post_inc, ResultExt, TryFutureExt as _}; @@ -58,6 +63,8 @@ pub struct Project { ProjectPath, postage::watch::Receiver, Arc>>>, >, + buffer_request_count: Rc, + preserved_buffers: Rc>>>, shared_buffers: HashMap>>, nonce: u128, } @@ -142,6 +149,11 @@ pub struct Symbol { pub signature: [u8; 32], } +pub struct BufferRequestHandle { + buffer_request_count: Rc, + preserved_buffers: Rc>>>, +} + #[derive(Default)] pub struct ProjectTransaction(pub HashMap, language::Transaction>); @@ -273,6 +285,7 @@ impl Project { open_buffers: Default::default(), loading_buffers: Default::default(), shared_buffers: Default::default(), + preserved_buffers: Default::default(), client_state: ProjectClientState::Local { is_shared: false, remote_id_tx, @@ -288,6 +301,7 @@ impl Project { fs, language_servers_with_diagnostics_running: 0, language_servers: Default::default(), + buffer_request_count: Default::default(), started_language_servers: Default::default(), nonce: StdRng::from_entropy().gen(), } @@ -342,6 +356,8 @@ impl Project { language_servers_with_diagnostics_running: 0, language_servers: Default::default(), started_language_servers: Default::default(), + buffer_request_count: Default::default(), + preserved_buffers: Default::default(), nonce: StdRng::from_entropy().gen(), }; for worktree in worktrees { @@ -682,6 +698,7 @@ impl Project { let remote_worktree_id = worktree.read(cx).id(); let path = path.clone(); let path_string = path.to_string_lossy().to_string(); + let request_handle = self.start_buffer_request(cx); cx.spawn(|this, mut cx| async move { let response = rpc .request(proto::OpenBuffer { @@ -691,8 +708,11 @@ impl Project { }) .await?; let buffer = response.buffer.ok_or_else(|| anyhow!("missing buffer"))?; - this.update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx)) - .await + + this.update(&mut cx, |this, cx| { + this.deserialize_buffer(buffer, request_handle, cx) + }) + .await }) } @@ -733,6 +753,21 @@ impl Project { }) } + fn start_buffer_request(&self, cx: &AppContext) -> BufferRequestHandle { + if self.buffer_request_count.fetch_add(1, Ordering::SeqCst) == 0 { + self.preserved_buffers.borrow_mut().extend( + self.open_buffers + .values() + .filter_map(|buffer| buffer.upgrade(cx)), + ) + } + + BufferRequestHandle { + buffer_request_count: self.buffer_request_count.clone(), + preserved_buffers: self.preserved_buffers.clone(), + } + } + pub fn save_buffer_as( &self, buffer: ModelHandle, @@ -804,15 +839,23 @@ impl Project { worktree: Option<&ModelHandle>, cx: &mut ModelContext, ) -> Result<()> { - match self.open_buffers.insert( - buffer.read(cx).remote_id(), - OpenBuffer::Loaded(buffer.downgrade()), - ) { + let remote_id = buffer.read(cx).remote_id(); + match self + .open_buffers + .insert(remote_id, OpenBuffer::Loaded(buffer.downgrade())) + { None => {} Some(OpenBuffer::Loading(operations)) => { buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))? } - Some(OpenBuffer::Loaded(_)) => Err(anyhow!("registered the same buffer twice"))?, + Some(OpenBuffer::Loaded(existing_handle)) => { + if existing_handle.upgrade(cx).is_some() { + Err(anyhow!( + "already registered buffer with remote id {}", + remote_id + ))? + } + } } self.assign_language_to_buffer(&buffer, worktree, cx); Ok(()) @@ -1195,6 +1238,7 @@ impl Project { let remote_buffers = self.remote_id().zip(remote_buffers); let client = self.client.clone(); + let request_handle = self.start_buffer_request(cx); cx.spawn(|this, mut cx| async move { let mut project_transaction = ProjectTransaction::default(); @@ -1213,7 +1257,12 @@ impl Project { .ok_or_else(|| anyhow!("missing transaction"))?; project_transaction = this .update(&mut cx, |this, cx| { - this.deserialize_project_transaction(response, push_to_history, cx) + this.deserialize_project_transaction( + response, + push_to_history, + request_handle, + cx, + ) }) .await?; } @@ -1430,6 +1479,7 @@ impl Project { cx, ) } else if let Some(project_id) = self.remote_id() { + let request_handle = self.start_buffer_request(cx); let request = self.client.request(proto::OpenBufferForSymbol { project_id, symbol: Some(serialize_symbol(symbol)), @@ -1437,8 +1487,10 @@ impl Project { cx.spawn(|this, mut cx| async move { let response = request.await?; let buffer = response.buffer.ok_or_else(|| anyhow!("invalid buffer"))?; - this.update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx)) - .await + this.update(&mut cx, |this, cx| { + this.deserialize_buffer(buffer, request_handle, cx) + }) + .await }) } else { Task::ready(Err(anyhow!("project does not have a remote id"))) @@ -1817,6 +1869,7 @@ impl Project { }) } else if let Some(project_id) = self.remote_id() { let client = self.client.clone(); + let request_handle = self.start_buffer_request(cx); let request = proto::ApplyCodeAction { project_id, buffer_id: buffer_handle.read(cx).remote_id(), @@ -1829,7 +1882,12 @@ impl Project { .transaction .ok_or_else(|| anyhow!("missing transaction"))?; this.update(&mut cx, |this, cx| { - this.deserialize_project_transaction(response, push_to_history, cx) + this.deserialize_project_transaction( + response, + push_to_history, + request_handle, + cx, + ) }) .await }) @@ -2020,11 +2078,12 @@ impl Project { } } else if let Some(project_id) = self.remote_id() { let rpc = self.client.clone(); + let request_handle = self.start_buffer_request(cx); let message = request.to_proto(project_id, buffer); return cx.spawn(|this, cx| async move { let response = rpc.request(message).await?; request - .response_from_proto(response, this, buffer_handle, cx) + .response_from_proto(response, this, buffer_handle, request_handle, cx) .await }); } @@ -2864,13 +2923,16 @@ impl Project { &mut self, message: proto::ProjectTransaction, push_to_history: bool, + request_handle: BufferRequestHandle, cx: &mut ModelContext, ) -> Task> { cx.spawn(|this, mut cx| async move { let mut project_transaction = ProjectTransaction::default(); for (buffer, transaction) in message.buffers.into_iter().zip(message.transactions) { let buffer = this - .update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx)) + .update(&mut cx, |this, cx| { + this.deserialize_buffer(buffer, request_handle.clone(), cx) + }) .await?; let transaction = language::proto::deserialize_transaction(transaction)?; project_transaction.0.insert(buffer, transaction); @@ -2917,6 +2979,7 @@ impl Project { fn deserialize_buffer( &mut self, buffer: proto::Buffer, + request_handle: BufferRequestHandle, cx: &mut ModelContext, ) -> Task>> { let replica_id = self.replica_id(); @@ -2963,6 +3026,8 @@ impl Project { let buffer = cx.add_model(|cx| { Buffer::from_proto(replica_id, buffer, buffer_file, cx).unwrap() }); + + request_handle.preserve_buffer(buffer.clone()); this.update(&mut cx, |this, cx| { this.register_buffer(&buffer, buffer_worktree.as_ref(), cx) })?; @@ -3111,6 +3176,30 @@ impl Project { } } +impl BufferRequestHandle { + fn preserve_buffer(&self, buffer: ModelHandle) { + self.preserved_buffers.borrow_mut().push(buffer); + } +} + +impl Clone for BufferRequestHandle { + fn clone(&self) -> Self { + self.buffer_request_count.fetch_add(1, Ordering::SeqCst); + Self { + buffer_request_count: self.buffer_request_count.clone(), + preserved_buffers: self.preserved_buffers.clone(), + } + } +} + +impl Drop for BufferRequestHandle { + fn drop(&mut self) { + if self.buffer_request_count.fetch_sub(1, Ordering::SeqCst) == 1 { + self.preserved_buffers.borrow_mut().clear(); + } + } +} + impl WorktreeHandle { pub fn upgrade(&self, cx: &AppContext) -> Option> { match self { diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index 791796d494022834b43b18f8c89f9b04b857e781..919fc9636ef77ef2195114a780da14892be51a0f 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -4856,6 +4856,8 @@ mod tests { cx.background().simulate_random_delay().await; } + log::info!("Host done"); + self.project = Some(project); (self, cx) } @@ -4887,15 +4889,25 @@ mod tests { }; operations.set(operations.get() + 1); - let project_path = worktree.read_with(&cx, |worktree, _| { - let entry = worktree - .entries(false) - .filter(|e| e.is_file()) - .choose(&mut *rng.lock()) - .unwrap(); - (worktree.id(), entry.path.clone()) - }); - log::info!("Guest {}: opening path {:?}", guest_id, project_path); + let (worktree_root_name, project_path) = + worktree.read_with(&cx, |worktree, _| { + let entry = worktree + .entries(false) + .filter(|e| e.is_file()) + .choose(&mut *rng.lock()) + .unwrap(); + ( + worktree.root_name().to_string(), + (worktree.id(), entry.path.clone()), + ) + }); + log::info!( + "Guest {}: opening path in worktree {:?} {:?} {:?}", + guest_id, + project_path.0, + worktree_root_name, + project_path.1 + ); let buffer = project .update(&mut cx, |project, cx| project.open_buffer(project_path, cx)) .await @@ -5062,6 +5074,8 @@ mod tests { cx.background().simulate_random_delay().await; } + log::info!("Guest {} done", guest_id); + self.project = Some(project); (self, cx) } From e714b00c26864a1133fd44d58ca5f4e1e66f0980 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Wed, 23 Feb 2022 15:37:51 -0800 Subject: [PATCH 07/14] Improve logging around handling RPC requests on client --- crates/client/src/client.rs | 17 +++++++++++++---- crates/rpc/src/proto.rs | 5 +++++ 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index 2c31e8eef376311210ab9e3ae76d703ca903eb69..714ffe6925a92eb88df204efb5c404ec27f72759 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -530,10 +530,13 @@ impl Client { let cx = cx.clone(); let this = self.clone(); async move { + let mut message_id = 0_usize; while let Some(message) = incoming.next().await { let mut state = this.state.write(); - let payload_type_id = message.payload_type_id(); + message_id += 1; let type_name = message.payload_type_name(); + let payload_type_id = message.payload_type_id(); + let sender_id = message.original_sender_id().map(|id| id.0); let model = state .models_by_message_type @@ -575,8 +578,10 @@ impl Client { let client_id = this.id; log::debug!( - "rpc message received. client_id:{}, name:{}", + "rpc message received. client_id:{}, message_id:{}, sender_id:{:?}, type:{}", client_id, + message_id, + sender_id, type_name ); cx.foreground() @@ -584,15 +589,19 @@ impl Client { match future.await { Ok(()) => { log::debug!( - "rpc message handled. client_id:{}, name:{}", + "rpc message handled. client_id:{}, message_id:{}, sender_id:{:?}, type:{}", client_id, + message_id, + sender_id, type_name ); } Err(error) => { log::error!( - "error handling message. client_id:{}, name:{}, {}", + "error handling message. client_id:{}, message_id:{}, sender_id:{:?}, type:{}, error:{:?}", client_id, + message_id, + sender_id, type_name, error ); diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index 333bbd400ef1bfbb0a48590d719922a6682fcdf1..5ec46eb353a59652eff919b9f8fce9fdb11aebb3 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -37,6 +37,7 @@ pub trait AnyTypedEnvelope: 'static + Send + Sync { fn as_any(&self) -> &dyn Any; fn into_any(self: Box) -> Box; fn is_background(&self) -> bool; + fn original_sender_id(&self) -> Option; } pub enum MessagePriority { @@ -64,6 +65,10 @@ impl AnyTypedEnvelope for TypedEnvelope { fn is_background(&self) -> bool { matches!(T::PRIORITY, MessagePriority::Background) } + + fn original_sender_id(&self) -> Option { + self.original_sender_id + } } macro_rules! messages { From e9009d4edf4c2a4b18d4e36949aeb70cd2ce2eca Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Wed, 23 Feb 2022 16:27:34 -0800 Subject: [PATCH 08/14] Tweak logging in random collaboration test --- crates/server/src/rpc.rs | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index 919fc9636ef77ef2195114a780da14892be51a0f..b4559a4bcbb04ed346bc303b22ea6e596721fe26 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -4440,9 +4440,11 @@ mod tests { assert_eq!( guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()), host_buffer.read_with(&host_cx, |buffer, _| buffer.text()), - "guest {} buffer {} differs from the host's buffer", + "guest {}, buffer {}, path {:?}, differs from the host's buffer", guest_id, buffer_id, + host_buffer + .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx)) ); } } @@ -4691,14 +4693,16 @@ mod tests { let files = files.lock(); let mut rng = rng.lock(); let count = rng.gen_range::(1..3); + let files = (0..count) + .map(|_| files.choose(&mut *rng).unwrap()) + .collect::>(); + log::info!("LSP: Returning definitions in files {:?}", &files); Some(lsp::GotoDefinitionResponse::Array( - (0..count) - .map(|_| { - let file = files.choose(&mut *rng).unwrap().as_path(); - lsp::Location { - uri: lsp::Url::from_file_path(file).unwrap(), - range: Default::default(), - } + files + .into_iter() + .map(|file| lsp::Location { + uri: lsp::Url::from_file_path(file).unwrap(), + range: Default::default(), }) .collect(), )) @@ -4787,13 +4791,17 @@ mod tests { let file = files.lock().choose(&mut *rng.lock()).unwrap().clone(); let (worktree, path) = project .update(&mut cx, |project, cx| { - project.find_or_create_local_worktree(file, false, cx) + project.find_or_create_local_worktree( + file.clone(), + false, + cx, + ) }) .await .unwrap(); let project_path = worktree.read_with(&cx, |worktree, _| (worktree.id(), path)); - log::info!("Host: opening path {:?}", project_path); + log::info!("Host: opening path {:?}, {:?}", file, project_path); let buffer = project .update(&mut cx, |project, cx| { project.open_buffer(project_path, cx) From 60600774444412187d2a901c9b930cd65b187e82 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Wed, 23 Feb 2022 16:59:39 -0800 Subject: [PATCH 09/14] Remove unused pending_updates field from RemoteWorktree --- crates/project/src/worktree.rs | 8 +------- crates/server/src/rpc.rs | 6 ------ 2 files changed, 1 insertion(+), 13 deletions(-) diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index 367db436c7daeb6fc6765a22f7146c6496b1fd36..795ca23c8f28cd732858b52c82d1ef2ac9615ac0 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -7,7 +7,7 @@ use ::ignore::gitignore::{Gitignore, GitignoreBuilder}; use anyhow::{anyhow, Context, Result}; use client::{proto, Client, TypedEnvelope}; use clock::ReplicaId; -use collections::{HashMap, VecDeque}; +use collections::HashMap; use futures::{ channel::mpsc::{self, UnboundedSender}, Stream, StreamExt, @@ -84,7 +84,6 @@ pub struct RemoteWorktree { queued_operations: Vec<(u64, Operation)>, diagnostic_summaries: TreeMap, weak: bool, - pending_updates: VecDeque, } #[derive(Clone)] @@ -238,7 +237,6 @@ impl Worktree { }), ), weak, - pending_updates: Default::default(), }) }); @@ -857,10 +855,6 @@ impl RemoteWorktree { Ok(()) } - pub fn has_pending_updates(&self) -> bool { - !self.pending_updates.is_empty() - } - pub fn update_diagnostic_summary( &mut self, path: Arc, diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index b4559a4bcbb04ed346bc303b22ea6e596721fe26..39d43e7ea7e7c4046b7ef1bc2c2614fae315a8a1 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -4380,12 +4380,6 @@ mod tests { .worktrees(cx) .map(|worktree| { let worktree = worktree.read(cx); - assert!( - !worktree.as_remote().unwrap().has_pending_updates(), - "Guest {} worktree {:?} contains deferred updates", - guest_id, - worktree.id() - ); (worktree.id(), worktree.snapshot()) }) .collect::>() From 51e2e9e68d7f751a7f13a8d114134b8c45957978 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Wed, 23 Feb 2022 18:18:52 -0800 Subject: [PATCH 10/14] Make client log message format more consistent --- crates/client/src/client.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index 714ffe6925a92eb88df204efb5c404ec27f72759..f0f6ef6146fe9efe712a8c14a9791b6143714863 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -836,7 +836,7 @@ impl Client { ) -> impl Future> { let client_id = self.id; log::debug!( - "rpc request start. client_id: {}. name:{}", + "rpc request start. client_id:{}. name:{}", client_id, T::NAME ); @@ -846,7 +846,7 @@ impl Client { async move { let response = response?.await; log::debug!( - "rpc request finish. client_id: {}. name:{}", + "rpc request finish. client_id:{}. name:{}", client_id, T::NAME ); @@ -855,7 +855,7 @@ impl Client { } fn respond(&self, receipt: Receipt, response: T::Response) -> Result<()> { - log::debug!("rpc respond. client_id: {}. name:{}", self.id, T::NAME); + log::debug!("rpc respond. client_id:{}. name:{}", self.id, T::NAME); self.peer.respond(receipt, response) } @@ -864,7 +864,7 @@ impl Client { receipt: Receipt, error: proto::Error, ) -> Result<()> { - log::debug!("rpc respond. client_id: {}. name:{}", self.id, T::NAME); + log::debug!("rpc respond. client_id:{}. name:{}", self.id, T::NAME); self.peer.respond_with_error(receipt, error) } } From f1921c8df543153881fabfadff1d76c158ca886c Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Wed, 23 Feb 2022 18:22:51 -0800 Subject: [PATCH 11/14] Open buffers from definitions request in random collab test Don't try to open buffers from the weak worktrees directly, as this is expected to fail if the host drops the buffer for that worktree. --- crates/server/src/rpc.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index 39d43e7ea7e7c4046b7ef1bc2c2614fae315a8a1..593ad32bcdad9390c4e22110347439650f4e3ccf 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -4880,7 +4880,8 @@ mod tests { project .worktrees(&cx) .filter(|worktree| { - worktree.read(cx).entries(false).any(|e| e.is_file()) + let worktree = worktree.read(cx); + !worktree.is_weak() && worktree.entries(false).any(|e| e.is_file()) }) .choose(&mut *rng.lock()) }) { @@ -5033,13 +5034,14 @@ mod tests { project.definition(&buffer, offset, cx) }); let definitions = cx.background().spawn(async move { - definitions.await.expect("definitions request failed"); + definitions.await.expect("definitions request failed") }); if rng.lock().gen_bool(0.3) { log::info!("Guest {}: detaching definitions request", guest_id); definitions.detach(); } else { - definitions.await; + self.buffers + .extend(definitions.await.into_iter().map(|loc| loc.buffer)); } } 50..=55 => { From a6613d5345e766bb4baed5fbf4552b18dd0c8ba6 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Wed, 23 Feb 2022 18:22:56 -0800 Subject: [PATCH 12/14] Store operations for unknown buffers when there are outstanding buffer RPC requests --- crates/project/src/project.rs | 158 +++++++++++++++++++--------------- 1 file changed, 90 insertions(+), 68 deletions(-) diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 7b8d6d17dd2a92e85aca9fb7332314a9dd0fbdc3..8bfa274e8e4a462411472fc450a9cf13714f3cf7 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -32,10 +32,7 @@ use std::{ ops::Range, path::{Component, Path, PathBuf}, rc::Rc, - sync::{ - atomic::{AtomicBool, AtomicUsize, Ordering}, - Arc, - }, + sync::{atomic::AtomicBool, Arc}, time::Instant, }; use util::{post_inc, ResultExt, TryFutureExt as _}; @@ -57,18 +54,23 @@ pub struct Project { collaborators: HashMap, subscriptions: Vec, language_servers_with_diagnostics_running: isize, - open_buffers: HashMap, opened_buffer: broadcast::Sender<()>, loading_buffers: HashMap< ProjectPath, postage::watch::Receiver, Arc>>>, >, - buffer_request_count: Rc, - preserved_buffers: Rc>>>, + buffers_state: Rc>, shared_buffers: HashMap>>, nonce: u128, } +#[derive(Default)] +struct ProjectBuffers { + buffer_request_count: usize, + preserved_buffers: Vec>, + open_buffers: HashMap, +} + enum OpenBuffer { Loaded(WeakModelHandle), Loading(Vec), @@ -149,10 +151,7 @@ pub struct Symbol { pub signature: [u8; 32], } -pub struct BufferRequestHandle { - buffer_request_count: Rc, - preserved_buffers: Rc>>>, -} +pub struct BufferRequestHandle(Rc>); #[derive(Default)] pub struct ProjectTransaction(pub HashMap, language::Transaction>); @@ -282,10 +281,9 @@ impl Project { Self { worktrees: Default::default(), collaborators: Default::default(), - open_buffers: Default::default(), + buffers_state: Default::default(), loading_buffers: Default::default(), shared_buffers: Default::default(), - preserved_buffers: Default::default(), client_state: ProjectClientState::Local { is_shared: false, remote_id_tx, @@ -301,7 +299,6 @@ impl Project { fs, language_servers_with_diagnostics_running: 0, language_servers: Default::default(), - buffer_request_count: Default::default(), started_language_servers: Default::default(), nonce: StdRng::from_entropy().gen(), } @@ -337,7 +334,6 @@ impl Project { let this = cx.add_model(|cx| { let mut this = Self { worktrees: Vec::new(), - open_buffers: Default::default(), loading_buffers: Default::default(), opened_buffer: broadcast::channel(1).0, shared_buffers: Default::default(), @@ -356,8 +352,7 @@ impl Project { language_servers_with_diagnostics_running: 0, language_servers: Default::default(), started_language_servers: Default::default(), - buffer_request_count: Default::default(), - preserved_buffers: Default::default(), + buffers_state: Default::default(), nonce: StdRng::from_entropy().gen(), }; for worktree in worktrees { @@ -406,7 +401,9 @@ impl Project { #[cfg(any(test, feature = "test-support"))] pub fn has_buffered_operations(&self) -> bool { - self.open_buffers + self.buffers_state + .borrow() + .open_buffers .values() .any(|buffer| matches!(buffer, OpenBuffer::Loading(_))) } @@ -637,11 +634,6 @@ impl Project { *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| { // Record the fact that the buffer is no longer loading. this.loading_buffers.remove(&project_path); - if this.loading_buffers.is_empty() { - this.open_buffers - .retain(|_, buffer| matches!(buffer, OpenBuffer::Loaded(_))) - } - let buffer = load_result.map_err(Arc::new)?; Ok(buffer) })); @@ -754,18 +746,7 @@ impl Project { } fn start_buffer_request(&self, cx: &AppContext) -> BufferRequestHandle { - if self.buffer_request_count.fetch_add(1, Ordering::SeqCst) == 0 { - self.preserved_buffers.borrow_mut().extend( - self.open_buffers - .values() - .filter_map(|buffer| buffer.upgrade(cx)), - ) - } - - BufferRequestHandle { - buffer_request_count: self.buffer_request_count.clone(), - preserved_buffers: self.preserved_buffers.clone(), - } + BufferRequestHandle::new(self.buffers_state.clone(), cx) } pub fn save_buffer_as( @@ -796,16 +777,20 @@ impl Project { pub fn has_open_buffer(&self, path: impl Into, cx: &AppContext) -> bool { let path = path.into(); if let Some(worktree) = self.worktree_for_id(path.worktree_id, cx) { - self.open_buffers.iter().any(|(_, buffer)| { - if let Some(buffer) = buffer.upgrade(cx) { - if let Some(file) = File::from_dyn(buffer.read(cx).file()) { - if file.worktree == worktree && file.path() == &path.path { - return true; + self.buffers_state + .borrow() + .open_buffers + .iter() + .any(|(_, buffer)| { + if let Some(buffer) = buffer.upgrade(cx) { + if let Some(file) = File::from_dyn(buffer.read(cx).file()) { + if file.worktree == worktree && file.path() == &path.path { + return true; + } } } - } - false - }) + false + }) } else { false } @@ -818,18 +803,21 @@ impl Project { ) -> Option> { let mut result = None; let worktree = self.worktree_for_id(path.worktree_id, cx)?; - self.open_buffers.retain(|_, buffer| { - if let Some(buffer) = buffer.upgrade(cx) { - if let Some(file) = File::from_dyn(buffer.read(cx).file()) { - if file.worktree == worktree && file.path() == &path.path { - result = Some(buffer); + self.buffers_state + .borrow_mut() + .open_buffers + .retain(|_, buffer| { + if let Some(buffer) = buffer.upgrade(cx) { + if let Some(file) = File::from_dyn(buffer.read(cx).file()) { + if file.worktree == worktree && file.path() == &path.path { + result = Some(buffer); + } } + true + } else { + false } - true - } else { - false - } - }); + }); result } @@ -841,6 +829,8 @@ impl Project { ) -> Result<()> { let remote_id = buffer.read(cx).remote_id(); match self + .buffers_state + .borrow_mut() .open_buffers .insert(remote_id, OpenBuffer::Loaded(buffer.downgrade())) { @@ -1175,7 +1165,7 @@ impl Project { path: relative_path.into(), }; - for buffer in self.open_buffers.values() { + for buffer in self.buffers_state.borrow().open_buffers.values() { if let Some(buffer) = buffer.upgrade(cx) { if buffer .read(cx) @@ -2211,7 +2201,7 @@ impl Project { ) { let snapshot = worktree_handle.read(cx).snapshot(); let mut buffers_to_delete = Vec::new(); - for (buffer_id, buffer) in &self.open_buffers { + for (buffer_id, buffer) in &self.buffers_state.borrow().open_buffers { if let Some(buffer) = buffer.upgrade(cx) { buffer.update(cx, |buffer, cx| { if let Some(old_file) = File::from_dyn(buffer.file()) { @@ -2268,7 +2258,10 @@ impl Project { } for buffer_id in buffers_to_delete { - self.open_buffers.remove(&buffer_id); + self.buffers_state + .borrow_mut() + .open_buffers + .remove(&buffer_id); } } @@ -2396,7 +2389,7 @@ impl Project { .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))? .replica_id; this.shared_buffers.remove(&peer_id); - for (_, buffer) in &this.open_buffers { + for (_, buffer) in &this.buffers_state.borrow().open_buffers { if let Some(buffer) = buffer.upgrade(cx) { buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx)); } @@ -2523,7 +2516,9 @@ impl Project { .map(|op| language::proto::deserialize_operation(op)) .collect::, _>>()?; let is_remote = this.is_remote(); - match this.open_buffers.entry(buffer_id) { + let mut buffers_state = this.buffers_state.borrow_mut(); + let buffer_request_count = buffers_state.buffer_request_count; + match buffers_state.open_buffers.entry(buffer_id) { hash_map::Entry::Occupied(mut e) => match e.get_mut() { OpenBuffer::Loaded(buffer) => { if let Some(buffer) = buffer.upgrade(cx) { @@ -2533,7 +2528,7 @@ impl Project { OpenBuffer::Loading(operations) => operations.extend_from_slice(&ops), }, hash_map::Entry::Vacant(e) => { - if is_remote && this.loading_buffers.len() > 0 { + if is_remote && buffer_request_count > 0 { e.insert(OpenBuffer::Loading(ops)); } } @@ -2557,6 +2552,8 @@ impl Project { .ok_or_else(|| anyhow!("no such worktree"))?; let file = File::from_proto(file, worktree.clone(), cx)?; let buffer = this + .buffers_state + .borrow_mut() .open_buffers .get_mut(&buffer_id) .and_then(|b| b.upgrade(cx)) @@ -2937,6 +2934,7 @@ impl Project { let transaction = language::proto::deserialize_transaction(transaction)?; project_transaction.0.insert(buffer, transaction); } + for (buffer, transaction) in &project_transaction.0 { buffer .update(&mut cx, |buffer, _| { @@ -2991,7 +2989,9 @@ impl Project { proto::buffer::Variant::Id(id) => { let buffer = loop { let buffer = this.read_with(&cx, |this, cx| { - this.open_buffers + this.buffers_state + .borrow() + .open_buffers .get(&id) .and_then(|buffer| buffer.upgrade(cx)) }); @@ -3100,6 +3100,8 @@ impl Project { this.update(&mut cx, |this, cx| { let buffer = this + .buffers_state + .borrow() .open_buffers .get(&envelope.payload.buffer_id) .and_then(|buffer| buffer.upgrade(cx)); @@ -3126,6 +3128,8 @@ impl Project { .into(); this.update(&mut cx, |this, cx| { let buffer = this + .buffers_state + .borrow() .open_buffers .get(&payload.buffer_id) .and_then(|buffer| buffer.upgrade(cx)); @@ -3177,25 +3181,43 @@ impl Project { } impl BufferRequestHandle { + fn new(state: Rc>, cx: &AppContext) -> Self { + { + let state = &mut *state.borrow_mut(); + state.buffer_request_count += 1; + if state.buffer_request_count == 1 { + state.preserved_buffers.extend( + state + .open_buffers + .values() + .filter_map(|buffer| buffer.upgrade(cx)), + ) + } + } + Self(state) + } + fn preserve_buffer(&self, buffer: ModelHandle) { - self.preserved_buffers.borrow_mut().push(buffer); + self.0.borrow_mut().preserved_buffers.push(buffer); } } impl Clone for BufferRequestHandle { fn clone(&self) -> Self { - self.buffer_request_count.fetch_add(1, Ordering::SeqCst); - Self { - buffer_request_count: self.buffer_request_count.clone(), - preserved_buffers: self.preserved_buffers.clone(), - } + self.0.borrow_mut().buffer_request_count += 1; + Self(self.0.clone()) } } impl Drop for BufferRequestHandle { fn drop(&mut self) { - if self.buffer_request_count.fetch_sub(1, Ordering::SeqCst) == 1 { - self.preserved_buffers.borrow_mut().clear(); + let mut state = self.0.borrow_mut(); + state.buffer_request_count -= 1; + if state.buffer_request_count == 0 { + state.preserved_buffers.clear(); + state + .open_buffers + .retain(|_, buffer| matches!(buffer, OpenBuffer::Loaded(_))) } } } From 8fa23c702c4d322e81c80c5c74f689a2da519bda Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Thu, 24 Feb 2022 09:32:31 +0100 Subject: [PATCH 13/14] Store ops if buffer handle can't be upgraded and buffer requests are in-flight --- crates/project/src/project.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 8bfa274e8e4a462411472fc450a9cf13714f3cf7..236b02cd6c97ef0e940fdf88e4963b32806e8174 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -2523,6 +2523,8 @@ impl Project { OpenBuffer::Loaded(buffer) => { if let Some(buffer) = buffer.upgrade(cx) { buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?; + } else if is_remote && buffer_request_count > 0 { + e.insert(OpenBuffer::Loading(ops)); } } OpenBuffer::Loading(operations) => operations.extend_from_slice(&ops), From d929819c33ca02cb5e1014ab4a2541fbdaaf3705 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Thu, 24 Feb 2022 09:52:25 +0100 Subject: [PATCH 14/14] Fix warning --- crates/lsp/src/lsp.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/crates/lsp/src/lsp.rs b/crates/lsp/src/lsp.rs index 179a87e43a825afba9bbde96a5979c44b9d80d7f..c563c233118fb106299c7d57606abf78e8d3e8ac 100644 --- a/crates/lsp/src/lsp.rs +++ b/crates/lsp/src/lsp.rs @@ -1,6 +1,6 @@ use anyhow::{anyhow, Context, Result}; use futures::{io::BufWriter, AsyncRead, AsyncWrite}; -use gpui::{executor, AsyncAppContext, Task}; +use gpui::{executor, Task}; use parking_lot::{Mutex, RwLock}; use postage::{barrier, oneshot, prelude::Stream, sink::Sink, watch}; use serde::{Deserialize, Serialize}; @@ -485,7 +485,10 @@ impl Drop for Subscription { pub struct FakeLanguageServer { handlers: Arc< Mutex< - HashMap<&'static str, Box Vec>>, + HashMap< + &'static str, + Box Vec>, + >, >, >, outgoing_tx: futures::channel::mpsc::UnboundedSender>, @@ -625,7 +628,7 @@ impl FakeLanguageServer { ) -> futures::channel::mpsc::UnboundedReceiver<()> where T: 'static + request::Request, - F: 'static + Send + FnMut(T::Params, AsyncAppContext) -> T::Result, + F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext) -> T::Result, { let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded(); self.handlers.lock().insert(