From 528a4dd9b48af6852ea2cc7c4853fa21f3a8b4e0 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Mon, 24 Jan 2022 11:28:46 +0100 Subject: [PATCH 01/12] Fix regression causing guests to miss operations while opening a buffer --- crates/project/src/project.rs | 34 +++++++++++++++++++++++++--------- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 04f8aa511aae340b03ad7c6e2753b8ed9361803b..f39e5a33b0b9031cab30073c59766e85fe4257a6 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -524,11 +524,7 @@ impl Project { // Record the fact that the buffer is no longer loading. this.loading_buffers.remove(&project_path); let buffer = load_result.map_err(Arc::new)?; - this.open_buffers.insert( - buffer.read(cx).remote_id() as usize, - OpenBuffer::Loaded(buffer.downgrade()), - ); - this.assign_language_to_buffer(&worktree, &buffer, cx); + this.register_buffer(&buffer, &worktree, cx)?; Ok(buffer) })); }) @@ -568,9 +564,7 @@ impl Project { }) .await?; this.update(&mut cx, |this, cx| { - this.open_buffers - .insert(buffer.id(), OpenBuffer::Loaded(buffer.downgrade())); - this.assign_language_to_buffer(&worktree, &buffer, cx); + this.assign_language_to_buffer(&buffer, &worktree, cx); }); Ok(()) }) @@ -618,10 +612,32 @@ impl Project { result } - fn assign_language_to_buffer( + fn register_buffer( &mut self, + buffer: &ModelHandle, worktree: &ModelHandle, + cx: &mut ModelContext, + ) -> Result<()> { + match self.open_buffers.insert( + buffer.read(cx).remote_id() as usize, + OpenBuffer::Loaded(buffer.downgrade()), + ) { + Some(OpenBuffer::Operations(pending_ops)) => { + buffer.update(cx, |buf, cx| buf.apply_ops(pending_ops, cx))?; + } + Some(OpenBuffer::Loaded(_)) => { + return Err(anyhow!("registered the same buffer twice")); + } + None => {} + } + self.assign_language_to_buffer(&buffer, &worktree, cx); + Ok(()) + } + + fn assign_language_to_buffer( + &mut self, buffer: &ModelHandle, + worktree: &ModelHandle, cx: &mut ModelContext, ) -> Option<()> { let (path, full_path) = { From 245490f93438163c3710fff11867869daadbc67a Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Mon, 24 Jan 2022 12:27:13 +0100 Subject: [PATCH 02/12] Implement `Project::definition` when the buffer is remote --- crates/language/src/proto.rs | 4 +- crates/project/src/project.rs | 128 ++++++++++++++++++- crates/rpc/proto/zed.proto | 89 ++++++++------ crates/rpc/src/proto.rs | 4 + crates/server/src/rpc.rs | 216 +++++++++++++++++++++++++++++++-- crates/server/src/rpc/store.rs | 12 +- 6 files changed, 396 insertions(+), 57 deletions(-) diff --git a/crates/language/src/proto.rs b/crates/language/src/proto.rs index 771d8b7fd3e9a4303dfdf69ceb9f6df03479dffc..4bfc2ae4332edb0fe790559c838f4e4f874b479b 100644 --- a/crates/language/src/proto.rs +++ b/crates/language/src/proto.rs @@ -155,7 +155,7 @@ pub fn serialize_diagnostics<'a>( .collect() } -fn serialize_anchor(anchor: &Anchor) -> proto::Anchor { +pub fn serialize_anchor(anchor: &Anchor) -> proto::Anchor { proto::Anchor { replica_id: anchor.timestamp.replica_id as u32, local_timestamp: anchor.timestamp.value, @@ -352,7 +352,7 @@ pub fn deserialize_diagnostics( .collect() } -fn deserialize_anchor(anchor: proto::Anchor) -> Option { +pub fn deserialize_anchor(anchor: proto::Anchor) -> Option { Some(Anchor { timestamp: clock::Local { replica_id: anchor.replica_id as ReplicaId, diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 
f39e5a33b0b9031cab30073c59766e85fe4257a6..761a9ca8ae0bb603e115d8872ef988b8ddc532d4 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -13,6 +13,7 @@ use gpui::{ WeakModelHandle, }; use language::{ + proto::{deserialize_anchor, serialize_anchor}, range_from_lsp, Bias, Buffer, Diagnostic, DiagnosticEntry, File as _, Language, LanguageRegistry, Operation, PointUtf16, ToOffset, ToPointUtf16, }; @@ -336,6 +337,7 @@ impl Project { client.subscribe_to_entity(remote_id, cx, Self::handle_save_buffer), client.subscribe_to_entity(remote_id, cx, Self::handle_buffer_saved), client.subscribe_to_entity(remote_id, cx, Self::handle_format_buffer), + client.subscribe_to_entity(remote_id, cx, Self::handle_get_definition), ]); } } @@ -951,10 +953,10 @@ impl Project { cx: &mut ModelContext, ) -> Task>> { let source_buffer_handle = source_buffer_handle.clone(); - let buffer = source_buffer_handle.read(cx); + let source_buffer = source_buffer_handle.read(cx); let worktree; let buffer_abs_path; - if let Some(file) = File::from_dyn(buffer.file()) { + if let Some(file) = File::from_dyn(source_buffer.file()) { worktree = file.worktree.clone(); buffer_abs_path = file.as_local().map(|f| f.abs_path(cx)); } else { @@ -962,11 +964,11 @@ impl Project { }; if worktree.read(cx).as_local().is_some() { - let point = buffer.offset_to_point_utf16(position.to_offset(buffer)); + let point = source_buffer.offset_to_point_utf16(position.to_offset(source_buffer)); let buffer_abs_path = buffer_abs_path.unwrap(); let lang_name; let lang_server; - if let Some(lang) = buffer.language() { + if let Some(lang) = source_buffer.language() { lang_name = lang.name().to_string(); if let Some(server) = self .language_servers @@ -1061,9 +1063,67 @@ impl Project { Ok(definitions) }) + } else if let Some(project_id) = self.remote_id() { + let client = self.client.clone(); + let replica_id = self.replica_id(); + let request = proto::GetDefinition { + project_id, + buffer_id: source_buffer.remote_id(), + position: Some(serialize_anchor(&source_buffer.anchor_before(position))), + }; + cx.spawn(|this, mut cx| async move { + let response = client.request(request).await?; + this.update(&mut cx, |this, cx| { + let mut definitions = Vec::new(); + for definition in response.definitions { + let target_buffer = match definition + .buffer + .ok_or_else(|| anyhow!("missing buffer"))? 
+ { + proto::definition::Buffer::Id(id) => this + .open_buffers + .get(&(id as usize)) + .and_then(|buffer| buffer.upgrade(cx)) + .ok_or_else(|| anyhow!("no buffer exists for id {}", id))?, + proto::definition::Buffer::State(mut buffer) => { + let file = if let Some(file) = buffer.file.take() { + let worktree_id = WorktreeId::from_proto(file.worktree_id); + let worktree = + this.worktree_for_id(worktree_id, cx).ok_or_else(|| { + anyhow!("no worktree found for id {}", file.worktree_id) + })?; + let file = File::from_proto(file, worktree.clone(), cx)?; + Some(Box::new(file) as Box) + } else { + None + }; + + let buffer = cx.add_model(|cx| { + Buffer::from_proto(replica_id, buffer, file, cx).unwrap() + }); + this.register_buffer(&buffer, &worktree, cx)?; + buffer + } + }; + let target_start = definition + .target_start + .and_then(deserialize_anchor) + .ok_or_else(|| anyhow!("missing target start"))?; + let target_end = definition + .target_end + .and_then(deserialize_anchor) + .ok_or_else(|| anyhow!("missing target end"))?; + definitions.push(Definition { + target_buffer, + target_range: target_start..target_end, + }) + } + + Ok(definitions) + }) + }) } else { - log::info!("go to definition is not yet implemented for guests"); - Task::ready(Ok(Default::default())) + Task::ready(Err(anyhow!("project does not have a remote id"))) } } @@ -1627,6 +1687,62 @@ impl Project { Ok(()) } + pub fn handle_get_definition( + &mut self, + envelope: TypedEnvelope, + rpc: Arc, + cx: &mut ModelContext, + ) -> Result<()> { + let receipt = envelope.receipt(); + let sender_id = envelope.original_sender_id()?; + let source_buffer = self + .shared_buffers + .get(&sender_id) + .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned()) + .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?; + let position = envelope + .payload + .position + .and_then(deserialize_anchor) + .ok_or_else(|| anyhow!("invalid position"))?; + if !source_buffer.read(cx).can_resolve(&position) { + return Err(anyhow!("cannot resolve position")); + } + + let definitions = self.definition(&source_buffer, position, cx); + cx.spawn(|this, mut cx| async move { + let definitions = definitions.await?; + let mut response = proto::GetDefinitionResponse { + definitions: Default::default(), + }; + this.update(&mut cx, |this, cx| { + for definition in definitions { + let buffer_id = definition.target_buffer.read(cx).remote_id(); + let shared_buffers = this.shared_buffers.entry(sender_id).or_default(); + let buffer = match shared_buffers.entry(buffer_id) { + hash_map::Entry::Occupied(_) => proto::definition::Buffer::Id(buffer_id), + hash_map::Entry::Vacant(entry) => { + entry.insert(definition.target_buffer.clone()); + proto::definition::Buffer::State( + definition.target_buffer.read(cx).to_proto(), + ) + } + }; + response.definitions.push(proto::Definition { + target_start: Some(serialize_anchor(&definition.target_range.start)), + target_end: Some(serialize_anchor(&definition.target_range.end)), + buffer: Some(buffer), + }); + } + }); + rpc.respond(receipt, response).await?; + Ok::<_, anyhow::Error>(()) + }) + .detach_and_log_err(cx); + + Ok(()) + } + pub fn handle_open_buffer( &mut self, envelope: TypedEnvelope, diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index cbe7ca5d1adf80c6844186efbccd2396a4f48884..059dab20662c1b42c61dbe3a3679609fcdddb06f 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -20,40 +20,42 @@ message Envelope { LeaveProject leave_project 
= 14; AddProjectCollaborator add_project_collaborator = 15; RemoveProjectCollaborator remove_project_collaborator = 16; - - RegisterWorktree register_worktree = 17; - UnregisterWorktree unregister_worktree = 18; - ShareWorktree share_worktree = 19; - UpdateWorktree update_worktree = 20; - UpdateDiagnosticSummary update_diagnostic_summary = 21; - DiskBasedDiagnosticsUpdating disk_based_diagnostics_updating = 22; - DiskBasedDiagnosticsUpdated disk_based_diagnostics_updated = 23; - - OpenBuffer open_buffer = 24; - OpenBufferResponse open_buffer_response = 25; - CloseBuffer close_buffer = 26; - UpdateBuffer update_buffer = 27; - UpdateBufferFile update_buffer_file = 28; - SaveBuffer save_buffer = 29; - BufferSaved buffer_saved = 30; - BufferReloaded buffer_reloaded = 31; - FormatBuffer format_buffer = 32; - - GetChannels get_channels = 33; - GetChannelsResponse get_channels_response = 34; - JoinChannel join_channel = 35; - JoinChannelResponse join_channel_response = 36; - LeaveChannel leave_channel = 37; - SendChannelMessage send_channel_message = 38; - SendChannelMessageResponse send_channel_message_response = 39; - ChannelMessageSent channel_message_sent = 40; - GetChannelMessages get_channel_messages = 41; - GetChannelMessagesResponse get_channel_messages_response = 42; - - UpdateContacts update_contacts = 43; - - GetUsers get_users = 44; - GetUsersResponse get_users_response = 45; + GetDefinition get_definition = 17; + GetDefinitionResponse get_definition_response = 18; + + RegisterWorktree register_worktree = 19; + UnregisterWorktree unregister_worktree = 20; + ShareWorktree share_worktree = 21; + UpdateWorktree update_worktree = 22; + UpdateDiagnosticSummary update_diagnostic_summary = 23; + DiskBasedDiagnosticsUpdating disk_based_diagnostics_updating = 24; + DiskBasedDiagnosticsUpdated disk_based_diagnostics_updated = 25; + + OpenBuffer open_buffer = 26; + OpenBufferResponse open_buffer_response = 27; + CloseBuffer close_buffer = 28; + UpdateBuffer update_buffer = 29; + UpdateBufferFile update_buffer_file = 30; + SaveBuffer save_buffer = 31; + BufferSaved buffer_saved = 32; + BufferReloaded buffer_reloaded = 33; + FormatBuffer format_buffer = 34; + + GetChannels get_channels = 35; + GetChannelsResponse get_channels_response = 36; + JoinChannel join_channel = 37; + JoinChannelResponse join_channel_response = 38; + LeaveChannel leave_channel = 39; + SendChannelMessage send_channel_message = 40; + SendChannelMessageResponse send_channel_message_response = 41; + ChannelMessageSent channel_message_sent = 42; + GetChannelMessages get_channel_messages = 43; + GetChannelMessagesResponse get_channel_messages_response = 44; + + UpdateContacts update_contacts = 45; + + GetUsers get_users = 46; + GetUsersResponse get_users_response = 47; } } @@ -134,6 +136,25 @@ message RemoveProjectCollaborator { uint32 peer_id = 2; } +message GetDefinition { + uint64 project_id = 1; + uint64 buffer_id = 2; + Anchor position = 3; + } + +message GetDefinitionResponse { + repeated Definition definitions = 1; +} + +message Definition { + oneof buffer { + uint64 id = 1; + Buffer state = 2; + } + Anchor target_start = 3; + Anchor target_end = 4; +} + message OpenBuffer { uint64 project_id = 1; uint64 worktree_id = 2; diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index cebe3504e9b3ce99d554163fbee026b4e9f50a14..509cefd46c912d6310b727c5e3654ed7d1c712fc 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -134,6 +134,8 @@ messages!( GetChannelMessagesResponse, GetChannels, 
GetChannelsResponse, + GetDefinition, + GetDefinitionResponse, GetUsers, GetUsersResponse, JoinChannel, @@ -168,6 +170,7 @@ request_messages!( (FormatBuffer, Ack), (GetChannelMessages, GetChannelMessagesResponse), (GetChannels, GetChannelsResponse), + (GetDefinition, GetDefinitionResponse), (GetUsers, GetUsersResponse), (JoinChannel, JoinChannelResponse), (JoinProject, JoinProjectResponse), @@ -191,6 +194,7 @@ entity_messages!( DiskBasedDiagnosticsUpdated, DiskBasedDiagnosticsUpdating, FormatBuffer, + GetDefinition, JoinProject, LeaveProject, OpenBuffer, diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index c8af5ce1fd934bc10cf7f378317ca2a407e91f93..89d74fa4a2db743156fd9e1c0fc70f4adc8a713d 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -17,7 +17,7 @@ use rpc::{ Connection, ConnectionId, Peer, TypedEnvelope, }; use sha1::{Digest as _, Sha1}; -use std::{any::TypeId, future::Future, mem, path::PathBuf, sync::Arc, time::Instant}; +use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant}; use store::{Store, Worktree}; use surf::StatusCode; use tide::log; @@ -74,6 +74,7 @@ impl Server { .add_handler(Server::update_diagnostic_summary) .add_handler(Server::disk_based_diagnostics_updating) .add_handler(Server::disk_based_diagnostics_updated) + .add_handler(Server::get_definition) .add_handler(Server::open_buffer) .add_handler(Server::close_buffer) .add_handler(Server::update_buffer) @@ -479,26 +480,40 @@ impl Server { .worktree .as_mut() .ok_or_else(|| anyhow!("missing worktree"))?; - let entries = mem::take(&mut worktree.entries) - .into_iter() - .map(|entry| (entry.id, entry)) + let entries = worktree + .entries + .iter() + .map(|entry| (entry.id, entry.clone())) .collect(); - - let diagnostic_summaries = mem::take(&mut worktree.diagnostic_summaries) - .into_iter() - .map(|summary| (PathBuf::from(summary.path.clone()), summary)) + let diagnostic_summaries = worktree + .diagnostic_summaries + .iter() + .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone())) .collect(); - let contact_user_ids = self.state_mut().share_worktree( + let shared_worktree = self.state_mut().share_worktree( request.payload.project_id, worktree.id, request.sender_id, entries, diagnostic_summaries, ); - if let Some(contact_user_ids) = contact_user_ids { + if let Some(shared_worktree) = shared_worktree { + broadcast( + request.sender_id, + shared_worktree.connection_ids, + |connection_id| { + self.peer.forward_send( + request.sender_id, + connection_id, + request.payload.clone(), + ) + }, + ) + .await?; self.peer.respond(request.receipt(), proto::Ack {}).await?; - self.update_contacts_for_users(&contact_user_ids).await?; + self.update_contacts_for_users(&shared_worktree.authorized_user_ids) + .await?; } else { self.peer .respond_with_error( @@ -594,6 +609,24 @@ impl Server { Ok(()) } + async fn get_definition( + self: Arc, + request: TypedEnvelope, + ) -> tide::Result<()> { + let receipt = request.receipt(); + let host_connection_id = self + .state() + .read_project(request.payload.project_id, request.sender_id) + .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))? 
+ .host_connection_id; + let response = self + .peer + .forward_request(request.sender_id, host_connection_id, request.payload) + .await?; + self.peer.respond(receipt, response).await?; + Ok(()) + } + async fn open_buffer( self: Arc, request: TypedEnvelope, @@ -1156,8 +1189,8 @@ mod tests { editor::{Editor, EditorSettings, Input, MultiBuffer}, fs::{FakeFs, Fs as _}, language::{ - tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig, - LanguageRegistry, LanguageServerConfig, Point, + tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language, + LanguageConfig, LanguageRegistry, LanguageServerConfig, Point, }, lsp, project::{DiagnosticSummary, Project, ProjectPath}, @@ -2318,6 +2351,163 @@ mod tests { ); } + #[gpui::test] + async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) { + cx_a.foreground().forbid_parking(); + let mut lang_registry = Arc::new(LanguageRegistry::new()); + let fs = Arc::new(FakeFs::new()); + fs.insert_tree( + "/root-1", + json!({ + ".zed.toml": r#"collaborators = ["user_b"]"#, + "a.rs": "const ONE: usize = b::TWO + b::THREE;", + }), + ) + .await; + fs.insert_tree( + "/root-2", + json!({ + "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;", + }), + ) + .await; + + // Set up a fake language server. + let (language_server_config, mut fake_language_server) = + LanguageServerConfig::fake(cx_a.background()).await; + Arc::get_mut(&mut lang_registry) + .unwrap() + .add(Arc::new(Language::new( + LanguageConfig { + name: "Rust".to_string(), + path_suffixes: vec!["rs".to_string()], + language_server: Some(language_server_config), + ..Default::default() + }, + Some(tree_sitter_rust::language()), + ))); + + // Connect to a server as 2 clients. + let mut server = TestServer::start(cx_a.foreground()).await; + let client_a = server.create_client(&mut cx_a, "user_a").await; + let client_b = server.create_client(&mut cx_b, "user_b").await; + + // Share a project as client A + let project_a = cx_a.update(|cx| { + Project::local( + client_a.clone(), + client_a.user_store.clone(), + lang_registry.clone(), + fs.clone(), + cx, + ) + }); + let (worktree_a, _) = project_a + .update(&mut cx_a, |p, cx| { + p.find_or_create_local_worktree("/root-1", false, cx) + }) + .await + .unwrap(); + worktree_a + .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete()) + .await; + let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await; + let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id()); + project_a + .update(&mut cx_a, |p, cx| p.share(cx)) + .await + .unwrap(); + + // Join the worktree as client B. + let project_b = Project::remote( + project_id, + client_b.clone(), + client_b.user_store.clone(), + lang_registry.clone(), + fs.clone(), + &mut cx_b.to_async(), + ) + .await + .unwrap(); + + // Open the file to be formatted on client B. 
+ let buffer_b = cx_b + .background() + .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))) + .await + .unwrap(); + + let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx)); + let (request_id, _) = fake_language_server + .receive_request::() + .await; + fake_language_server + .respond( + request_id, + Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new( + lsp::Url::from_file_path("/root-2/b.rs").unwrap(), + lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)), + ))), + ) + .await; + let definitions_1 = definitions_1.await.unwrap(); + cx_b.read(|cx| { + assert_eq!(definitions_1.len(), 1); + assert_eq!(project_b.read(cx).worktrees(cx).count(), 2); + let target_buffer = definitions_1[0].target_buffer.read(cx); + assert_eq!( + target_buffer.text(), + "const TWO: usize = 2;\nconst THREE: usize = 3;" + ); + assert_eq!( + definitions_1[0].target_range.to_point(target_buffer), + Point::new(0, 6)..Point::new(0, 9) + ); + }); + + // Try getting more definitions for the same buffer, ensuring the buffer gets reused from + // the previous call to `definition`. + let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx)); + let (request_id, _) = fake_language_server + .receive_request::() + .await; + fake_language_server + .respond( + request_id, + Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new( + lsp::Url::from_file_path("/root-2/b.rs").unwrap(), + lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)), + ))), + ) + .await; + let definitions_2 = definitions_2.await.unwrap(); + cx_b.read(|cx| { + assert_eq!(definitions_2.len(), 1); + assert_eq!(project_b.read(cx).worktrees(cx).count(), 2); + let target_buffer = definitions_2[0].target_buffer.read(cx); + assert_eq!( + target_buffer.text(), + "const TWO: usize = 2;\nconst THREE: usize = 3;" + ); + assert_eq!( + definitions_2[0].target_range.to_point(target_buffer), + Point::new(1, 6)..Point::new(1, 11) + ); + }); + assert_eq!( + definitions_1[0].target_buffer, + definitions_2[0].target_buffer + ); + + cx_b.update(|_| { + drop(definitions_1); + drop(definitions_2); + }); + project_b + .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1) + .await; + } + #[gpui::test] async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) { cx_a.foreground().forbid_parking(); diff --git a/crates/server/src/rpc/store.rs b/crates/server/src/rpc/store.rs index b7aec2689bd5b575ee335775f881356a8fa09207..6e11f431aca72c2ee44c255bdf36103cf2d1d744 100644 --- a/crates/server/src/rpc/store.rs +++ b/crates/server/src/rpc/store.rs @@ -74,6 +74,11 @@ pub struct LeftProject { pub authorized_user_ids: Vec, } +pub struct SharedWorktree { + pub authorized_user_ids: Vec, + pub connection_ids: Vec, +} + impl Store { pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId) { self.connections.insert( @@ -393,7 +398,7 @@ impl Store { connection_id: ConnectionId, entries: HashMap, diagnostic_summaries: BTreeMap, - ) -> Option> { + ) -> Option { let project = self.projects.get_mut(&project_id)?; let worktree = project.worktrees.get_mut(&worktree_id)?; if project.host_connection_id == connection_id && project.share.is_some() { @@ -401,7 +406,10 @@ impl Store { entries, diagnostic_summaries, }); - Some(project.authorized_user_ids()) + Some(SharedWorktree { + authorized_user_ids: project.authorized_user_ids(), + connection_ids: project.guest_connection_ids(), + }) } else { None } From 
a762f575f4cd5bc53e0a0172d5c4b6d4a57eef59 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Mon, 24 Jan 2022 14:00:38 +0100 Subject: [PATCH 03/12] Add remote worktree to project before it is fully deserialized This prevents a race condition where the host will send us messages and responses about a worktree that we have seen but haven't yet finished loading. --- crates/project/src/project.rs | 38 +++++----- crates/project/src/worktree.rs | 124 +++++++++++++++++---------------- 2 files changed, 82 insertions(+), 80 deletions(-) diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 761a9ca8ae0bb603e115d8872ef988b8ddc532d4..b24fa4caa8d97d66167fc5cb2120f2a17d99837e 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -247,8 +247,10 @@ impl Project { let mut worktrees = Vec::new(); for worktree in response.worktrees { - worktrees - .push(Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx).await?); + let (worktree, load_task) = cx + .update(|cx| Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx)); + worktrees.push(worktree); + load_task.detach(); } let user_ids = response @@ -1464,16 +1466,9 @@ impl Project { .payload .worktree .ok_or_else(|| anyhow!("invalid worktree"))?; - cx.spawn(|this, mut cx| { - async move { - let worktree = - Worktree::remote(remote_id, replica_id, worktree, client, &mut cx).await?; - this.update(&mut cx, |this, cx| this.add_worktree(&worktree, cx)); - Ok(()) - } - .log_err() - }) - .detach(); + let (worktree, load_task) = Worktree::remote(remote_id, replica_id, worktree, client, cx); + self.add_worktree(&worktree, cx); + load_task.detach(); Ok(()) } @@ -2551,15 +2546,16 @@ mod tests { // Create a remote copy of this worktree. let initial_snapshot = tree.read_with(&cx, |tree, _| tree.snapshot()); - let remote = Worktree::remote( - 1, - 1, - initial_snapshot.to_proto(&Default::default(), Default::default()), - rpc.clone(), - &mut cx.to_async(), - ) - .await - .unwrap(); + let (remote, load_task) = cx.update(|cx| { + Worktree::remote( + 1, + 1, + initial_snapshot.to_proto(&Default::default(), Default::default()), + rpc.clone(), + cx, + ) + }); + load_task.await; cx.read(|cx| { assert!(!buffer2.read(cx).is_dirty()); diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index fd33386ffcc5793c2572ba7815d8c370ef19c822..3aba73c416777ccfa6c5f5e1aa49c98783b57a11 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -190,13 +190,13 @@ impl Worktree { Ok(tree) } - pub async fn remote( + pub fn remote( project_remote_id: u64, replica_id: ReplicaId, worktree: proto::Worktree, client: Arc, - cx: &mut AsyncAppContext, - ) -> Result> { + cx: &mut MutableAppContext, + ) -> (ModelHandle, Task<()>) { let remote_id = worktree.id; let root_char_bag: CharBag = worktree .root_name @@ -205,32 +205,26 @@ impl Worktree { .collect(); let root_name = worktree.root_name.clone(); let weak = worktree.weak; - let (entries_by_path, entries_by_id, diagnostic_summaries) = cx - .background() - .spawn(async move { - let mut entries_by_path_edits = Vec::new(); - let mut entries_by_id_edits = Vec::new(); - for entry in worktree.entries { - match Entry::try_from((&root_char_bag, entry)) { - Ok(entry) => { - entries_by_id_edits.push(Edit::Insert(PathEntry { - id: entry.id, - path: entry.path.clone(), - is_ignored: entry.is_ignored, - scan_id: 0, - })); - entries_by_path_edits.push(Edit::Insert(entry)); - } - Err(err) => log::warn!("error for remote worktree entry 
{:?}", err), - } - } - - let mut entries_by_path = SumTree::new(); - let mut entries_by_id = SumTree::new(); - entries_by_path.edit(entries_by_path_edits, &()); - entries_by_id.edit(entries_by_id_edits, &()); + let snapshot = Snapshot { + id: WorktreeId(remote_id as usize), + root_name, + root_char_bag, + entries_by_path: Default::default(), + entries_by_id: Default::default(), + }; - let diagnostic_summaries = TreeMap::from_ordered_entries( + let (updates_tx, mut updates_rx) = postage::mpsc::channel(64); + let (mut snapshot_tx, snapshot_rx) = watch::channel_with(snapshot.clone()); + let worktree_handle = cx.add_model(|_: &mut ModelContext| { + Worktree::Remote(RemoteWorktree { + project_id: project_remote_id, + replica_id, + snapshot: snapshot.clone(), + snapshot_rx: snapshot_rx.clone(), + updates_tx, + client: client.clone(), + queued_operations: Default::default(), + diagnostic_summaries: TreeMap::from_ordered_entries( worktree.diagnostic_summaries.into_iter().map(|summary| { ( PathKey(PathBuf::from(summary.path).into()), @@ -242,24 +236,48 @@ impl Worktree { }, ) }), - ); - - (entries_by_path, entries_by_id, diagnostic_summaries) + ), + weak, }) - .await; + }); - let worktree = cx.update(|cx| { - cx.add_model(|cx: &mut ModelContext| { - let snapshot = Snapshot { - id: WorktreeId(remote_id as usize), - root_name, - root_char_bag, - entries_by_path, - entries_by_id, - }; + let deserialize_task = cx.spawn({ + let worktree_handle = worktree_handle.clone(); + |cx| async move { + let (entries_by_path, entries_by_id) = cx + .background() + .spawn(async move { + let mut entries_by_path_edits = Vec::new(); + let mut entries_by_id_edits = Vec::new(); + for entry in worktree.entries { + match Entry::try_from((&root_char_bag, entry)) { + Ok(entry) => { + entries_by_id_edits.push(Edit::Insert(PathEntry { + id: entry.id, + path: entry.path.clone(), + is_ignored: entry.is_ignored, + scan_id: 0, + })); + entries_by_path_edits.push(Edit::Insert(entry)); + } + Err(err) => log::warn!("error for remote worktree entry {:?}", err), + } + } - let (updates_tx, mut updates_rx) = postage::mpsc::channel(64); - let (mut snapshot_tx, snapshot_rx) = watch::channel_with(snapshot.clone()); + let mut entries_by_path = SumTree::new(); + let mut entries_by_id = SumTree::new(); + entries_by_path.edit(entries_by_path_edits, &()); + entries_by_id.edit(entries_by_id_edits, &()); + + (entries_by_path, entries_by_id) + }) + .await; + + { + let mut snapshot = snapshot_tx.borrow_mut(); + snapshot.entries_by_path = entries_by_path; + snapshot.entries_by_id = entries_by_id; + } cx.background() .spawn(async move { @@ -275,7 +293,8 @@ impl Worktree { { let mut snapshot_rx = snapshot_rx.clone(); - cx.spawn_weak(|this, mut cx| async move { + let this = worktree_handle.downgrade(); + cx.spawn(|mut cx| async move { while let Some(_) = snapshot_rx.recv().await { if let Some(this) = cx.read(|cx| this.upgrade(cx)) { this.update(&mut cx, |this, cx| this.poll_snapshot(cx)); @@ -286,22 +305,9 @@ impl Worktree { }) .detach(); } - - Worktree::Remote(RemoteWorktree { - project_id: project_remote_id, - replica_id, - snapshot, - snapshot_rx, - updates_tx, - client: client.clone(), - queued_operations: Default::default(), - diagnostic_summaries, - weak, - }) - }) + } }); - - Ok(worktree) + (worktree_handle, deserialize_task) } pub fn as_local(&self) -> Option<&LocalWorktree> { From dabab6e323c68a6d5905a95398492949881ba1a8 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Mon, 24 Jan 2022 15:08:04 +0100 Subject: [PATCH 04/12] Verify 
simultaneously opening buffers via `definition` and `open_buffer` This fails because we don't yet handle this scenario. --- crates/server/src/rpc.rs | 106 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 105 insertions(+), 1 deletion(-) diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index 89d74fa4a2db743156fd9e1c0fc70f4adc8a713d..88a23952d4caf6996e4f629ce6c50701543d0bcb 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -2318,7 +2318,6 @@ mod tests { .await .unwrap(); - // Open the file to be formatted on client B. let buffer_b = cx_b .background() .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))) @@ -2508,6 +2507,111 @@ mod tests { .await; } + #[gpui::test(iterations = 100, seed = 1)] + async fn test_open_buffer_while_getting_definition_pointing_to_it( + mut cx_a: TestAppContext, + mut cx_b: TestAppContext, + ) { + cx_a.foreground().forbid_parking(); + let mut lang_registry = Arc::new(LanguageRegistry::new()); + let fs = Arc::new(FakeFs::new()); + fs.insert_tree( + "/root", + json!({ + ".zed.toml": r#"collaborators = ["user_b"]"#, + "a.rs": "const ONE: usize = b::TWO;", + "b.rs": "const TWO: usize = 2", + }), + ) + .await; + + // Set up a fake language server. + let (language_server_config, mut fake_language_server) = + LanguageServerConfig::fake(cx_a.background()).await; + Arc::get_mut(&mut lang_registry) + .unwrap() + .add(Arc::new(Language::new( + LanguageConfig { + name: "Rust".to_string(), + path_suffixes: vec!["rs".to_string()], + language_server: Some(language_server_config), + ..Default::default() + }, + Some(tree_sitter_rust::language()), + ))); + + // Connect to a server as 2 clients. + let mut server = TestServer::start(cx_a.foreground()).await; + let client_a = server.create_client(&mut cx_a, "user_a").await; + let client_b = server.create_client(&mut cx_b, "user_b").await; + + // Share a project as client A + let project_a = cx_a.update(|cx| { + Project::local( + client_a.clone(), + client_a.user_store.clone(), + lang_registry.clone(), + fs.clone(), + cx, + ) + }); + let (worktree_a, _) = project_a + .update(&mut cx_a, |p, cx| { + p.find_or_create_local_worktree("/root", false, cx) + }) + .await + .unwrap(); + worktree_a + .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete()) + .await; + let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await; + let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id()); + project_a + .update(&mut cx_a, |p, cx| p.share(cx)) + .await + .unwrap(); + + // Join the worktree as client B. 
+ let project_b = Project::remote( + project_id, + client_b.clone(), + client_b.user_store.clone(), + lang_registry.clone(), + fs.clone(), + &mut cx_b.to_async(), + ) + .await + .unwrap(); + + let buffer_b1 = cx_b + .background() + .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))) + .await + .unwrap(); + + let definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx)); + let (request_id, _) = fake_language_server + .receive_request::() + .await; + fake_language_server + .respond( + request_id, + Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new( + lsp::Url::from_file_path("/root/b.rs").unwrap(), + lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)), + ))), + ) + .await; + + let buffer_b2 = project_b + .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx)) + .await + .unwrap(); + let definitions = definitions.await.unwrap(); + assert_eq!(definitions.len(), 1); + assert_eq!(definitions[0].target_buffer, buffer_b2); + } + #[gpui::test] async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) { cx_a.foreground().forbid_parking(); From 93125cbd1639140251e7b03c61a146b7023c120d Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Mon, 24 Jan 2022 16:14:30 +0100 Subject: [PATCH 05/12] Remove `executor::Foreground::Test` All code paths have already transitioned to `Foreground::Deterministic`. --- crates/gpui/src/executor.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/crates/gpui/src/executor.rs b/crates/gpui/src/executor.rs index 84efd2c6e0b6b3ee9d81736821b27f581a460e10..e939008bb301cadde693dfb419514e6663a21117 100644 --- a/crates/gpui/src/executor.rs +++ b/crates/gpui/src/executor.rs @@ -33,7 +33,6 @@ pub enum Foreground { dispatcher: Arc, _not_send_or_sync: PhantomData>, }, - Test(smol::LocalExecutor<'static>), Deterministic(Arc), } @@ -438,10 +437,6 @@ impl Foreground { } } - pub fn test() -> Self { - Self::Test(smol::LocalExecutor::new()) - } - pub fn spawn(&self, future: impl Future + 'static) -> Task { let future = any_local_future(future); let any_task = match self { @@ -460,7 +455,6 @@ impl Foreground { } spawn_inner(future, dispatcher) } - Self::Test(executor) => executor.spawn(future), }; Task::local(any_task) } @@ -470,7 +464,6 @@ impl Foreground { let any_value = match self { Self::Deterministic(executor) => executor.run(future), Self::Platform { .. 
} => panic!("you can't call run on a platform foreground executor"), - Self::Test(executor) => smol::block_on(executor.run(future)), }; *any_value.downcast().unwrap() } From 96b66dcce1a86fb41a7a8468cfbe6ec0b38c90e5 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Mon, 24 Jan 2022 18:47:41 +0100 Subject: [PATCH 06/12] Fix race condition when opening a buffer and getting a definition to it --- crates/language/src/buffer.rs | 6 +- crates/language/src/proto.rs | 2 +- crates/project/src/project.rs | 185 ++++++++++++++++++++++----------- crates/project/src/worktree.rs | 46 -------- crates/rpc/proto/zed.proto | 16 +-- crates/rpc/src/peer.rs | 18 +--- crates/server/src/rpc.rs | 22 ++-- 7 files changed, 160 insertions(+), 135 deletions(-) diff --git a/crates/language/src/buffer.rs b/crates/language/src/buffer.rs index bd361e9731d2dc1bd8f9bb78b29e16ae992cbac1..eae15fe5af5c7dd524e8fa6ba6cb1ac84cf7fdf7 100644 --- a/crates/language/src/buffer.rs +++ b/crates/language/src/buffer.rs @@ -305,7 +305,7 @@ impl Buffer { pub fn from_proto( replica_id: ReplicaId, - message: proto::Buffer, + message: proto::BufferState, file: Option>, cx: &mut ModelContext, ) -> Result { @@ -359,8 +359,8 @@ impl Buffer { Ok(this) } - pub fn to_proto(&self) -> proto::Buffer { - proto::Buffer { + pub fn to_proto(&self) -> proto::BufferState { + proto::BufferState { id: self.remote_id(), file: self.file.as_ref().map(|f| f.to_proto()), visible_text: self.text.text(), diff --git a/crates/language/src/proto.rs b/crates/language/src/proto.rs index 4bfc2ae4332edb0fe790559c838f4e4f874b479b..0f9ee69956910083f9145955ddba37cd47174749 100644 --- a/crates/language/src/proto.rs +++ b/crates/language/src/proto.rs @@ -7,7 +7,7 @@ use rpc::proto; use std::sync::Arc; use text::*; -pub use proto::{Buffer, SelectionSet}; +pub use proto::{Buffer, BufferState, SelectionSet}; pub fn serialize_operation(operation: &Operation) -> proto::Operation { proto::Operation { diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index b24fa4caa8d97d66167fc5cb2120f2a17d99837e..d6d842d80ed90c5e8128f65c0e435aae37383b0f 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -518,17 +518,18 @@ impl Project { let (mut tx, rx) = postage::watch::channel(); entry.insert(rx.clone()); - let load_buffer = worktree.update(cx, |worktree, cx| { - worktree.load_buffer(&project_path.path, cx) - }); + let load_buffer = if worktree.read(cx).is_local() { + self.open_local_buffer(&project_path.path, &worktree, cx) + } else { + self.open_remote_buffer(&project_path.path, &worktree, cx) + }; cx.spawn(move |this, mut cx| async move { let load_result = load_buffer.await; - *tx.borrow_mut() = Some(this.update(&mut cx, |this, cx| { + *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| { // Record the fact that the buffer is no longer loading. 
this.loading_buffers.remove(&project_path); let buffer = load_result.map_err(Arc::new)?; - this.register_buffer(&buffer, &worktree, cx)?; Ok(buffer) })); }) @@ -550,6 +551,55 @@ impl Project { }) } + fn open_local_buffer( + &mut self, + path: &Arc, + worktree: &ModelHandle, + cx: &mut ModelContext, + ) -> Task>> { + let load_buffer = worktree.update(cx, |worktree, cx| { + let worktree = worktree.as_local_mut().unwrap(); + worktree.load_buffer(path, cx) + }); + let worktree = worktree.downgrade(); + cx.spawn(|this, mut cx| async move { + let buffer = load_buffer.await?; + let worktree = worktree + .upgrade(&cx) + .ok_or_else(|| anyhow!("worktree was removed"))?; + this.update(&mut cx, |this, cx| { + this.register_buffer(&buffer, Some(&worktree), cx) + })?; + Ok(buffer) + }) + } + + fn open_remote_buffer( + &mut self, + path: &Arc, + worktree: &ModelHandle, + cx: &mut ModelContext, + ) -> Task>> { + let rpc = self.client.clone(); + let project_id = self.remote_id().unwrap(); + let remote_worktree_id = worktree.read(cx).id(); + let path = path.clone(); + let path_string = path.to_string_lossy().to_string(); + cx.spawn(|this, mut cx| async move { + let response = rpc + .request(proto::OpenBuffer { + project_id, + worktree_id: remote_worktree_id.to_proto(), + path: path_string, + }) + .await?; + let buffer = response.buffer.ok_or_else(|| anyhow!("missing buffer"))?; + this.update(&mut cx, |this, cx| { + this.deserialize_remote_buffer(buffer, cx) + }) + }) + } + pub fn save_buffer_as( &self, buffer: ModelHandle, @@ -568,7 +618,7 @@ impl Project { }) .await?; this.update(&mut cx, |this, cx| { - this.assign_language_to_buffer(&buffer, &worktree, cx); + this.assign_language_to_buffer(&buffer, Some(&worktree), cx); }); Ok(()) }) @@ -619,7 +669,7 @@ impl Project { fn register_buffer( &mut self, buffer: &ModelHandle, - worktree: &ModelHandle, + worktree: Option<&ModelHandle>, cx: &mut ModelContext, ) -> Result<()> { match self.open_buffers.insert( @@ -627,21 +677,21 @@ impl Project { OpenBuffer::Loaded(buffer.downgrade()), ) { Some(OpenBuffer::Operations(pending_ops)) => { - buffer.update(cx, |buf, cx| buf.apply_ops(pending_ops, cx))?; + // buffer.update(cx, |buf, cx| buf.apply_ops(pending_ops, cx))?; } Some(OpenBuffer::Loaded(_)) => { return Err(anyhow!("registered the same buffer twice")); } None => {} } - self.assign_language_to_buffer(&buffer, &worktree, cx); + self.assign_language_to_buffer(&buffer, worktree, cx); Ok(()) } fn assign_language_to_buffer( &mut self, buffer: &ModelHandle, - worktree: &ModelHandle, + worktree: Option<&ModelHandle>, cx: &mut ModelContext, ) -> Option<()> { let (path, full_path) = { @@ -657,7 +707,7 @@ impl Project { // For local worktrees, start a language server if needed. // Also assign the language server and any previously stored diagnostics to the buffer. 
- if let Some(local_worktree) = worktree.read(cx).as_local() { + if let Some(local_worktree) = worktree.and_then(|w| w.read(cx).as_local()) { let worktree_id = local_worktree.id(); let worktree_abs_path = local_worktree.abs_path().clone(); @@ -681,7 +731,7 @@ impl Project { } } - if let Some(local_worktree) = worktree.read(cx).as_local() { + if let Some(local_worktree) = worktree.and_then(|w| w.read(cx).as_local()) { if let Some(diagnostics) = local_worktree.diagnostics_for_path(&path) { buffer.update(cx, |buffer, cx| { buffer.update_diagnostics(None, diagnostics, cx).log_err(); @@ -1067,7 +1117,6 @@ impl Project { }) } else if let Some(project_id) = self.remote_id() { let client = self.client.clone(); - let replica_id = self.replica_id(); let request = proto::GetDefinition { project_id, buffer_id: source_buffer.remote_id(), @@ -1078,35 +1127,10 @@ impl Project { this.update(&mut cx, |this, cx| { let mut definitions = Vec::new(); for definition in response.definitions { - let target_buffer = match definition - .buffer - .ok_or_else(|| anyhow!("missing buffer"))? - { - proto::definition::Buffer::Id(id) => this - .open_buffers - .get(&(id as usize)) - .and_then(|buffer| buffer.upgrade(cx)) - .ok_or_else(|| anyhow!("no buffer exists for id {}", id))?, - proto::definition::Buffer::State(mut buffer) => { - let file = if let Some(file) = buffer.file.take() { - let worktree_id = WorktreeId::from_proto(file.worktree_id); - let worktree = - this.worktree_for_id(worktree_id, cx).ok_or_else(|| { - anyhow!("no worktree found for id {}", file.worktree_id) - })?; - let file = File::from_proto(file, worktree.clone(), cx)?; - Some(Box::new(file) as Box) - } else { - None - }; - - let buffer = cx.add_model(|cx| { - Buffer::from_proto(replica_id, buffer, file, cx).unwrap() - }); - this.register_buffer(&buffer, &worktree, cx)?; - buffer - } - }; + let target_buffer = this.deserialize_remote_buffer( + definition.buffer.ok_or_else(|| anyhow!("missing buffer"))?, + cx, + )?; let target_start = definition .target_start .and_then(deserialize_anchor) @@ -1712,17 +1736,8 @@ impl Project { }; this.update(&mut cx, |this, cx| { for definition in definitions { - let buffer_id = definition.target_buffer.read(cx).remote_id(); - let shared_buffers = this.shared_buffers.entry(sender_id).or_default(); - let buffer = match shared_buffers.entry(buffer_id) { - hash_map::Entry::Occupied(_) => proto::definition::Buffer::Id(buffer_id), - hash_map::Entry::Vacant(entry) => { - entry.insert(definition.target_buffer.clone()); - proto::definition::Buffer::State( - definition.target_buffer.read(cx).to_proto(), - ) - } - }; + let buffer = + this.serialize_buffer_for_peer(&definition.target_buffer, sender_id, cx); response.definitions.push(proto::Definition { target_start: Some(serialize_anchor(&definition.target_range.start)), target_end: Some(serialize_anchor(&definition.target_range.end)), @@ -1757,17 +1772,13 @@ impl Project { cx.spawn(|this, mut cx| { async move { let buffer = open_buffer.await?; - this.update(&mut cx, |this, _| { - this.shared_buffers - .entry(peer_id) - .or_default() - .insert(buffer.id() as u64, buffer.clone()); + let buffer = this.update(&mut cx, |this, cx| { + this.serialize_buffer_for_peer(&buffer, peer_id, cx) }); - let message = buffer.read_with(&cx, |buffer, _| buffer.to_proto()); rpc.respond( receipt, proto::OpenBufferResponse { - buffer: Some(message), + buffer: Some(buffer), }, ) .await @@ -1778,6 +1789,60 @@ impl Project { Ok(()) } + fn serialize_buffer_for_peer( + &mut self, + buffer: &ModelHandle, + 
peer_id: PeerId, + cx: &AppContext, + ) -> proto::Buffer { + let buffer_id = buffer.read(cx).remote_id(); + let shared_buffers = self.shared_buffers.entry(peer_id).or_default(); + match shared_buffers.entry(buffer_id) { + hash_map::Entry::Occupied(_) => proto::Buffer { + variant: Some(proto::buffer::Variant::Id(buffer_id)), + }, + hash_map::Entry::Vacant(entry) => { + entry.insert(buffer.clone()); + proto::Buffer { + variant: Some(proto::buffer::Variant::State(buffer.read(cx).to_proto())), + } + } + } + } + + fn deserialize_remote_buffer( + &mut self, + buffer: proto::Buffer, + cx: &mut ModelContext, + ) -> Result> { + match buffer.variant.ok_or_else(|| anyhow!("missing buffer"))? { + proto::buffer::Variant::Id(id) => self + .open_buffers + .get(&(id as usize)) + .and_then(|buffer| buffer.upgrade(cx)) + .ok_or_else(|| anyhow!("no buffer exists for id {}", id)), + proto::buffer::Variant::State(mut buffer) => { + let mut buffer_worktree = None; + let mut buffer_file = None; + if let Some(file) = buffer.file.take() { + let worktree_id = WorktreeId::from_proto(file.worktree_id); + let worktree = self + .worktree_for_id(worktree_id, cx) + .ok_or_else(|| anyhow!("no worktree found for id {}", file.worktree_id))?; + buffer_file = Some(Box::new(File::from_proto(file, worktree.clone(), cx)?) + as Box); + buffer_worktree = Some(worktree); + } + + let buffer = cx.add_model(|cx| { + Buffer::from_proto(self.replica_id(), buffer, buffer_file, cx).unwrap() + }); + self.register_buffer(&buffer, buffer_worktree.as_ref(), cx)?; + Ok(buffer) + } + } + } + pub fn handle_close_buffer( &mut self, envelope: TypedEnvelope, diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index 3aba73c416777ccfa6c5f5e1aa49c98783b57a11..15e4bed2bf2bafbeb6536eb56e4ce781c46190e1 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -367,17 +367,6 @@ impl Worktree { } } - pub fn load_buffer( - &mut self, - path: &Path, - cx: &mut ModelContext, - ) -> Task>> { - match self { - Worktree::Local(worktree) => worktree.load_buffer(path, cx), - Worktree::Remote(worktree) => worktree.load_buffer(path, cx), - } - } - pub fn diagnostic_summaries<'a>( &'a self, ) -> impl Iterator, DiagnosticSummary)> + 'a { @@ -834,41 +823,6 @@ impl LocalWorktree { } impl RemoteWorktree { - pub(crate) fn load_buffer( - &mut self, - path: &Path, - cx: &mut ModelContext, - ) -> Task>> { - let rpc = self.client.clone(); - let replica_id = self.replica_id; - let project_id = self.project_id; - let remote_worktree_id = self.id(); - let path: Arc = Arc::from(path); - let path_string = path.to_string_lossy().to_string(); - cx.spawn_weak(move |this, mut cx| async move { - let response = rpc - .request(proto::OpenBuffer { - project_id, - worktree_id: remote_worktree_id.to_proto(), - path: path_string, - }) - .await?; - - let this = this - .upgrade(&cx) - .ok_or_else(|| anyhow!("worktree was closed"))?; - let mut remote_buffer = response.buffer.ok_or_else(|| anyhow!("empty buffer"))?; - let file = remote_buffer - .file - .take() - .map(|proto| cx.read(|cx| File::from_proto(proto, this.clone(), cx))) - .transpose()? 
- .map(|file| Box::new(file) as Box); - - Ok(cx.add_model(|cx| Buffer::from_proto(replica_id, remote_buffer, file, cx).unwrap())) - }) - } - fn snapshot(&self) -> Snapshot { self.snapshot.clone() } diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index 059dab20662c1b42c61dbe3a3679609fcdddb06f..5580d2a724a3aa1d74bbe10cb1905f4ae6861075 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -147,12 +147,9 @@ message GetDefinitionResponse { } message Definition { - oneof buffer { - uint64 id = 1; - Buffer state = 2; - } - Anchor target_start = 3; - Anchor target_end = 4; + Buffer buffer = 1; + Anchor target_start = 2; + Anchor target_end = 3; } message OpenBuffer { @@ -324,6 +321,13 @@ message Entry { } message Buffer { + oneof variant { + uint64 id = 1; + BufferState state = 2; + } +} + +message BufferState { uint64 id = 1; optional File file = 2; string visible_text = 3; diff --git a/crates/rpc/src/peer.rs b/crates/rpc/src/peer.rs index ce9680173311ecb42dfef999c6fef7dee09e606f..867ed91ab3e4b05435784d1fbbe9de375a98ac93 100644 --- a/crates/rpc/src/peer.rs +++ b/crates/rpc/src/peer.rs @@ -410,9 +410,7 @@ mod tests { .unwrap(), proto::OpenBufferResponse { buffer: Some(proto::Buffer { - id: 101, - visible_text: "path/one content".to_string(), - ..Default::default() + variant: Some(proto::buffer::Variant::Id(0)) }), } ); @@ -431,10 +429,8 @@ mod tests { .unwrap(), proto::OpenBufferResponse { buffer: Some(proto::Buffer { - id: 102, - visible_text: "path/two content".to_string(), - ..Default::default() - }), + variant: Some(proto::buffer::Variant::Id(1)) + }) } ); @@ -460,9 +456,7 @@ mod tests { assert_eq!(message.worktree_id, 1); proto::OpenBufferResponse { buffer: Some(proto::Buffer { - id: 101, - visible_text: "path/one content".to_string(), - ..Default::default() + variant: Some(proto::buffer::Variant::Id(0)), }), } } @@ -470,9 +464,7 @@ mod tests { assert_eq!(message.worktree_id, 2); proto::OpenBufferResponse { buffer: Some(proto::Buffer { - id: 102, - visible_text: "path/two content".to_string(), - ..Default::default() + variant: Some(proto::buffer::Variant::Id(1)), }), } } diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index 88a23952d4caf6996e4f629ce6c50701543d0bcb..6a01950c6c2360db8682167c74d1578d61ff651f 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -1168,6 +1168,7 @@ mod tests { use gpui::{executor, ModelHandle, TestAppContext}; use parking_lot::Mutex; use postage::{mpsc, watch}; + use rand::prelude::*; use rpc::PeerId; use serde_json::json; use sqlx::types::time::OffsetDateTime; @@ -2507,10 +2508,11 @@ mod tests { .await; } - #[gpui::test(iterations = 100, seed = 1)] + #[gpui::test] async fn test_open_buffer_while_getting_definition_pointing_to_it( mut cx_a: TestAppContext, mut cx_b: TestAppContext, + mut rng: StdRng, ) { cx_a.foreground().forbid_parking(); let mut lang_registry = Arc::new(LanguageRegistry::new()); @@ -2589,7 +2591,18 @@ mod tests { .await .unwrap(); - let definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx)); + let definitions; + let buffer_b2; + if rng.gen() { + definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx)); + buffer_b2 = + project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx)); + } else { + buffer_b2 = + project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx)); + definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx)); + } + let (request_id, _) = 
fake_language_server .receive_request::() .await; @@ -2603,10 +2616,7 @@ mod tests { ) .await; - let buffer_b2 = project_b - .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx)) - .await - .unwrap(); + let buffer_b2 = buffer_b2.await.unwrap(); let definitions = definitions.await.unwrap(); assert_eq!(definitions.len(), 1); assert_eq!(definitions[0].target_buffer, buffer_b2); From b7314ef2aa863f5a7ab209828dc91017a3a38a50 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Mon, 24 Jan 2022 18:45:14 +0100 Subject: [PATCH 07/12] WIP: Start restructuring executor --- Cargo.lock | 1 + crates/gpui/Cargo.toml | 4 +- crates/gpui/src/executor.rs | 315 ++++++++++---------------- crates/gpui/src/test.rs | 15 +- crates/gpui_macros/src/gpui_macros.rs | 8 +- crates/server/src/rpc.rs | 2 +- 6 files changed, 135 insertions(+), 210 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 29c2228e7d31dc70d08b61e57e05f43c4dd726a9..dc7b6921a7ab67126cfbeab322582b9ded3d6f0a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2128,6 +2128,7 @@ dependencies = [ "block", "cc", "cocoa", + "collections", "core-foundation", "core-graphics", "core-text", diff --git a/crates/gpui/Cargo.toml b/crates/gpui/Cargo.toml index 4414936c9e3b81c8e46cd02f04f321007daab374..197a5ca12eb4bd79e86423c78c2107acbe828b09 100644 --- a/crates/gpui/Cargo.toml +++ b/crates/gpui/Cargo.toml @@ -8,9 +8,10 @@ version = "0.1.0" path = "src/gpui.rs" [features] -test-support = ["env_logger"] +test-support = ["env_logger", "collections/test-support"] [dependencies] +collections = { path = "../collections" } gpui_macros = { path = "../gpui_macros" } sum_tree = { path = "../sum_tree" } async-task = "4.0.3" @@ -47,6 +48,7 @@ bindgen = "0.58.1" cc = "1.0.67" [dev-dependencies] +collections = { path = "../collections", features = ["test-support"] } env_logger = "0.8" png = "0.16" simplelog = "0.9" diff --git a/crates/gpui/src/executor.rs b/crates/gpui/src/executor.rs index e939008bb301cadde693dfb419514e6663a21117..7133aa5de8328370f1ce65c142e10a16862b15b4 100644 --- a/crates/gpui/src/executor.rs +++ b/crates/gpui/src/executor.rs @@ -1,6 +1,7 @@ use anyhow::{anyhow, Result}; use async_task::Runnable; use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString}; +use collections::HashMap; use parking_lot::Mutex; use postage::{barrier, prelude::Stream as _}; use rand::prelude::*; @@ -33,7 +34,10 @@ pub enum Foreground { dispatcher: Arc, _not_send_or_sync: PhantomData>, }, - Deterministic(Arc), + Deterministic { + cx_id: usize, + executor: Arc, + }, } pub enum Background { @@ -69,9 +73,8 @@ unsafe impl Send for Task {} struct DeterministicState { rng: StdRng, seed: u64, - scheduled_from_foreground: Vec<(Runnable, Backtrace)>, - scheduled_from_background: Vec<(Runnable, Backtrace)>, - spawned_from_foreground: Vec<(Runnable, Backtrace)>, + scheduled_from_foreground: HashMap>, + scheduled_from_background: Vec, forbid_parking: bool, block_on_ticks: RangeInclusive, now: Instant, @@ -79,20 +82,24 @@ struct DeterministicState { waiting_backtrace: Option, } +enum ScheduledForeground { + MainFuture, + Runnable(Runnable), +} + pub struct Deterministic { state: Arc>, parker: Mutex, } impl Deterministic { - fn new(seed: u64) -> Self { - Self { + pub fn new(seed: u64) -> Arc { + Arc::new(Self { state: Arc::new(Mutex::new(DeterministicState { rng: StdRng::seed_from_u64(seed), seed, scheduled_from_foreground: Default::default(), scheduled_from_background: Default::default(), - spawned_from_foreground: Default::default(), forbid_parking: false, block_on_ticks: 
0..=1000, now: Instant::now(), @@ -100,22 +107,32 @@ impl Deterministic { waiting_backtrace: None, })), parker: Default::default(), - } + }) + } + + pub fn build_background(self: &Arc) -> Arc { + Arc::new(Background::Deterministic { + executor: self.clone(), + }) + } + + pub fn build_foreground(self: &Arc, id: usize) -> Rc { + Rc::new(Foreground::Deterministic { + cx_id: id, + executor: self.clone(), + }) } - fn spawn_from_foreground(&self, future: AnyLocalFuture) -> AnyLocalTask { - let backtrace = Backtrace::new_unresolved(); - let scheduled_once = AtomicBool::new(false); + fn spawn_from_foreground(&self, cx_id: usize, future: AnyLocalFuture) -> AnyLocalTask { let state = self.state.clone(); let unparker = self.parker.lock().unparker(); let (runnable, task) = async_task::spawn_local(future, move |runnable| { let mut state = state.lock(); - let backtrace = backtrace.clone(); - if scheduled_once.fetch_or(true, SeqCst) { - state.scheduled_from_foreground.push((runnable, backtrace)); - } else { - state.spawned_from_foreground.push((runnable, backtrace)); - } + state + .scheduled_from_foreground + .entry(cx_id) + .or_default() + .push(ScheduledForeground::Runnable(runnable)); unparker.unpark(); }); runnable.schedule(); @@ -123,24 +140,21 @@ impl Deterministic { } fn spawn(&self, future: AnyFuture) -> AnyTask { - let backtrace = Backtrace::new_unresolved(); let state = self.state.clone(); let unparker = self.parker.lock().unparker(); let (runnable, task) = async_task::spawn(future, move |runnable| { let mut state = state.lock(); - state - .scheduled_from_background - .push((runnable, backtrace.clone())); + state.scheduled_from_background.push(runnable); unparker.unpark(); }); runnable.schedule(); task } - fn run(&self, mut future: AnyLocalFuture) -> Box { + fn run(&self, cx_id: usize, mut future: AnyLocalFuture) -> Box { let woken = Arc::new(AtomicBool::new(false)); loop { - if let Some(result) = self.run_internal(woken.clone(), &mut future) { + if let Some(result) = self.run_internal(cx_id, woken.clone(), &mut future) { return result; } @@ -153,67 +167,92 @@ impl Deterministic { } } - fn run_until_parked(&self) { + fn run_until_parked(&self, cx_id: usize) { let woken = Arc::new(AtomicBool::new(false)); let mut future = any_local_future(std::future::pending::<()>()); - self.run_internal(woken, &mut future); + self.run_internal(cx_id, woken, &mut future); } fn run_internal( &self, + cx_id: usize, woken: Arc, future: &mut AnyLocalFuture, ) -> Option> { let unparker = self.parker.lock().unparker(); - let waker = waker_fn(move || { - woken.store(true, SeqCst); - unparker.unpark(); + let scheduled_main_future = Arc::new(AtomicBool::new(true)); + self.state + .lock() + .scheduled_from_foreground + .entry(cx_id) + .or_default() + .insert(0, ScheduledForeground::MainFuture); + + let waker = waker_fn({ + let state = self.state.clone(); + let scheduled_main_future = scheduled_main_future.clone(); + move || { + woken.store(true, SeqCst); + if !scheduled_main_future.load(SeqCst) { + scheduled_main_future.store(true, SeqCst); + state + .lock() + .scheduled_from_foreground + .entry(cx_id) + .or_default() + .push(ScheduledForeground::MainFuture); + } + + unparker.unpark(); + } }); let mut cx = Context::from_waker(&waker); - let mut trace = Trace::default(); loop { let mut state = self.state.lock(); - let runnable_count = state.scheduled_from_foreground.len() - + state.scheduled_from_background.len() - + state.spawned_from_foreground.len(); - let ix = state.rng.gen_range(0..=runnable_count); - if ix < 
state.scheduled_from_foreground.len() { - let (_, backtrace) = &state.scheduled_from_foreground[ix]; - trace.record(&state, backtrace.clone()); - let runnable = state.scheduled_from_foreground.remove(ix).0; - drop(state); - runnable.run(); - } else if ix - state.scheduled_from_foreground.len() - < state.scheduled_from_background.len() + if state.scheduled_from_foreground.is_empty() + && state.scheduled_from_background.is_empty() { - let ix = ix - state.scheduled_from_foreground.len(); - let (_, backtrace) = &state.scheduled_from_background[ix]; - trace.record(&state, backtrace.clone()); - let runnable = state.scheduled_from_background.remove(ix).0; - drop(state); - runnable.run(); - } else if ix < runnable_count { - let (_, backtrace) = &state.spawned_from_foreground[0]; - trace.record(&state, backtrace.clone()); - let runnable = state.spawned_from_foreground.remove(0).0; + return None; + } + + if !state.scheduled_from_background.is_empty() && state.rng.gen() { + let background_len = state.scheduled_from_background.len(); + let ix = state.rng.gen_range(0..background_len); + let runnable = state.scheduled_from_background.remove(ix); drop(state); runnable.run(); - } else { - drop(state); - if let Poll::Ready(result) = future.poll(&mut cx) { - return Some(result); + } else if !state.scheduled_from_foreground.is_empty() { + let available_cx_ids = state + .scheduled_from_foreground + .keys() + .copied() + .collect::>(); + let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap(); + let scheduled_from_cx = state + .scheduled_from_foreground + .get_mut(&cx_id_to_run) + .unwrap(); + let runnable = scheduled_from_cx.remove(0); + if scheduled_from_cx.is_empty() { + state.scheduled_from_foreground.remove(&cx_id_to_run); } - let state = self.state.lock(); - - if state.scheduled_from_foreground.is_empty() - && state.scheduled_from_background.is_empty() - && state.spawned_from_foreground.is_empty() - { - return None; + drop(state); + match runnable { + ScheduledForeground::MainFuture => { + scheduled_main_future.store(false, SeqCst); + if let Poll::Ready(result) = future.poll(&mut cx) { + return Some(result); + } + } + ScheduledForeground::Runnable(runnable) => { + runnable.run(); + } } + } else { + return None; } } } @@ -230,15 +269,12 @@ impl Deterministic { }; let mut cx = Context::from_waker(&waker); - let mut trace = Trace::default(); for _ in 0..max_ticks { let mut state = self.state.lock(); let runnable_count = state.scheduled_from_background.len(); let ix = state.rng.gen_range(0..=runnable_count); if ix < state.scheduled_from_background.len() { - let (_, backtrace) = &state.scheduled_from_background[ix]; - trace.record(&state, backtrace.clone()); - let runnable = state.scheduled_from_background.remove(ix).0; + let runnable = state.scheduled_from_background.remove(ix); drop(state); runnable.run(); } else { @@ -281,69 +317,13 @@ impl DeterministicState { } } -#[derive(Default)] -struct Trace { - executed: Vec, - scheduled: Vec>, - spawned_from_foreground: Vec>, -} - -impl Trace { - fn record(&mut self, state: &DeterministicState, executed: Backtrace) { - self.scheduled.push( - state - .scheduled_from_foreground - .iter() - .map(|(_, backtrace)| backtrace.clone()) - .collect(), - ); - self.spawned_from_foreground.push( - state - .spawned_from_foreground - .iter() - .map(|(_, backtrace)| backtrace.clone()) - .collect(), - ); - self.executed.push(executed); - } - - fn resolve(&mut self) { - for backtrace in &mut self.executed { - backtrace.resolve(); - } - - for backtraces in &mut 
self.scheduled { - for backtrace in backtraces { - backtrace.resolve(); - } - } - - for backtraces in &mut self.spawned_from_foreground { - for backtrace in backtraces { - backtrace.resolve(); - } - } - } -} - struct CwdBacktrace<'a> { backtrace: &'a Backtrace, - first_frame_only: bool, } impl<'a> CwdBacktrace<'a> { fn new(backtrace: &'a Backtrace) -> Self { - Self { - backtrace, - first_frame_only: false, - } - } - - fn first_frame(backtrace: &'a Backtrace) -> Self { - Self { - backtrace, - first_frame_only: true, - } + Self { backtrace } } } @@ -362,69 +342,12 @@ impl<'a> Debug for CwdBacktrace<'a> { .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd))) { formatted_frame.backtrace_frame(frame)?; - if self.first_frame_only { - break; - } } } fmt.finish() } } -impl Debug for Trace { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - for ((backtrace, scheduled), spawned_from_foreground) in self - .executed - .iter() - .zip(&self.scheduled) - .zip(&self.spawned_from_foreground) - { - writeln!(f, "Scheduled")?; - for backtrace in scheduled { - writeln!(f, "- {:?}", CwdBacktrace::first_frame(backtrace))?; - } - if scheduled.is_empty() { - writeln!(f, "None")?; - } - writeln!(f, "==========")?; - - writeln!(f, "Spawned from foreground")?; - for backtrace in spawned_from_foreground { - writeln!(f, "- {:?}", CwdBacktrace::first_frame(backtrace))?; - } - if spawned_from_foreground.is_empty() { - writeln!(f, "None")?; - } - writeln!(f, "==========")?; - - writeln!(f, "Run: {:?}", CwdBacktrace::first_frame(backtrace))?; - writeln!(f, "+++++++++++++++++++")?; - } - - Ok(()) - } -} - -impl Drop for Trace { - fn drop(&mut self) { - let trace_on_panic = if let Ok(trace_on_panic) = std::env::var("EXECUTOR_TRACE_ON_PANIC") { - trace_on_panic == "1" || trace_on_panic == "true" - } else { - false - }; - let trace_always = if let Ok(trace_always) = std::env::var("EXECUTOR_TRACE_ALWAYS") { - trace_always == "1" || trace_always == "true" - } else { - false - }; - - if trace_always || (trace_on_panic && thread::panicking()) { - self.resolve(); - dbg!(self); - } - } -} - impl Foreground { pub fn platform(dispatcher: Arc) -> Result { if dispatcher.is_main_thread() { @@ -440,7 +363,9 @@ impl Foreground { pub fn spawn(&self, future: impl Future + 'static) -> Task { let future = any_local_future(future); let any_task = match self { - Self::Deterministic(executor) => executor.spawn_from_foreground(future), + Self::Deterministic { cx_id, executor } => { + executor.spawn_from_foreground(*cx_id, future) + } Self::Platform { dispatcher, .. } => { fn spawn_inner( future: AnyLocalFuture, @@ -462,7 +387,7 @@ impl Foreground { pub fn run(&self, future: impl 'static + Future) -> T { let future = any_local_future(future); let any_value = match self { - Self::Deterministic(executor) => executor.run(future), + Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future), Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"), }; *any_value.downcast().unwrap() @@ -470,14 +395,14 @@ impl Foreground { pub fn parking_forbidden(&self) -> bool { match self { - Self::Deterministic(executor) => executor.state.lock().forbid_parking, + Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking, _ => panic!("this method can only be called on a deterministic executor"), } } pub fn start_waiting(&self) { match self { - Self::Deterministic(executor) => { + Self::Deterministic { executor, .. 
} => { executor.state.lock().waiting_backtrace = Some(Backtrace::new_unresolved()); } _ => panic!("this method can only be called on a deterministic executor"), @@ -486,7 +411,7 @@ impl Foreground { pub fn finish_waiting(&self) { match self { - Self::Deterministic(executor) => { + Self::Deterministic { executor, .. } => { executor.state.lock().waiting_backtrace.take(); } _ => panic!("this method can only be called on a deterministic executor"), @@ -495,7 +420,7 @@ impl Foreground { pub fn forbid_parking(&self) { match self { - Self::Deterministic(executor) => { + Self::Deterministic { executor, .. } => { let mut state = executor.state.lock(); state.forbid_parking = true; state.rng = StdRng::seed_from_u64(state.seed); @@ -506,7 +431,7 @@ impl Foreground { pub async fn timer(&self, duration: Duration) { match self { - Self::Deterministic(executor) => { + Self::Deterministic { executor, .. } => { let (tx, mut rx) = barrier::channel(); { let mut state = executor.state.lock(); @@ -523,8 +448,8 @@ impl Foreground { pub fn advance_clock(&self, duration: Duration) { match self { - Self::Deterministic(executor) => { - executor.run_until_parked(); + Self::Deterministic { cx_id, executor } => { + executor.run_until_parked(*cx_id); let mut state = executor.state.lock(); state.now += duration; @@ -541,7 +466,7 @@ impl Foreground { pub fn set_block_on_ticks(&self, range: RangeInclusive) { match self { - Self::Deterministic(executor) => executor.state.lock().block_on_ticks = range, + Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range, _ => panic!("this method can only be called on a deterministic executor"), } } @@ -579,7 +504,7 @@ impl Background { let future = any_future(future); let any_task = match self { Self::Production { executor, .. } => executor.spawn(future), - Self::Deterministic { executor, .. 
} => executor.spawn(future), + Self::Deterministic { executor } => executor.spawn(future), }; Task::send(any_task) } @@ -646,14 +571,6 @@ impl<'a> Scope<'a> { } } -pub fn deterministic(seed: u64) -> (Rc, Arc) { - let executor = Arc::new(Deterministic::new(seed)); - ( - Rc::new(Foreground::Deterministic(executor.clone())), - Arc::new(Background::Deterministic { executor }), - ) -} - impl Task { pub fn ready(value: T) -> Self { Self::Ready(Some(value)) diff --git a/crates/gpui/src/test.rs b/crates/gpui/src/test.rs index ef95ea435ac34ea31d53d37d61e169094b3efde5..b4b4e621ac313828da5c8a9185d43a501986bd30 100644 --- a/crates/gpui/src/test.rs +++ b/crates/gpui/src/test.rs @@ -28,7 +28,12 @@ pub fn run_test( mut starting_seed: u64, max_retries: usize, test_fn: &mut (dyn RefUnwindSafe - + Fn(&mut MutableAppContext, Rc, u64)), + + Fn( + &mut MutableAppContext, + Rc, + Arc, + u64, + )), ) { let is_randomized = num_iterations > 1; if is_randomized { @@ -60,16 +65,16 @@ pub fn run_test( dbg!(seed); } - let (foreground, background) = executor::deterministic(seed); + let deterministic = executor::Deterministic::new(seed); let mut cx = TestAppContext::new( foreground_platform.clone(), platform.clone(), - foreground.clone(), - background.clone(), + deterministic.build_foreground(usize::MAX), + deterministic.build_background(), font_cache.clone(), 0, ); - cx.update(|cx| test_fn(cx, foreground_platform.clone(), seed)); + cx.update(|cx| test_fn(cx, foreground_platform.clone(), deterministic, seed)); atomic_seed.fetch_add(1, SeqCst); } diff --git a/crates/gpui_macros/src/gpui_macros.rs b/crates/gpui_macros/src/gpui_macros.rs index e94318172ea4d4407e5a5b0edf5041d749b068d4..21d978d9fb3e56a6a212ee8d5123cf095266f452 100644 --- a/crates/gpui_macros/src/gpui_macros.rs +++ b/crates/gpui_macros/src/gpui_macros.rs @@ -77,8 +77,8 @@ pub fn test(args: TokenStream, function: TokenStream) -> TokenStream { #namespace::TestAppContext::new( foreground_platform.clone(), cx.platform().clone(), - cx.foreground().clone(), - cx.background().clone(), + deterministic.build_foreground(#ix), + deterministic.build_background(), cx.font_cache().clone(), #first_entity_id, ), @@ -115,7 +115,7 @@ pub fn test(args: TokenStream, function: TokenStream) -> TokenStream { #num_iterations as u64, #starting_seed as u64, #max_retries, - &mut |cx, foreground_platform, seed| cx.foreground().run(#inner_fn_name(#inner_fn_args)) + &mut |cx, foreground_platform, deterministic, seed| cx.foreground().run(#inner_fn_name(#inner_fn_args)) ); } } @@ -147,7 +147,7 @@ pub fn test(args: TokenStream, function: TokenStream) -> TokenStream { #num_iterations as u64, #starting_seed as u64, #max_retries, - &mut |cx, _, seed| #inner_fn_name(#inner_fn_args) + &mut |cx, _, _, seed| #inner_fn_name(#inner_fn_args) ); } } diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index 6a01950c6c2360db8682167c74d1578d61ff651f..1e731a9388418970f47d9ed9d724aaf26bee9dbd 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -1794,7 +1794,7 @@ mod tests { }); } - #[gpui::test] + #[gpui::test(iterations = 100)] async fn test_editing_while_guest_opens_buffer( mut cx_a: TestAppContext, mut cx_b: TestAppContext, From 469ee554a0f1b966d5f86f3c5070895831017fb7 Mon Sep 17 00:00:00 2001 From: Nathan Sobo Date: Mon, 24 Jan 2022 17:13:56 -0700 Subject: [PATCH 08/12] Get most tests passing when respecting wake order for foreground tasks in Deterministic executor Co-Authored-By: Max Brunsfeld --- crates/gpui/src/executor.rs | 81 ++++++++++++++----------------------- 1 
file changed, 30 insertions(+), 51 deletions(-) diff --git a/crates/gpui/src/executor.rs b/crates/gpui/src/executor.rs index 7133aa5de8328370f1ce65c142e10a16862b15b4..d7c8efe8710503b842a5724b14775933c5f3f198 100644 --- a/crates/gpui/src/executor.rs +++ b/crates/gpui/src/executor.rs @@ -73,7 +73,7 @@ unsafe impl Send for Task {} struct DeterministicState { rng: StdRng, seed: u64, - scheduled_from_foreground: HashMap>, + scheduled_from_foreground: HashMap>, scheduled_from_background: Vec, forbid_parking: bool, block_on_ticks: RangeInclusive, @@ -82,9 +82,9 @@ struct DeterministicState { waiting_backtrace: Option, } -enum ScheduledForeground { - MainFuture, - Runnable(Runnable), +struct ForegroundRunnable { + runnable: Runnable, + main: bool, } pub struct Deterministic { @@ -123,7 +123,12 @@ impl Deterministic { }) } - fn spawn_from_foreground(&self, cx_id: usize, future: AnyLocalFuture) -> AnyLocalTask { + fn spawn_from_foreground( + &self, + cx_id: usize, + future: AnyLocalFuture, + main: bool, + ) -> AnyLocalTask { let state = self.state.clone(); let unparker = self.parker.lock().unparker(); let (runnable, task) = async_task::spawn_local(future, move |runnable| { @@ -132,7 +137,7 @@ impl Deterministic { .scheduled_from_foreground .entry(cx_id) .or_default() - .push(ScheduledForeground::Runnable(runnable)); + .push(ForegroundRunnable { runnable, main }); unparker.unpark(); }); runnable.schedule(); @@ -151,10 +156,12 @@ impl Deterministic { task } - fn run(&self, cx_id: usize, mut future: AnyLocalFuture) -> Box { + fn run(&self, cx_id: usize, main_future: AnyLocalFuture) -> Box { let woken = Arc::new(AtomicBool::new(false)); + let mut main_task = self.spawn_from_foreground(cx_id, main_future, true); + loop { - if let Some(result) = self.run_internal(cx_id, woken.clone(), &mut future) { + if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) { return result; } @@ -167,44 +174,20 @@ impl Deterministic { } } - fn run_until_parked(&self, cx_id: usize) { + fn run_until_parked(&self) { let woken = Arc::new(AtomicBool::new(false)); - let mut future = any_local_future(std::future::pending::<()>()); - self.run_internal(cx_id, woken, &mut future); + self.run_internal(woken, None); } fn run_internal( &self, - cx_id: usize, woken: Arc, - future: &mut AnyLocalFuture, + mut main_task: Option<&mut AnyLocalTask>, ) -> Option> { let unparker = self.parker.lock().unparker(); - let scheduled_main_future = Arc::new(AtomicBool::new(true)); - self.state - .lock() - .scheduled_from_foreground - .entry(cx_id) - .or_default() - .insert(0, ScheduledForeground::MainFuture); - - let waker = waker_fn({ - let state = self.state.clone(); - let scheduled_main_future = scheduled_main_future.clone(); - move || { - woken.store(true, SeqCst); - if !scheduled_main_future.load(SeqCst) { - scheduled_main_future.store(true, SeqCst); - state - .lock() - .scheduled_from_foreground - .entry(cx_id) - .or_default() - .push(ScheduledForeground::MainFuture); - } - - unparker.unpark(); - } + let waker = waker_fn(move || { + woken.store(true, SeqCst); + unparker.unpark(); }); let mut cx = Context::from_waker(&waker); @@ -234,25 +217,21 @@ impl Deterministic { .scheduled_from_foreground .get_mut(&cx_id_to_run) .unwrap(); - let runnable = scheduled_from_cx.remove(0); + let foreground_runnable = scheduled_from_cx.remove(0); if scheduled_from_cx.is_empty() { state.scheduled_from_foreground.remove(&cx_id_to_run); } drop(state); - match runnable { - ScheduledForeground::MainFuture => { - scheduled_main_future.store(false, 
SeqCst); - if let Poll::Ready(result) = future.poll(&mut cx) { + + foreground_runnable.runnable.run(); + if let Some(main_task) = main_task.as_mut() { + if foreground_runnable.main { + if let Poll::Ready(result) = main_task.poll(&mut cx) { return Some(result); } } - ScheduledForeground::Runnable(runnable) => { - runnable.run(); - } } - } else { - return None; } } } @@ -364,7 +343,7 @@ impl Foreground { let future = any_local_future(future); let any_task = match self { Self::Deterministic { cx_id, executor } => { - executor.spawn_from_foreground(*cx_id, future) + executor.spawn_from_foreground(*cx_id, future, false) } Self::Platform { dispatcher, .. } => { fn spawn_inner( @@ -448,8 +427,8 @@ impl Foreground { pub fn advance_clock(&self, duration: Duration) { match self { - Self::Deterministic { cx_id, executor } => { - executor.run_until_parked(*cx_id); + Self::Deterministic { executor, .. } => { + executor.run_until_parked(); let mut state = executor.state.lock(); state.now += duration; From afa33c958be22b732985ea9abb76b55bee6ad42b Mon Sep 17 00:00:00 2001 From: Nathan Sobo Date: Mon, 24 Jan 2022 17:24:07 -0700 Subject: [PATCH 09/12] Clear shared buffers when unsharing projects Co-Authored-By: Max Brunsfeld --- crates/project/src/project.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index d6d842d80ed90c5e8128f65c0e435aae37383b0f..cb52907aea7fedfcbdfc2b1cb933e049f93c93ac 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -462,6 +462,7 @@ impl Project { rpc.send(proto::UnshareProject { project_id }).await?; this.update(&mut cx, |this, cx| { this.collaborators.clear(); + this.shared_buffers.clear(); for worktree in this.worktrees(cx).collect::<Vec<_>>() { worktree.update(cx, |worktree, _| { worktree.as_local_mut().unwrap().unshare(); From d241ab6370caa611ee9c7f117723aed454152b27 Mon Sep 17 00:00:00 2001 From: Nathan Sobo Date: Mon, 24 Jan 2022 17:33:46 -0700 Subject: [PATCH 10/12] Don't store operations for remote buffers we haven't yet opened. This used to be needed, but we think that, with our improvements to message ordering, we'll never miss operations applied after opening a remote buffer.
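As a rough sketch of the guest-side behavior this leaves us with (using illustrative names such as Guest, register_buffer, and handle_update_buffer rather than the real project.rs types, and assuming a simplified registry keyed by buffer id), incoming operations are now applied only when the buffer is already registered and are otherwise dropped:

use std::collections::HashMap;

// Toy model of the guest side after this change: operations that arrive for a
// buffer id we have not opened are dropped instead of being buffered, because
// message ordering now guarantees the host won't send us operations for a
// buffer before we have received and registered that buffer.
struct Guest {
    open_buffers: HashMap<u64, Vec<String>>, // buffer id -> applied operations
}

impl Guest {
    fn register_buffer(&mut self, id: u64) {
        self.open_buffers.entry(id).or_default();
    }

    fn handle_update_buffer(&mut self, id: u64, ops: Vec<String>) {
        // Previously, ops for an unknown id were stashed in an
        // OpenBuffer::Operations variant; now they are simply ignored.
        if let Some(applied) = self.open_buffers.get_mut(&id) {
            applied.extend(ops);
        }
    }
}

fn main() {
    let mut guest = Guest { open_buffers: HashMap::new() };
    guest.handle_update_buffer(1, vec!["edit-a".into()]); // dropped: buffer 1 isn't open yet
    guest.register_buffer(1);
    guest.handle_update_buffer(1, vec!["edit-b".into()]);
    assert_eq!(guest.open_buffers[&1], vec!["edit-b".to_string()]);
}

The equivalent decision in the real code is the buffer-update handling in the diff below, which applies deserialized ops only when the buffer id is already present in open_buffers.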
Co-Authored-By: Max Brunsfeld --- crates/project/src/project.rs | 190 +++++++++++++--------------------- 1 file changed, 72 insertions(+), 118 deletions(-) diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index cb52907aea7fedfcbdfc2b1cb933e049f93c93ac..2944eda890dcfaa24559302d699f1c69288e4aea 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -15,7 +15,7 @@ use gpui::{ use language::{ proto::{deserialize_anchor, serialize_anchor}, range_from_lsp, Bias, Buffer, Diagnostic, DiagnosticEntry, File as _, Language, - LanguageRegistry, Operation, PointUtf16, ToOffset, ToPointUtf16, + LanguageRegistry, PointUtf16, ToOffset, ToPointUtf16, }; use lsp::{DiagnosticSeverity, LanguageServer}; use postage::{prelude::Stream, watch}; @@ -43,7 +43,7 @@ pub struct Project { collaborators: HashMap, subscriptions: Vec, language_servers_with_diagnostics_running: isize, - open_buffers: HashMap, + open_buffers: HashMap>, loading_buffers: HashMap< ProjectPath, postage::watch::Receiver, Arc>>>, @@ -51,11 +51,6 @@ pub struct Project { shared_buffers: HashMap>>, } -enum OpenBuffer { - Operations(Vec), - Loaded(WeakModelHandle), -} - enum WorktreeHandle { Strong(ModelHandle), Weak(WeakModelHandle), @@ -652,17 +647,16 @@ impl Project { let mut result = None; let worktree = self.worktree_for_id(path.worktree_id, cx)?; self.open_buffers.retain(|_, buffer| { - if let OpenBuffer::Loaded(buffer) = buffer { - if let Some(buffer) = buffer.upgrade(cx) { - if let Some(file) = File::from_dyn(buffer.read(cx).file()) { - if file.worktree == worktree && file.path() == &path.path { - result = Some(buffer); - } + if let Some(buffer) = buffer.upgrade(cx) { + if let Some(file) = File::from_dyn(buffer.read(cx).file()) { + if file.worktree == worktree && file.path() == &path.path { + result = Some(buffer); } - return true; } + true + } else { + false } - false }); result } @@ -673,17 +667,12 @@ impl Project { worktree: Option<&ModelHandle>, cx: &mut ModelContext, ) -> Result<()> { - match self.open_buffers.insert( - buffer.read(cx).remote_id() as usize, - OpenBuffer::Loaded(buffer.downgrade()), - ) { - Some(OpenBuffer::Operations(pending_ops)) => { - // buffer.update(cx, |buf, cx| buf.apply_ops(pending_ops, cx))?; - } - Some(OpenBuffer::Loaded(_)) => { - return Err(anyhow!("registered the same buffer twice")); - } - None => {} + if self + .open_buffers + .insert(buffer.read(cx).remote_id() as usize, buffer.downgrade()) + .is_some() + { + return Err(anyhow!("registered the same buffer twice")); } self.assign_language_to_buffer(&buffer, worktree, cx); Ok(()) @@ -1276,62 +1265,60 @@ impl Project { let snapshot = worktree_handle.read(cx).snapshot(); let mut buffers_to_delete = Vec::new(); for (buffer_id, buffer) in &self.open_buffers { - if let OpenBuffer::Loaded(buffer) = buffer { - if let Some(buffer) = buffer.upgrade(cx) { - buffer.update(cx, |buffer, cx| { - if let Some(old_file) = File::from_dyn(buffer.file()) { - if old_file.worktree != worktree_handle { - return; + if let Some(buffer) = buffer.upgrade(cx) { + buffer.update(cx, |buffer, cx| { + if let Some(old_file) = File::from_dyn(buffer.file()) { + if old_file.worktree != worktree_handle { + return; + } + + let new_file = if let Some(entry) = old_file + .entry_id + .and_then(|entry_id| snapshot.entry_for_id(entry_id)) + { + File { + is_local: true, + entry_id: Some(entry.id), + mtime: entry.mtime, + path: entry.path.clone(), + worktree: worktree_handle.clone(), + } + } else if let Some(entry) = + 
snapshot.entry_for_path(old_file.path().as_ref()) + { + File { + is_local: true, + entry_id: Some(entry.id), + mtime: entry.mtime, + path: entry.path.clone(), + worktree: worktree_handle.clone(), + } + } else { + File { + is_local: true, + entry_id: None, + path: old_file.path().clone(), + mtime: old_file.mtime(), + worktree: worktree_handle.clone(), } + }; - let new_file = if let Some(entry) = old_file - .entry_id - .and_then(|entry_id| snapshot.entry_for_id(entry_id)) - { - File { - is_local: true, - entry_id: Some(entry.id), - mtime: entry.mtime, - path: entry.path.clone(), - worktree: worktree_handle.clone(), - } - } else if let Some(entry) = - snapshot.entry_for_path(old_file.path().as_ref()) - { - File { - is_local: true, - entry_id: Some(entry.id), - mtime: entry.mtime, - path: entry.path.clone(), - worktree: worktree_handle.clone(), - } - } else { - File { - is_local: true, - entry_id: None, - path: old_file.path().clone(), - mtime: old_file.mtime(), - worktree: worktree_handle.clone(), - } + if let Some(project_id) = self.remote_id() { + let client = self.client.clone(); + let message = proto::UpdateBufferFile { + project_id, + buffer_id: *buffer_id as u64, + file: Some(new_file.to_proto()), }; - - if let Some(project_id) = self.remote_id() { - let client = self.client.clone(); - let message = proto::UpdateBufferFile { - project_id, - buffer_id: *buffer_id as u64, - file: Some(new_file.to_proto()), - }; - cx.foreground() - .spawn(async move { client.send(message).await }) - .detach_and_log_err(cx); - } - buffer.file_updated(Box::new(new_file), cx).detach(); + cx.foreground() + .spawn(async move { client.send(message).await }) + .detach_and_log_err(cx); } - }); - } else { - buffers_to_delete.push(*buffer_id); - } + buffer.file_updated(Box::new(new_file), cx).detach(); + } + }); + } else { + buffers_to_delete.push(*buffer_id); } } @@ -1469,10 +1456,8 @@ impl Project { .replica_id; self.shared_buffers.remove(&peer_id); for (_, buffer) in &self.open_buffers { - if let OpenBuffer::Loaded(buffer) = buffer { - if let Some(buffer) = buffer.upgrade(cx) { - buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx)); - } + if let Some(buffer) = buffer.upgrade(cx) { + buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx)); } } cx.notify(); @@ -1582,19 +1567,9 @@ impl Project { .into_iter() .map(|op| language::proto::deserialize_operation(op)) .collect::, _>>()?; - match self.open_buffers.get_mut(&buffer_id) { - Some(OpenBuffer::Operations(pending_ops)) => pending_ops.extend(ops), - Some(OpenBuffer::Loaded(buffer)) => { - if let Some(buffer) = buffer.upgrade(cx) { - buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?; - } else { - self.open_buffers - .insert(buffer_id, OpenBuffer::Operations(ops)); - } - } - None => { - self.open_buffers - .insert(buffer_id, OpenBuffer::Operations(ops)); + if let Some(buffer) = self.open_buffers.get_mut(&buffer_id) { + if let Some(buffer) = buffer.upgrade(cx) { + buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?; } } Ok(()) @@ -1867,13 +1842,7 @@ impl Project { let buffer = self .open_buffers .get(&(payload.buffer_id as usize)) - .and_then(|buf| { - if let OpenBuffer::Loaded(buffer) = buf { - buffer.upgrade(cx) - } else { - None - } - }); + .and_then(|buffer| buffer.upgrade(cx)); if let Some(buffer) = buffer { buffer.update(cx, |buffer, cx| { let version = payload.version.try_into()?; @@ -1898,13 +1867,7 @@ impl Project { let buffer = self .open_buffers .get(&(payload.buffer_id as usize)) - .and_then(|buf| { - if let 
OpenBuffer::Loaded(buffer) = buf { - buffer.upgrade(cx) - } else { - None - } - }); + .and_then(|buffer| buffer.upgrade(cx)); if let Some(buffer) = buffer { buffer.update(cx, |buffer, cx| { let version = payload.version.try_into()?; @@ -2103,15 +2066,6 @@ impl> From<(WorktreeId, P)> for ProjectPath { } } -impl OpenBuffer { - fn upgrade(&self, cx: &AppContext) -> Option> { - match self { - OpenBuffer::Loaded(buffer) => buffer.upgrade(cx), - OpenBuffer::Operations(_) => None, - } - } -} - #[cfg(test)] mod tests { use super::{Event, *}; From 2b8685c1a2cc470b5d761944da6db241698e7efe Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Mon, 24 Jan 2022 14:38:24 -0800 Subject: [PATCH 11/12] Insert random delays when sending and receiving websocket messages in tests Co-Authored-By: Nathan Sobo --- crates/client/src/test.rs | 2 +- crates/gpui/src/executor.rs | 13 +++++++++++- crates/rpc/Cargo.toml | 3 ++- crates/rpc/src/conn.rs | 40 ++++++++++++++++++++++++++----------- crates/rpc/src/peer.rs | 13 +++++++----- crates/server/src/rpc.rs | 3 ++- 6 files changed, 53 insertions(+), 21 deletions(-) diff --git a/crates/client/src/test.rs b/crates/client/src/test.rs index 1630a454b79296e27ec9eb1545aeb8f438b010e6..7402417196250affb845185454a657894075f542 100644 --- a/crates/client/src/test.rs +++ b/crates/client/src/test.rs @@ -94,7 +94,7 @@ impl FakeServer { Err(EstablishConnectionError::Unauthorized)? } - let (client_conn, server_conn, _) = Connection::in_memory(); + let (client_conn, server_conn, _) = Connection::in_memory(cx.background()); let (connection_id, io, incoming) = self.peer.add_connection(server_conn).await; cx.background().spawn(io).detach(); *self.incoming.lock() = Some(incoming); diff --git a/crates/gpui/src/executor.rs b/crates/gpui/src/executor.rs index d7c8efe8710503b842a5724b14775933c5f3f198..5e37acbc1d7eef65f1f9e3def62c35b07e19fdfd 100644 --- a/crates/gpui/src/executor.rs +++ b/crates/gpui/src/executor.rs @@ -5,7 +5,7 @@ use collections::HashMap; use parking_lot::Mutex; use postage::{barrier, prelude::Stream as _}; use rand::prelude::*; -use smol::{channel, prelude::*, Executor, Timer}; +use smol::{channel, future::yield_now, prelude::*, Executor, Timer}; use std::{ any::Any, fmt::{self, Debug, Display}, @@ -528,6 +528,17 @@ impl Background { task.await; } } + + pub async fn simulate_random_delay(&self) { + match self { + Self::Deterministic { executor, .. 
} => { + if executor.state.lock().rng.gen_range(0..100) < 20 { + yield_now().await; + } + } + _ => panic!("this method can only be called on a deterministic executor"), + } + } } pub struct Scope<'a> { diff --git a/crates/rpc/Cargo.toml b/crates/rpc/Cargo.toml index 4be612eec77ae902db19b04cd04dcd3b19adf527..c7f701662e99f3a8a7d418c3f587e90d83ae8ecb 100644 --- a/crates/rpc/Cargo.toml +++ b/crates/rpc/Cargo.toml @@ -8,7 +8,7 @@ version = "0.1.0" path = "src/rpc.rs" [features] -test-support = [] +test-support = ["gpui"] [dependencies] anyhow = "1.0" @@ -25,6 +25,7 @@ rsa = "0.4" serde = { version = "1", features = ["derive"] } smol-timeout = "0.6" zstd = "0.9" +gpui = { path = "../gpui", features = ["test-support"], optional = true } [build-dependencies] prost-build = "0.8" diff --git a/crates/rpc/src/conn.rs b/crates/rpc/src/conn.rs index 5ca845d13f1d489861fe076b2258a8d84bf8d615..a0db93287688d91899a482aa1e90508987eaee3c 100644 --- a/crates/rpc/src/conn.rs +++ b/crates/rpc/src/conn.rs @@ -34,12 +34,14 @@ impl Connection { } #[cfg(any(test, feature = "test-support"))] - pub fn in_memory() -> (Self, Self, postage::watch::Sender>) { + pub fn in_memory( + executor: std::sync::Arc, + ) -> (Self, Self, postage::watch::Sender>) { let (kill_tx, mut kill_rx) = postage::watch::channel_with(None); postage::stream::Stream::try_recv(&mut kill_rx).unwrap(); - let (a_tx, a_rx) = Self::channel(kill_rx.clone()); - let (b_tx, b_rx) = Self::channel(kill_rx); + let (a_tx, a_rx) = Self::channel(kill_rx.clone(), executor.clone()); + let (b_tx, b_rx) = Self::channel(kill_rx, executor); ( Self { tx: a_tx, rx: b_rx }, Self { tx: b_tx, rx: a_rx }, @@ -50,11 +52,12 @@ impl Connection { #[cfg(any(test, feature = "test-support"))] fn channel( kill_rx: postage::watch::Receiver>, + executor: std::sync::Arc, ) -> ( Box>, Box>>, ) { - use futures::{future, SinkExt as _}; + use futures::SinkExt as _; use io::{Error, ErrorKind}; let (tx, rx) = mpsc::unbounded::(); @@ -62,26 +65,39 @@ impl Connection { .sink_map_err(|e| WebSocketError::from(Error::new(ErrorKind::Other, e))) .with({ let kill_rx = kill_rx.clone(); + let executor = executor.clone(); move |msg| { - if kill_rx.borrow().is_none() { - future::ready(Ok(msg)) - } else { - future::ready(Err(Error::new(ErrorKind::Other, "connection killed").into())) - } + let kill_rx = kill_rx.clone(); + let executor = executor.clone(); + Box::pin(async move { + executor.simulate_random_delay().await; + if kill_rx.borrow().is_none() { + Ok(msg) + } else { + Err(Error::new(ErrorKind::Other, "connection killed").into()) + } + }) } }); + let rx = rx.then(move |msg| { + let executor = executor.clone(); + Box::pin(async move { + executor.simulate_random_delay().await; + msg + }) + }); let rx = KillableReceiver { kill_rx, rx }; (Box::new(tx), Box::new(rx)) } } -struct KillableReceiver { - rx: mpsc::UnboundedReceiver, +struct KillableReceiver { + rx: S, kill_rx: postage::watch::Receiver>, } -impl Stream for KillableReceiver { +impl> Stream for KillableReceiver { type Item = Result; fn poll_next( diff --git a/crates/rpc/src/peer.rs b/crates/rpc/src/peer.rs index 867ed91ab3e4b05435784d1fbbe9de375a98ac93..dcfcd2530c31f4e1a3959d79c8c131ed68b3beb0 100644 --- a/crates/rpc/src/peer.rs +++ b/crates/rpc/src/peer.rs @@ -353,12 +353,14 @@ mod tests { let client1 = Peer::new(); let client2 = Peer::new(); - let (client1_to_server_conn, server_to_client_1_conn, _) = Connection::in_memory(); + let (client1_to_server_conn, server_to_client_1_conn, _) = + Connection::in_memory(cx.background()); let 
(client1_conn_id, io_task1, client1_incoming) = client1.add_connection(client1_to_server_conn).await; let (_, io_task2, server_incoming1) = server.add_connection(server_to_client_1_conn).await; - let (client2_to_server_conn, server_to_client_2_conn, _) = Connection::in_memory(); + let (client2_to_server_conn, server_to_client_2_conn, _) = + Connection::in_memory(cx.background()); let (client2_conn_id, io_task3, client2_incoming) = client2.add_connection(client2_to_server_conn).await; let (_, io_task4, server_incoming2) = server.add_connection(server_to_client_2_conn).await; @@ -489,7 +491,8 @@ mod tests { let server = Peer::new(); let client = Peer::new(); - let (client_to_server_conn, server_to_client_conn, _) = Connection::in_memory(); + let (client_to_server_conn, server_to_client_conn, _) = + Connection::in_memory(cx.background()); let (client_to_server_conn_id, io_task1, mut client_incoming) = client.add_connection(client_to_server_conn).await; let (server_to_client_conn_id, io_task2, mut server_incoming) = @@ -589,7 +592,7 @@ mod tests { async fn test_disconnect(cx: TestAppContext) { let executor = cx.foreground(); - let (client_conn, mut server_conn, _) = Connection::in_memory(); + let (client_conn, mut server_conn, _) = Connection::in_memory(cx.background()); let client = Peer::new(); let (connection_id, io_handler, mut incoming) = client.add_connection(client_conn).await; @@ -623,7 +626,7 @@ mod tests { #[gpui::test(iterations = 10)] async fn test_io_error(cx: TestAppContext) { let executor = cx.foreground(); - let (client_conn, mut server_conn, _) = Connection::in_memory(); + let (client_conn, mut server_conn, _) = Connection::in_memory(cx.background()); let client = Peer::new(); let (connection_id, io_handler, mut incoming) = client.add_connection(client_conn).await; diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index 1e731a9388418970f47d9ed9d724aaf26bee9dbd..d1660976a6f50c4af90994a48ed6ec6b62d06ef7 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -3242,7 +3242,8 @@ mod tests { "server is forbidding connections" ))) } else { - let (client_conn, server_conn, kill_conn) = Connection::in_memory(); + let (client_conn, server_conn, kill_conn) = + Connection::in_memory(cx.background()); connection_killers.lock().insert(user_id, kill_conn); cx.background() .spawn(server.handle_connection( From 0cfb9ff1ae22bb3c2900b7216607ce73e5ec32f8 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Mon, 24 Jan 2022 16:50:38 -0800 Subject: [PATCH 12/12] Add random delays in FakeFs Co-Authored-By: Nathan Sobo --- crates/project/src/fs.rs | 11 ++++++++++- crates/project/src/project.rs | 8 ++++---- crates/project/src/worktree.rs | 2 +- crates/server/src/rpc.rs | 26 +++++++++++++------------- crates/workspace/src/workspace.rs | 2 +- crates/zed/src/test.rs | 2 +- 6 files changed, 30 insertions(+), 21 deletions(-) diff --git a/crates/project/src/fs.rs b/crates/project/src/fs.rs index 895d7d4cc1a0b440a4ca92f63ebed1284f025365..a26b5ed8f72d4fa21998f32c69e5e5648b9f2636 100644 --- a/crates/project/src/fs.rs +++ b/crates/project/src/fs.rs @@ -181,11 +181,12 @@ impl FakeFsState { pub struct FakeFs { // Use an unfair lock to ensure tests are deterministic. 
state: futures::lock::Mutex, + executor: std::sync::Arc, } #[cfg(any(test, feature = "test-support"))] impl FakeFs { - pub fn new() -> Self { + pub fn new(executor: std::sync::Arc) -> Self { let (events_tx, _) = postage::broadcast::channel(2048); let mut entries = std::collections::BTreeMap::new(); entries.insert( @@ -201,6 +202,7 @@ impl FakeFs { }, ); Self { + executor, state: futures::lock::Mutex::new(FakeFsState { entries, next_inode: 1, @@ -330,6 +332,7 @@ impl FakeFs { #[async_trait::async_trait] impl Fs for FakeFs { async fn load(&self, path: &Path) -> Result { + self.executor.simulate_random_delay().await; let state = self.state.lock().await; let text = state .entries @@ -340,6 +343,7 @@ impl Fs for FakeFs { } async fn save(&self, path: &Path, text: &Rope) -> Result<()> { + self.executor.simulate_random_delay().await; let mut state = self.state.lock().await; state.validate_path(path)?; if let Some(entry) = state.entries.get_mut(path) { @@ -370,10 +374,12 @@ impl Fs for FakeFs { } async fn canonicalize(&self, path: &Path) -> Result { + self.executor.simulate_random_delay().await; Ok(path.to_path_buf()) } async fn is_file(&self, path: &Path) -> bool { + self.executor.simulate_random_delay().await; let state = self.state.lock().await; state .entries @@ -382,6 +388,7 @@ impl Fs for FakeFs { } async fn metadata(&self, path: &Path) -> Result> { + self.executor.simulate_random_delay().await; let state = self.state.lock().await; Ok(state.entries.get(path).map(|entry| entry.metadata.clone())) } @@ -391,6 +398,7 @@ impl Fs for FakeFs { abs_path: &Path, ) -> Result>>>> { use futures::{future, stream}; + self.executor.simulate_random_delay().await; let state = self.state.lock().await; let abs_path = abs_path.to_path_buf(); Ok(Box::pin(stream::iter(state.entries.clone()).filter_map( @@ -410,6 +418,7 @@ impl Fs for FakeFs { _: Duration, ) -> Pin>>> { let state = self.state.lock().await; + self.executor.simulate_random_delay().await; let rx = state.events_tx.subscribe(); let path = path.to_path_buf(); Box::pin(futures::StreamExt::filter(rx, move |events| { diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 2944eda890dcfaa24559302d699f1c69288e4aea..8cba48132cb46744013ccb8be1c783985a40277c 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -2439,7 +2439,7 @@ mod tests { #[gpui::test] async fn test_save_file(mut cx: gpui::TestAppContext) { - let fs = Arc::new(FakeFs::new()); + let fs = Arc::new(FakeFs::new(cx.background())); fs.insert_tree( "/dir", json!({ @@ -2477,7 +2477,7 @@ mod tests { #[gpui::test] async fn test_save_in_single_file_worktree(mut cx: gpui::TestAppContext) { - let fs = Arc::new(FakeFs::new()); + let fs = Arc::new(FakeFs::new(cx.background())); fs.insert_tree( "/dir", json!({ @@ -2664,7 +2664,7 @@ mod tests { #[gpui::test] async fn test_buffer_deduping(mut cx: gpui::TestAppContext) { - let fs = Arc::new(FakeFs::new()); + let fs = Arc::new(FakeFs::new(cx.background())); fs.insert_tree( "/the-dir", json!({ @@ -2953,7 +2953,7 @@ mod tests { #[gpui::test] async fn test_grouped_diagnostics(mut cx: gpui::TestAppContext) { - let fs = Arc::new(FakeFs::new()); + let fs = Arc::new(FakeFs::new(cx.background())); fs.insert_tree( "/the-dir", json!({ diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index 15e4bed2bf2bafbeb6536eb56e4ce781c46190e1..11006f32baf6ec1ef06ad0688518b75c604693fe 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -2432,7 +2432,7 @@ mod tests { 
#[gpui::test] async fn test_traversal(cx: gpui::TestAppContext) { - let fs = FakeFs::new(); + let fs = FakeFs::new(cx.background()); fs.insert_tree( "/root", json!({ diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index d1660976a6f50c4af90994a48ed6ec6b62d06ef7..8d5adfb3c5ab9b23a402c2dbdcc760b68b66fb29 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -1201,7 +1201,7 @@ mod tests { async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) { let (window_b, _) = cx_b.add_window(|_| EmptyView); let lang_registry = Arc::new(LanguageRegistry::new()); - let fs = Arc::new(FakeFs::new()); + let fs = Arc::new(FakeFs::new(cx_a.background())); cx_a.foreground().forbid_parking(); // Connect to a server as 2 clients. @@ -1339,7 +1339,7 @@ mod tests { #[gpui::test] async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) { let lang_registry = Arc::new(LanguageRegistry::new()); - let fs = Arc::new(FakeFs::new()); + let fs = Arc::new(FakeFs::new(cx_a.background())); cx_a.foreground().forbid_parking(); // Connect to a server as 2 clients. @@ -1440,7 +1440,7 @@ mod tests { mut cx_c: TestAppContext, ) { let lang_registry = Arc::new(LanguageRegistry::new()); - let fs = Arc::new(FakeFs::new()); + let fs = Arc::new(FakeFs::new(cx_a.background())); cx_a.foreground().forbid_parking(); // Connect to a server as 3 clients. @@ -1623,7 +1623,7 @@ mod tests { async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) { cx_a.foreground().forbid_parking(); let lang_registry = Arc::new(LanguageRegistry::new()); - let fs = Arc::new(FakeFs::new()); + let fs = Arc::new(FakeFs::new(cx_a.background())); // Connect to a server as 2 clients. let mut server = TestServer::start(cx_a.foreground()).await; @@ -1716,7 +1716,7 @@ mod tests { async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) { cx_a.foreground().forbid_parking(); let lang_registry = Arc::new(LanguageRegistry::new()); - let fs = Arc::new(FakeFs::new()); + let fs = Arc::new(FakeFs::new(cx_a.background())); // Connect to a server as 2 clients. let mut server = TestServer::start(cx_a.foreground()).await; @@ -1801,7 +1801,7 @@ mod tests { ) { cx_a.foreground().forbid_parking(); let lang_registry = Arc::new(LanguageRegistry::new()); - let fs = Arc::new(FakeFs::new()); + let fs = Arc::new(FakeFs::new(cx_a.background())); // Connect to a server as 2 clients. let mut server = TestServer::start(cx_a.foreground()).await; @@ -1881,7 +1881,7 @@ mod tests { ) { cx_a.foreground().forbid_parking(); let lang_registry = Arc::new(LanguageRegistry::new()); - let fs = Arc::new(FakeFs::new()); + let fs = Arc::new(FakeFs::new(cx_a.background())); // Connect to a server as 2 clients. let mut server = TestServer::start(cx_a.foreground()).await; @@ -1956,7 +1956,7 @@ mod tests { async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) { cx_a.foreground().forbid_parking(); let lang_registry = Arc::new(LanguageRegistry::new()); - let fs = Arc::new(FakeFs::new()); + let fs = Arc::new(FakeFs::new(cx_a.background())); // Connect to a server as 2 clients. let mut server = TestServer::start(cx_a.foreground()).await; @@ -2030,7 +2030,7 @@ mod tests { ) { cx_a.foreground().forbid_parking(); let mut lang_registry = Arc::new(LanguageRegistry::new()); - let fs = Arc::new(FakeFs::new()); + let fs = Arc::new(FakeFs::new(cx_a.background())); // Set up a fake language server. 
let (language_server_config, mut fake_language_server) = @@ -2251,7 +2251,7 @@ mod tests { async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) { cx_a.foreground().forbid_parking(); let mut lang_registry = Arc::new(LanguageRegistry::new()); - let fs = Arc::new(FakeFs::new()); + let fs = Arc::new(FakeFs::new(cx_a.background())); // Set up a fake language server. let (language_server_config, mut fake_language_server) = @@ -2355,7 +2355,7 @@ mod tests { async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) { cx_a.foreground().forbid_parking(); let mut lang_registry = Arc::new(LanguageRegistry::new()); - let fs = Arc::new(FakeFs::new()); + let fs = Arc::new(FakeFs::new(cx_a.background())); fs.insert_tree( "/root-1", json!({ @@ -2516,7 +2516,7 @@ mod tests { ) { cx_a.foreground().forbid_parking(); let mut lang_registry = Arc::new(LanguageRegistry::new()); - let fs = Arc::new(FakeFs::new()); + let fs = Arc::new(FakeFs::new(cx_a.background())); fs.insert_tree( "/root", json!({ @@ -3042,7 +3042,7 @@ mod tests { ) { cx_a.foreground().forbid_parking(); let lang_registry = Arc::new(LanguageRegistry::new()); - let fs = Arc::new(FakeFs::new()); + let fs = Arc::new(FakeFs::new(cx_a.background())); // Connect to a server as 3 clients. let mut server = TestServer::start(cx_a.foreground()).await; diff --git a/crates/workspace/src/workspace.rs b/crates/workspace/src/workspace.rs index 8dc98c852bd17bca7bb1f2839bf2b83e0b93c47d..86f399a86a7dfed62a524a88a8af172f461fde5d 100644 --- a/crates/workspace/src/workspace.rs +++ b/crates/workspace/src/workspace.rs @@ -490,7 +490,7 @@ pub struct WorkspaceParams { impl WorkspaceParams { #[cfg(any(test, feature = "test-support"))] pub fn test(cx: &mut MutableAppContext) -> Self { - let fs = Arc::new(project::FakeFs::new()); + let fs = Arc::new(project::FakeFs::new(cx.background().clone())); let languages = Arc::new(LanguageRegistry::new()); let http_client = client::test::FakeHttpClient::new(|_| async move { Ok(client::http::ServerResponse::new(404)) diff --git a/crates/zed/src/test.rs b/crates/zed/src/test.rs index 4f685415d087f078e1d3f9ef3c9d969b3df31439..a365fdc6f46c7632f302c8a6fdbe25919c0edc1e 100644 --- a/crates/zed/src/test.rs +++ b/crates/zed/src/test.rs @@ -40,7 +40,7 @@ pub fn test_app_state(cx: &mut MutableAppContext) -> Arc { channel_list: cx.add_model(|cx| ChannelList::new(user_store.clone(), client.clone(), cx)), client, user_store, - fs: Arc::new(FakeFs::new()), + fs: Arc::new(FakeFs::new(cx.background().clone())), path_openers: Arc::from(path_openers), build_window_options: &build_window_options, build_workspace: &build_workspace,