diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index 2c31e8eef376311210ab9e3ae76d703ca903eb69..f0f6ef6146fe9efe712a8c14a9791b6143714863 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -530,10 +530,13 @@ impl Client { let cx = cx.clone(); let this = self.clone(); async move { + let mut message_id = 0_usize; while let Some(message) = incoming.next().await { let mut state = this.state.write(); - let payload_type_id = message.payload_type_id(); + message_id += 1; let type_name = message.payload_type_name(); + let payload_type_id = message.payload_type_id(); + let sender_id = message.original_sender_id().map(|id| id.0); let model = state .models_by_message_type @@ -575,8 +578,10 @@ impl Client { let client_id = this.id; log::debug!( - "rpc message received. client_id:{}, name:{}", + "rpc message received. client_id:{}, message_id:{}, sender_id:{:?}, type:{}", client_id, + message_id, + sender_id, type_name ); cx.foreground() @@ -584,15 +589,19 @@ impl Client { match future.await { Ok(()) => { log::debug!( - "rpc message handled. client_id:{}, name:{}", + "rpc message handled. client_id:{}, message_id:{}, sender_id:{:?}, type:{}", client_id, + message_id, + sender_id, type_name ); } Err(error) => { log::error!( - "error handling message. client_id:{}, name:{}, {}", + "error handling message. client_id:{}, message_id:{}, sender_id:{:?}, type:{}, error:{:?}", client_id, + message_id, + sender_id, type_name, error ); @@ -827,7 +836,7 @@ impl Client { ) -> impl Future> { let client_id = self.id; log::debug!( - "rpc request start. client_id: {}. name:{}", + "rpc request start. client_id:{}. name:{}", client_id, T::NAME ); @@ -837,7 +846,7 @@ impl Client { async move { let response = response?.await; log::debug!( - "rpc request finish. client_id: {}. name:{}", + "rpc request finish. client_id:{}. name:{}", client_id, T::NAME ); @@ -846,7 +855,7 @@ impl Client { } fn respond(&self, receipt: Receipt, response: T::Response) -> Result<()> { - log::debug!("rpc respond. client_id: {}. name:{}", self.id, T::NAME); + log::debug!("rpc respond. client_id:{}. name:{}", self.id, T::NAME); self.peer.respond(receipt, response) } @@ -855,7 +864,7 @@ impl Client { receipt: Receipt, error: proto::Error, ) -> Result<()> { - log::debug!("rpc respond. client_id: {}. name:{}", self.id, T::NAME); + log::debug!("rpc respond. client_id:{}. 
name:{}", self.id, T::NAME); self.peer.respond_with_error(receipt, error) } } diff --git a/crates/editor/src/editor.rs b/crates/editor/src/editor.rs index 254f2ee4150609fb9a0faeae6b02031ae78d6582..d3c6beaa9751fefd23b865345caebaf927f0a86e 100644 --- a/crates/editor/src/editor.rs +++ b/crates/editor/src/editor.rs @@ -8143,16 +8143,18 @@ mod tests { #[gpui::test] async fn test_completion(mut cx: gpui::TestAppContext) { let settings = cx.read(EditorSettings::test); - let (language_server, mut fake) = lsp::LanguageServer::fake_with_capabilities( - lsp::ServerCapabilities { - completion_provider: Some(lsp::CompletionOptions { - trigger_characters: Some(vec![".".to_string(), ":".to_string()]), + let (language_server, mut fake) = cx.update(|cx| { + lsp::LanguageServer::fake_with_capabilities( + lsp::ServerCapabilities { + completion_provider: Some(lsp::CompletionOptions { + trigger_characters: Some(vec![".".to_string(), ":".to_string()]), + ..Default::default() + }), ..Default::default() - }), - ..Default::default() - }, - cx.background(), - ); + }, + cx, + ) + }); let text = " one @@ -8318,7 +8320,7 @@ mod tests { position: Point, completions: Vec<(Range, &'static str)>, ) { - fake.handle_request::(move |params| { + fake.handle_request::(move |params, _| { assert_eq!( params.text_document_position.text_document.uri, lsp::Url::from_file_path(path).unwrap() @@ -8352,7 +8354,7 @@ mod tests { fake: &mut FakeLanguageServer, edit: Option<(Range, &'static str)>, ) { - fake.handle_request::(move |_| { + fake.handle_request::(move |_, _| { lsp::CompletionItem { additional_text_edits: edit.clone().map(|(range, new_text)| { vec![lsp::TextEdit::new( diff --git a/crates/language/src/language.rs b/crates/language/src/language.rs index 57a4fe001a7228767e29f261b064451bcc6f8ceb..31edba7131c61776ae9bc4fac7bec3aa00211a65 100644 --- a/crates/language/src/language.rs +++ b/crates/language/src/language.rs @@ -13,7 +13,7 @@ use futures::{ future::{BoxFuture, Shared}, FutureExt, TryFutureExt, }; -use gpui::{AppContext, Task}; +use gpui::{MutableAppContext, Task}; use highlight_map::HighlightMap; use lazy_static::lazy_static; use parking_lot::{Mutex, RwLock}; @@ -225,7 +225,7 @@ impl LanguageRegistry { language: &Arc, root_path: Arc, http_client: Arc, - cx: &AppContext, + cx: &mut MutableAppContext, ) -> Option>>> { #[cfg(any(test, feature = "test-support"))] if let Some(config) = &language.config.language_server { @@ -234,7 +234,7 @@ impl LanguageRegistry { let (server, mut fake_server) = lsp::LanguageServer::fake_with_capabilities( fake_config.capabilities.clone(), - cx.background().clone(), + cx, ); if let Some(initalizer) = &fake_config.initializer { diff --git a/crates/language/src/tests.rs b/crates/language/src/tests.rs index 9fd5d693ff65de0de342d6c2b3ac86d70d5d188c..eac6c6b5d6be0741ae750768ec315d16790a9a5f 100644 --- a/crates/language/src/tests.rs +++ b/crates/language/src/tests.rs @@ -554,7 +554,7 @@ fn test_autoindent_adjusts_lines_when_only_text_changes(cx: &mut MutableAppConte #[gpui::test] async fn test_diagnostics(mut cx: gpui::TestAppContext) { - let (language_server, mut fake) = lsp::LanguageServer::fake(cx.background()); + let (language_server, mut fake) = cx.update(lsp::LanguageServer::fake); let mut rust_lang = rust_lang(); rust_lang.config.language_server = Some(LanguageServerConfig { disk_based_diagnostic_sources: HashSet::from_iter(["disk".to_string()]), @@ -837,7 +837,7 @@ async fn test_diagnostics(mut cx: gpui::TestAppContext) { #[gpui::test] async fn test_edits_from_lsp_with_past_version(mut cx: 
gpui::TestAppContext) { - let (language_server, mut fake) = lsp::LanguageServer::fake(cx.background()); + let (language_server, mut fake) = cx.update(lsp::LanguageServer::fake); let text = " fn a() { diff --git a/crates/lsp/src/lsp.rs b/crates/lsp/src/lsp.rs index 93df14f89dd9b08c849b074d6ddbd00f068de5b0..c563c233118fb106299c7d57606abf78e8d3e8ac 100644 --- a/crates/lsp/src/lsp.rs +++ b/crates/lsp/src/lsp.rs @@ -483,36 +483,47 @@ impl Drop for Subscription { #[cfg(any(test, feature = "test-support"))] pub struct FakeLanguageServer { - handlers: - Arc Vec>>>>, + handlers: Arc< + Mutex< + HashMap< + &'static str, + Box Vec>, + >, + >, + >, outgoing_tx: futures::channel::mpsc::UnboundedSender>, incoming_rx: futures::channel::mpsc::UnboundedReceiver>, } #[cfg(any(test, feature = "test-support"))] impl LanguageServer { - pub fn fake(executor: Arc) -> (Arc, FakeLanguageServer) { - Self::fake_with_capabilities(Default::default(), executor) + pub fn fake(cx: &mut gpui::MutableAppContext) -> (Arc, FakeLanguageServer) { + Self::fake_with_capabilities(Default::default(), cx) } pub fn fake_with_capabilities( capabilities: ServerCapabilities, - executor: Arc, + cx: &mut gpui::MutableAppContext, ) -> (Arc, FakeLanguageServer) { let (stdin_writer, stdin_reader) = async_pipe::pipe(); let (stdout_writer, stdout_reader) = async_pipe::pipe(); - let mut fake = FakeLanguageServer::new(executor.clone(), stdin_reader, stdout_writer); + let mut fake = FakeLanguageServer::new(stdin_reader, stdout_writer, cx); fake.handle_request::({ let capabilities = capabilities.clone(); - move |_| InitializeResult { + move |_, _| InitializeResult { capabilities: capabilities.clone(), ..Default::default() } }); - let server = - Self::new_internal(stdin_writer, stdout_reader, Path::new("/"), executor).unwrap(); + let server = Self::new_internal( + stdin_writer, + stdout_reader, + Path::new("/"), + cx.background().clone(), + ) + .unwrap(); (server, fake) } @@ -521,9 +532,9 @@ impl LanguageServer { #[cfg(any(test, feature = "test-support"))] impl FakeLanguageServer { fn new( - background: Arc, stdin: async_pipe::PipeReader, stdout: async_pipe::PipeWriter, + cx: &mut gpui::MutableAppContext, ) -> Self { use futures::StreamExt as _; @@ -537,43 +548,42 @@ impl FakeLanguageServer { // Receive incoming messages let handlers = this.handlers.clone(); - let executor = background.clone(); - background - .spawn(async move { - let mut buffer = Vec::new(); - let mut stdin = smol::io::BufReader::new(stdin); - while Self::receive(&mut stdin, &mut buffer).await.is_ok() { - executor.simulate_random_delay().await; - if let Ok(request) = serde_json::from_slice::(&buffer) { - assert_eq!(request.jsonrpc, JSON_RPC_VERSION); - - if let Some(handler) = handlers.lock().get_mut(request.method) { - let response = handler(request.id, request.params.get().as_bytes()); - log::debug!("handled lsp request. method:{}", request.method); - outgoing_tx.unbounded_send(response)?; - } else { - log::debug!("unhandled lsp request. 
method:{}", request.method); - outgoing_tx.unbounded_send( - serde_json::to_vec(&AnyResponse { - id: request.id, - error: Some(Error { - message: "no handler".to_string(), - }), - result: None, - }) - .unwrap(), - )?; - } + cx.spawn(|cx| async move { + let mut buffer = Vec::new(); + let mut stdin = smol::io::BufReader::new(stdin); + while Self::receive(&mut stdin, &mut buffer).await.is_ok() { + cx.background().simulate_random_delay().await; + if let Ok(request) = serde_json::from_slice::(&buffer) { + assert_eq!(request.jsonrpc, JSON_RPC_VERSION); + + if let Some(handler) = handlers.lock().get_mut(request.method) { + let response = + handler(request.id, request.params.get().as_bytes(), cx.clone()); + log::debug!("handled lsp request. method:{}", request.method); + outgoing_tx.unbounded_send(response)?; } else { - incoming_tx.unbounded_send(buffer.clone())?; + log::debug!("unhandled lsp request. method:{}", request.method); + outgoing_tx.unbounded_send( + serde_json::to_vec(&AnyResponse { + id: request.id, + error: Some(Error { + message: "no handler".to_string(), + }), + result: None, + }) + .unwrap(), + )?; } + } else { + incoming_tx.unbounded_send(buffer.clone())?; } - Ok::<_, anyhow::Error>(()) - }) - .detach(); + } + Ok::<_, anyhow::Error>(()) + }) + .detach(); // Send outgoing messages - background + cx.background() .spawn(async move { let mut stdout = smol::io::BufWriter::new(stdout); while let Some(notification) = outgoing_rx.next().await { @@ -618,13 +628,13 @@ impl FakeLanguageServer { ) -> futures::channel::mpsc::UnboundedReceiver<()> where T: 'static + request::Request, - F: 'static + Send + Sync + FnMut(T::Params) -> T::Result, + F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext) -> T::Result, { let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded(); self.handlers.lock().insert( T::METHOD, - Box::new(move |id, params| { - let result = handler(serde_json::from_slice::(params).unwrap()); + Box::new(move |id, params, cx| { + let result = handler(serde_json::from_slice::(params).unwrap(), cx); let result = serde_json::to_string(&result).unwrap(); let result = serde_json::from_str::<&RawValue>(&result).unwrap(); let response = AnyResponse { @@ -709,8 +719,8 @@ mod tests { } #[gpui::test] - async fn test_fake(cx: TestAppContext) { - let (server, mut fake) = LanguageServer::fake(cx.background()); + async fn test_fake(mut cx: TestAppContext) { + let (server, mut fake) = cx.update(LanguageServer::fake); let (message_tx, message_rx) = channel::unbounded(); let (diagnostics_tx, diagnostics_rx) = channel::unbounded(); @@ -762,7 +772,7 @@ mod tests { "file://b/c" ); - fake.handle_request::(|_| ()); + fake.handle_request::(|_, _| ()); drop(server); fake.receive_notification::().await; diff --git a/crates/project/src/lsp_command.rs b/crates/project/src/lsp_command.rs index 3b502fc8fafc5accfc977eee572c853a68701b48..b091fe0bc390bf95f36d51596e81696790c01fa0 100644 --- a/crates/project/src/lsp_command.rs +++ b/crates/project/src/lsp_command.rs @@ -1,4 +1,4 @@ -use crate::{DocumentHighlight, Location, Project, ProjectTransaction}; +use crate::{BufferRequestHandle, DocumentHighlight, Location, Project, ProjectTransaction}; use anyhow::{anyhow, Result}; use async_trait::async_trait; use client::{proto, PeerId}; @@ -48,6 +48,7 @@ pub(crate) trait LspCommand: 'static + Sized { message: ::Response, project: ModelHandle, buffer: ModelHandle, + request_handle: BufferRequestHandle, cx: AsyncAppContext, ) -> Result; fn buffer_id_from_proto(message: &Self::ProtoRequest) -> u64; @@ 
-161,6 +162,7 @@ impl LspCommand for PrepareRename { message: proto::PrepareRenameResponse, _: ModelHandle, buffer: ModelHandle, + _: BufferRequestHandle, mut cx: AsyncAppContext, ) -> Result>> { if message.can_rename { @@ -277,6 +279,7 @@ impl LspCommand for PerformRename { message: proto::PerformRenameResponse, project: ModelHandle, _: ModelHandle, + request_handle: BufferRequestHandle, mut cx: AsyncAppContext, ) -> Result { let message = message @@ -284,7 +287,12 @@ impl LspCommand for PerformRename { .ok_or_else(|| anyhow!("missing transaction"))?; project .update(&mut cx, |project, cx| { - project.deserialize_project_transaction(message, self.push_to_history, cx) + project.deserialize_project_transaction( + message, + self.push_to_history, + request_handle, + cx, + ) }) .await } @@ -427,13 +435,16 @@ impl LspCommand for GetDefinition { message: proto::GetDefinitionResponse, project: ModelHandle, _: ModelHandle, + request_handle: BufferRequestHandle, mut cx: AsyncAppContext, ) -> Result> { let mut locations = Vec::new(); for location in message.locations { let buffer = location.buffer.ok_or_else(|| anyhow!("missing buffer"))?; let buffer = project - .update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx)) + .update(&mut cx, |this, cx| { + this.deserialize_buffer(buffer, request_handle.clone(), cx) + }) .await?; let start = location .start @@ -575,13 +586,16 @@ impl LspCommand for GetReferences { message: proto::GetReferencesResponse, project: ModelHandle, _: ModelHandle, + request_handle: BufferRequestHandle, mut cx: AsyncAppContext, ) -> Result> { let mut locations = Vec::new(); for location in message.locations { let buffer = location.buffer.ok_or_else(|| anyhow!("missing buffer"))?; let target_buffer = project - .update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx)) + .update(&mut cx, |this, cx| { + this.deserialize_buffer(buffer, request_handle.clone(), cx) + }) .await?; let start = location .start @@ -706,6 +720,7 @@ impl LspCommand for GetDocumentHighlights { message: proto::GetDocumentHighlightsResponse, _: ModelHandle, _: ModelHandle, + _: BufferRequestHandle, _: AsyncAppContext, ) -> Result> { Ok(message diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 4e67b11a59f5dec31c266a0b0527dae128a02d71..236b02cd6c97ef0e940fdf88e4963b32806e8174 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -25,11 +25,13 @@ use rand::prelude::*; use sha2::{Digest, Sha256}; use smol::block_on; use std::{ + cell::RefCell, convert::TryInto, hash::Hash, mem, ops::Range, path::{Component, Path, PathBuf}, + rc::Rc, sync::{atomic::AtomicBool, Arc}, time::Instant, }; @@ -52,16 +54,23 @@ pub struct Project { collaborators: HashMap, subscriptions: Vec, language_servers_with_diagnostics_running: isize, - open_buffers: HashMap, opened_buffer: broadcast::Sender<()>, loading_buffers: HashMap< ProjectPath, postage::watch::Receiver, Arc>>>, >, + buffers_state: Rc>, shared_buffers: HashMap>>, nonce: u128, } +#[derive(Default)] +struct ProjectBuffers { + buffer_request_count: usize, + preserved_buffers: Vec>, + open_buffers: HashMap, +} + enum OpenBuffer { Loaded(WeakModelHandle), Loading(Vec), @@ -142,6 +151,8 @@ pub struct Symbol { pub signature: [u8; 32], } +pub struct BufferRequestHandle(Rc>); + #[derive(Default)] pub struct ProjectTransaction(pub HashMap, language::Transaction>); @@ -169,7 +180,7 @@ impl DiagnosticSummary { this } - pub fn to_proto(&self, path: Arc) -> proto::DiagnosticSummary { + pub fn to_proto(&self, path: &Path) 
-> proto::DiagnosticSummary { proto::DiagnosticSummary { path: path.to_string_lossy().to_string(), error_count: self.error_count as u32, @@ -195,7 +206,7 @@ impl Project { client.add_entity_message_handler(Self::handle_disk_based_diagnostics_updated); client.add_entity_message_handler(Self::handle_disk_based_diagnostics_updating); client.add_entity_message_handler(Self::handle_remove_collaborator); - client.add_entity_message_handler(Self::handle_share_worktree); + client.add_entity_message_handler(Self::handle_register_worktree); client.add_entity_message_handler(Self::handle_unregister_worktree); client.add_entity_message_handler(Self::handle_unshare_project); client.add_entity_message_handler(Self::handle_update_buffer_file); @@ -270,7 +281,7 @@ impl Project { Self { worktrees: Default::default(), collaborators: Default::default(), - open_buffers: Default::default(), + buffers_state: Default::default(), loading_buffers: Default::default(), shared_buffers: Default::default(), client_state: ProjectClientState::Local { @@ -323,7 +334,6 @@ impl Project { let this = cx.add_model(|cx| { let mut this = Self { worktrees: Vec::new(), - open_buffers: Default::default(), loading_buffers: Default::default(), opened_buffer: broadcast::channel(1).0, shared_buffers: Default::default(), @@ -342,6 +352,7 @@ impl Project { language_servers_with_diagnostics_running: 0, language_servers: Default::default(), started_language_servers: Default::default(), + buffers_state: Default::default(), nonce: StdRng::from_entropy().gen(), }; for worktree in worktrees { @@ -390,7 +401,9 @@ impl Project { #[cfg(any(test, feature = "test-support"))] pub fn has_buffered_operations(&self) -> bool { - self.open_buffers + self.buffers_state + .borrow() + .open_buffers .values() .any(|buffer| matches!(buffer, OpenBuffer::Loading(_))) } @@ -621,11 +634,6 @@ impl Project { *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| { // Record the fact that the buffer is no longer loading. 
this.loading_buffers.remove(&project_path); - if this.loading_buffers.is_empty() { - this.open_buffers - .retain(|_, buffer| matches!(buffer, OpenBuffer::Loaded(_))) - } - let buffer = load_result.map_err(Arc::new)?; Ok(buffer) })); @@ -682,6 +690,7 @@ impl Project { let remote_worktree_id = worktree.read(cx).id(); let path = path.clone(); let path_string = path.to_string_lossy().to_string(); + let request_handle = self.start_buffer_request(cx); cx.spawn(|this, mut cx| async move { let response = rpc .request(proto::OpenBuffer { @@ -691,8 +700,11 @@ impl Project { }) .await?; let buffer = response.buffer.ok_or_else(|| anyhow!("missing buffer"))?; - this.update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx)) - .await + + this.update(&mut cx, |this, cx| { + this.deserialize_buffer(buffer, request_handle, cx) + }) + .await }) } @@ -733,6 +745,10 @@ impl Project { }) } + fn start_buffer_request(&self, cx: &AppContext) -> BufferRequestHandle { + BufferRequestHandle::new(self.buffers_state.clone(), cx) + } + pub fn save_buffer_as( &self, buffer: ModelHandle, @@ -761,40 +777,47 @@ impl Project { pub fn has_open_buffer(&self, path: impl Into, cx: &AppContext) -> bool { let path = path.into(); if let Some(worktree) = self.worktree_for_id(path.worktree_id, cx) { - self.open_buffers.iter().any(|(_, buffer)| { - if let Some(buffer) = buffer.upgrade(cx) { - if let Some(file) = File::from_dyn(buffer.read(cx).file()) { - if file.worktree == worktree && file.path() == &path.path { - return true; + self.buffers_state + .borrow() + .open_buffers + .iter() + .any(|(_, buffer)| { + if let Some(buffer) = buffer.upgrade(cx) { + if let Some(file) = File::from_dyn(buffer.read(cx).file()) { + if file.worktree == worktree && file.path() == &path.path { + return true; + } } } - } - false - }) + false + }) } else { false } } - fn get_open_buffer( + pub fn get_open_buffer( &mut self, path: &ProjectPath, cx: &mut ModelContext, ) -> Option> { let mut result = None; let worktree = self.worktree_for_id(path.worktree_id, cx)?; - self.open_buffers.retain(|_, buffer| { - if let Some(buffer) = buffer.upgrade(cx) { - if let Some(file) = File::from_dyn(buffer.read(cx).file()) { - if file.worktree == worktree && file.path() == &path.path { - result = Some(buffer); + self.buffers_state + .borrow_mut() + .open_buffers + .retain(|_, buffer| { + if let Some(buffer) = buffer.upgrade(cx) { + if let Some(file) = File::from_dyn(buffer.read(cx).file()) { + if file.worktree == worktree && file.path() == &path.path { + result = Some(buffer); + } } + true + } else { + false } - true - } else { - false - } - }); + }); result } @@ -804,15 +827,25 @@ impl Project { worktree: Option<&ModelHandle>, cx: &mut ModelContext, ) -> Result<()> { - match self.open_buffers.insert( - buffer.read(cx).remote_id(), - OpenBuffer::Loaded(buffer.downgrade()), - ) { + let remote_id = buffer.read(cx).remote_id(); + match self + .buffers_state + .borrow_mut() + .open_buffers + .insert(remote_id, OpenBuffer::Loaded(buffer.downgrade())) + { None => {} Some(OpenBuffer::Loading(operations)) => { buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))? } - Some(OpenBuffer::Loaded(_)) => Err(anyhow!("registered the same buffer twice"))?, + Some(OpenBuffer::Loaded(existing_handle)) => { + if existing_handle.upgrade(cx).is_some() { + Err(anyhow!( + "already registered buffer with remote id {}", + remote_id + ))? 
+ } + } } self.assign_language_to_buffer(&buffer, worktree, cx); Ok(()) @@ -1132,7 +1165,7 @@ impl Project { path: relative_path.into(), }; - for buffer in self.open_buffers.values() { + for buffer in self.buffers_state.borrow().open_buffers.values() { if let Some(buffer) = buffer.upgrade(cx) { if buffer .read(cx) @@ -1195,6 +1228,7 @@ impl Project { let remote_buffers = self.remote_id().zip(remote_buffers); let client = self.client.clone(); + let request_handle = self.start_buffer_request(cx); cx.spawn(|this, mut cx| async move { let mut project_transaction = ProjectTransaction::default(); @@ -1213,7 +1247,12 @@ impl Project { .ok_or_else(|| anyhow!("missing transaction"))?; project_transaction = this .update(&mut cx, |this, cx| { - this.deserialize_project_transaction(response, push_to_history, cx) + this.deserialize_project_transaction( + response, + push_to_history, + request_handle, + cx, + ) }) .await?; } @@ -1430,6 +1469,7 @@ impl Project { cx, ) } else if let Some(project_id) = self.remote_id() { + let request_handle = self.start_buffer_request(cx); let request = self.client.request(proto::OpenBufferForSymbol { project_id, symbol: Some(serialize_symbol(symbol)), @@ -1437,8 +1477,10 @@ impl Project { cx.spawn(|this, mut cx| async move { let response = request.await?; let buffer = response.buffer.ok_or_else(|| anyhow!("invalid buffer"))?; - this.update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx)) - .await + this.update(&mut cx, |this, cx| { + this.deserialize_buffer(buffer, request_handle, cx) + }) + .await }) } else { Task::ready(Err(anyhow!("project does not have a remote id"))) @@ -1817,6 +1859,7 @@ impl Project { }) } else if let Some(project_id) = self.remote_id() { let client = self.client.clone(); + let request_handle = self.start_buffer_request(cx); let request = proto::ApplyCodeAction { project_id, buffer_id: buffer_handle.read(cx).remote_id(), @@ -1829,7 +1872,12 @@ impl Project { .transaction .ok_or_else(|| anyhow!("missing transaction"))?; this.update(&mut cx, |this, cx| { - this.deserialize_project_transaction(response, push_to_history, cx) + this.deserialize_project_transaction( + response, + push_to_history, + request_handle, + cx, + ) }) .await }) @@ -2020,11 +2068,12 @@ impl Project { } } else if let Some(project_id) = self.remote_id() { let rpc = self.client.clone(); + let request_handle = self.start_buffer_request(cx); let message = request.to_proto(project_id, buffer); return cx.spawn(|this, cx| async move { let response = rpc.request(message).await?; request - .response_from_proto(response, this, buffer_handle, cx) + .response_from_proto(response, this, buffer_handle, request_handle, cx) .await }); } @@ -2047,7 +2096,7 @@ impl Project { } } - fn find_local_worktree( + pub fn find_local_worktree( &self, abs_path: &Path, cx: &AppContext, @@ -2152,7 +2201,7 @@ impl Project { ) { let snapshot = worktree_handle.read(cx).snapshot(); let mut buffers_to_delete = Vec::new(); - for (buffer_id, buffer) in &self.open_buffers { + for (buffer_id, buffer) in &self.buffers_state.borrow().open_buffers { if let Some(buffer) = buffer.upgrade(cx) { buffer.update(cx, |buffer, cx| { if let Some(old_file) = File::from_dyn(buffer.file()) { @@ -2209,7 +2258,10 @@ impl Project { } for buffer_id in buffers_to_delete { - self.open_buffers.remove(&buffer_id); + self.buffers_state + .borrow_mut() + .open_buffers + .remove(&buffer_id); } } @@ -2337,7 +2389,7 @@ impl Project { .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))? 
.replica_id; this.shared_buffers.remove(&peer_id); - for (_, buffer) in &this.open_buffers { + for (_, buffer) in &this.buffers_state.borrow().open_buffers { if let Some(buffer) = buffer.upgrade(cx) { buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx)); } @@ -2347,19 +2399,22 @@ impl Project { }) } - async fn handle_share_worktree( + async fn handle_register_worktree( this: ModelHandle, - envelope: TypedEnvelope, + envelope: TypedEnvelope, client: Arc, mut cx: AsyncAppContext, ) -> Result<()> { this.update(&mut cx, |this, cx| { let remote_id = this.remote_id().ok_or_else(|| anyhow!("invalid project"))?; let replica_id = this.replica_id(); - let worktree = envelope - .payload - .worktree - .ok_or_else(|| anyhow!("invalid worktree"))?; + let worktree = proto::Worktree { + id: envelope.payload.worktree_id, + root_name: envelope.payload.root_name, + entries: Default::default(), + diagnostic_summaries: Default::default(), + weak: envelope.payload.weak, + }; let (worktree, load_task) = Worktree::remote(remote_id, replica_id, worktree, client, cx); this.add_worktree(&worktree, cx); @@ -2461,17 +2516,21 @@ impl Project { .map(|op| language::proto::deserialize_operation(op)) .collect::, _>>()?; let is_remote = this.is_remote(); - match this.open_buffers.entry(buffer_id) { + let mut buffers_state = this.buffers_state.borrow_mut(); + let buffer_request_count = buffers_state.buffer_request_count; + match buffers_state.open_buffers.entry(buffer_id) { hash_map::Entry::Occupied(mut e) => match e.get_mut() { OpenBuffer::Loaded(buffer) => { if let Some(buffer) = buffer.upgrade(cx) { buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?; + } else if is_remote && buffer_request_count > 0 { + e.insert(OpenBuffer::Loading(ops)); } } OpenBuffer::Loading(operations) => operations.extend_from_slice(&ops), }, hash_map::Entry::Vacant(e) => { - if is_remote && this.loading_buffers.len() > 0 { + if is_remote && buffer_request_count > 0 { e.insert(OpenBuffer::Loading(ops)); } } @@ -2495,6 +2554,8 @@ impl Project { .ok_or_else(|| anyhow!("no such worktree"))?; let file = File::from_proto(file, worktree.clone(), cx)?; let buffer = this + .buffers_state + .borrow_mut() .open_buffers .get_mut(&buffer_id) .and_then(|b| b.upgrade(cx)) @@ -2861,17 +2922,21 @@ impl Project { &mut self, message: proto::ProjectTransaction, push_to_history: bool, + request_handle: BufferRequestHandle, cx: &mut ModelContext, ) -> Task> { cx.spawn(|this, mut cx| async move { let mut project_transaction = ProjectTransaction::default(); for (buffer, transaction) in message.buffers.into_iter().zip(message.transactions) { let buffer = this - .update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx)) + .update(&mut cx, |this, cx| { + this.deserialize_buffer(buffer, request_handle.clone(), cx) + }) .await?; let transaction = language::proto::deserialize_transaction(transaction)?; project_transaction.0.insert(buffer, transaction); } + for (buffer, transaction) in &project_transaction.0 { buffer .update(&mut cx, |buffer, _| { @@ -2914,6 +2979,7 @@ impl Project { fn deserialize_buffer( &mut self, buffer: proto::Buffer, + request_handle: BufferRequestHandle, cx: &mut ModelContext, ) -> Task>> { let replica_id = self.replica_id(); @@ -2925,7 +2991,9 @@ impl Project { proto::buffer::Variant::Id(id) => { let buffer = loop { let buffer = this.read_with(&cx, |this, cx| { - this.open_buffers + this.buffers_state + .borrow() + .open_buffers .get(&id) .and_then(|buffer| buffer.upgrade(cx)) }); @@ -2960,6 +3028,8 @@ impl Project { let 
buffer = cx.add_model(|cx| { Buffer::from_proto(replica_id, buffer, buffer_file, cx).unwrap() }); + + request_handle.preserve_buffer(buffer.clone()); this.update(&mut cx, |this, cx| { this.register_buffer(&buffer, buffer_worktree.as_ref(), cx) })?; @@ -3032,6 +3102,8 @@ impl Project { this.update(&mut cx, |this, cx| { let buffer = this + .buffers_state + .borrow() .open_buffers .get(&envelope.payload.buffer_id) .and_then(|buffer| buffer.upgrade(cx)); @@ -3058,6 +3130,8 @@ impl Project { .into(); this.update(&mut cx, |this, cx| { let buffer = this + .buffers_state + .borrow() .open_buffers .get(&payload.buffer_id) .and_then(|buffer| buffer.upgrade(cx)); @@ -3108,6 +3182,48 @@ impl Project { } } +impl BufferRequestHandle { + fn new(state: Rc>, cx: &AppContext) -> Self { + { + let state = &mut *state.borrow_mut(); + state.buffer_request_count += 1; + if state.buffer_request_count == 1 { + state.preserved_buffers.extend( + state + .open_buffers + .values() + .filter_map(|buffer| buffer.upgrade(cx)), + ) + } + } + Self(state) + } + + fn preserve_buffer(&self, buffer: ModelHandle) { + self.0.borrow_mut().preserved_buffers.push(buffer); + } +} + +impl Clone for BufferRequestHandle { + fn clone(&self) -> Self { + self.0.borrow_mut().buffer_request_count += 1; + Self(self.0.clone()) + } +} + +impl Drop for BufferRequestHandle { + fn drop(&mut self) { + let mut state = self.0.borrow_mut(); + state.buffer_request_count -= 1; + if state.buffer_request_count == 0 { + state.preserved_buffers.clear(); + state + .open_buffers + .retain(|_, buffer| matches!(buffer, OpenBuffer::Loaded(_))) + } + } +} + impl WorktreeHandle { pub fn upgrade(&self, cx: &AppContext) -> Option> { match self { @@ -3612,7 +3728,7 @@ mod tests { .unwrap(); let mut fake_server = fake_servers.next().await.unwrap(); - fake_server.handle_request::(move |params| { + fake_server.handle_request::(move |params, _| { let params = params.text_document_position_params; assert_eq!( params.text_document.uri.to_file_path().unwrap(), @@ -3885,7 +4001,6 @@ mod tests { &initial_snapshot, 1, 1, - 0, true, ); remote @@ -4504,7 +4619,7 @@ mod tests { project.prepare_rename(buffer.clone(), 7, cx) }); fake_server - .handle_request::(|params| { + .handle_request::(|params, _| { assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs"); assert_eq!(params.position, lsp::Position::new(0, 7)); Some(lsp::PrepareRenameResponse::Range(lsp::Range::new( @@ -4523,7 +4638,7 @@ mod tests { project.perform_rename(buffer.clone(), 7, "THREE".to_string(), true, cx) }); fake_server - .handle_request::(|params| { + .handle_request::(|params, _| { assert_eq!( params.text_document_position.text_document.uri.as_str(), "file:///dir/one.rs" diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index 84170115890e747dd270709d29848d6ba1663694..795ca23c8f28cd732858b52c82d1ef2ac9615ac0 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -7,7 +7,7 @@ use ::ignore::gitignore::{Gitignore, GitignoreBuilder}; use anyhow::{anyhow, Context, Result}; use client::{proto, Client, TypedEnvelope}; use clock::ReplicaId; -use collections::{HashMap, VecDeque}; +use collections::HashMap; use futures::{ channel::mpsc::{self, UnboundedSender}, Stream, StreamExt, @@ -43,7 +43,7 @@ use std::{ time::{Duration, SystemTime}, }; use sum_tree::{Bias, Edit, SeekTarget, SumTree, TreeMap}; -use util::ResultExt; +use util::{ResultExt, TryFutureExt}; lazy_static! 
{ static ref GITIGNORE: &'static OsStr = OsStr::new(".gitignore"); @@ -84,8 +84,6 @@ pub struct RemoteWorktree { queued_operations: Vec<(u64, Operation)>, diagnostic_summaries: TreeMap, weak: bool, - next_update_id: u64, - pending_updates: VecDeque, } #[derive(Clone)] @@ -138,7 +136,7 @@ enum Registration { struct ShareState { project_id: u64, snapshots_tx: Sender, - _maintain_remote_snapshot: Option>, + _maintain_remote_snapshot: Option>>, } #[derive(Default, Deserialize)] @@ -239,8 +237,6 @@ impl Worktree { }), ), weak, - next_update_id: worktree.next_update_id, - pending_updates: Default::default(), }) }); @@ -739,6 +735,7 @@ impl LocalWorktree { worktree_id: self.id().to_proto(), root_name: self.root_name().to_string(), authorized_logins: self.authorized_logins(), + weak: self.weak, }; cx.spawn(|this, mut cx| async move { let response = client.request(register_message).await; @@ -762,68 +759,75 @@ impl LocalWorktree { &mut self, project_id: u64, cx: &mut ModelContext, - ) -> Task> { + ) -> impl Future> { + let (mut share_tx, mut share_rx) = oneshot::channel(); if self.share.is_some() { - return Task::ready(Ok(())); - } + let _ = share_tx.try_send(Ok(())); + } else { + let snapshot = self.snapshot(); + let rpc = self.client.clone(); + let worktree_id = cx.model_id() as u64; + let (snapshots_to_send_tx, snapshots_to_send_rx) = + smol::channel::unbounded::(); + let maintain_remote_snapshot = cx.background().spawn({ + let rpc = rpc.clone(); + let snapshot = snapshot.clone(); + let diagnostic_summaries = self.diagnostic_summaries.clone(); + async move { + if let Err(error) = rpc + .request(proto::UpdateWorktree { + project_id, + worktree_id, + root_name: snapshot.root_name().to_string(), + updated_entries: snapshot + .entries_by_path + .iter() + .filter(|e| !e.is_ignored) + .map(Into::into) + .collect(), + removed_entries: Default::default(), + }) + .await + { + let _ = share_tx.try_send(Err(error)); + return Err(anyhow!("failed to send initial update worktree")); + } else { + let _ = share_tx.try_send(Ok(())); + } - let snapshot = self.snapshot(); - let rpc = self.client.clone(); - let worktree_id = cx.model_id() as u64; - let (snapshots_to_send_tx, snapshots_to_send_rx) = - smol::channel::unbounded::(); - let (mut share_tx, mut share_rx) = oneshot::channel(); - let maintain_remote_snapshot = cx.background().spawn({ - let rpc = rpc.clone(); - let snapshot = snapshot.clone(); - let diagnostic_summaries = self.diagnostic_summaries.clone(); - let weak = self.weak; - async move { - if let Err(error) = rpc - .request(proto::ShareWorktree { - project_id, - worktree: Some(snapshot.to_proto(&diagnostic_summaries, weak)), - }) - .await - { - let _ = share_tx.try_send(Err(error)); - return; - } else { - let _ = share_tx.try_send(Ok(())); - } + for (path, summary) in diagnostic_summaries.iter() { + rpc.send(proto::UpdateDiagnosticSummary { + project_id, + worktree_id, + summary: Some(summary.to_proto(&path.0)), + })?; + } - let mut update_id = 0; - let mut prev_snapshot = snapshot; - while let Ok(snapshot) = snapshots_to_send_rx.recv().await { - let message = snapshot.build_update( - &prev_snapshot, - project_id, - worktree_id, - update_id, - false, - ); - match rpc.request(message).await { - Ok(_) => { - prev_snapshot = snapshot; - update_id += 1; - } - Err(err) => log::error!("error sending snapshot diff {}", err), + let mut prev_snapshot = snapshot; + while let Ok(snapshot) = snapshots_to_send_rx.recv().await { + let message = + snapshot.build_update(&prev_snapshot, project_id, worktree_id, 
false); + rpc.request(message).await?; + prev_snapshot = snapshot; } + + Ok::<_, anyhow::Error>(()) } - } - }); - self.share = Some(ShareState { - project_id, - snapshots_tx: snapshots_to_send_tx, - _maintain_remote_snapshot: Some(maintain_remote_snapshot), - }); + .log_err() + }); + self.share = Some(ShareState { + project_id, + snapshots_tx: snapshots_to_send_tx, + _maintain_remote_snapshot: Some(maintain_remote_snapshot), + }); + } - cx.foreground().spawn(async move { - match share_rx.next().await { - Some(result) => result, - None => Err(anyhow!("unshared before sharing completed")), - } - }) + async move { + share_rx + .next() + .await + .unwrap_or_else(|| Err(anyhow!("share ended"))) + } } pub fn unshare(&mut self) { @@ -844,38 +848,13 @@ impl RemoteWorktree { &mut self, envelope: TypedEnvelope, ) -> Result<()> { - let update = envelope.payload; - if update.id > self.next_update_id { - let ix = match self - .pending_updates - .binary_search_by_key(&update.id, |pending| pending.id) - { - Ok(ix) | Err(ix) => ix, - }; - self.pending_updates.insert(ix, update); - } else { - let tx = self.updates_tx.clone(); - self.next_update_id += 1; - tx.unbounded_send(update) - .expect("consumer runs to completion"); - while let Some(update) = self.pending_updates.front() { - if update.id == self.next_update_id { - self.next_update_id += 1; - tx.unbounded_send(self.pending_updates.pop_front().unwrap()) - .expect("consumer runs to completion"); - } else { - break; - } - } - } + self.updates_tx + .unbounded_send(envelope.payload) + .expect("consumer runs to completion"); Ok(()) } - pub fn has_pending_updates(&self) -> bool { - !self.pending_updates.is_empty() - } - pub fn update_diagnostic_summary( &mut self, path: Arc, @@ -1038,6 +1017,7 @@ impl Snapshot { } impl LocalSnapshot { + #[cfg(test)] pub(crate) fn to_proto( &self, diagnostic_summaries: &TreeMap, @@ -1055,10 +1035,9 @@ impl LocalSnapshot { .collect(), diagnostic_summaries: diagnostic_summaries .iter() - .map(|(path, summary)| summary.to_proto(path.0.clone())) + .map(|(path, summary)| summary.to_proto(&path.0)) .collect(), weak, - next_update_id: 0, } } @@ -1067,7 +1046,6 @@ impl LocalSnapshot { other: &Self, project_id: u64, worktree_id: u64, - update_id: u64, include_ignored: bool, ) -> proto::UpdateWorktree { let mut updated_entries = Vec::new(); @@ -1120,7 +1098,6 @@ impl LocalSnapshot { } proto::UpdateWorktree { - id: update_id as u64, project_id, worktree_id, root_name: self.root_name().to_string(), @@ -2461,7 +2438,7 @@ mod tests { fmt::Write, time::{SystemTime, UNIX_EPOCH}, }; - use util::{post_inc, test::temp_tree}; + use util::test::temp_tree; #[gpui::test] async fn test_traversal(cx: gpui::TestAppContext) { @@ -2646,7 +2623,6 @@ mod tests { new_scanner.snapshot().to_vec(true) ); - let mut update_id = 0; for mut prev_snapshot in snapshots { let include_ignored = rng.gen::(); if !include_ignored { @@ -2667,13 +2643,9 @@ mod tests { prev_snapshot.entries_by_id.edit(entries_by_id_edits, &()); } - let update = scanner.snapshot().build_update( - &prev_snapshot, - 0, - 0, - post_inc(&mut update_id), - include_ignored, - ); + let update = scanner + .snapshot() + .build_update(&prev_snapshot, 0, 0, include_ignored); prev_snapshot.apply_remote_update(update).unwrap(); assert_eq!( prev_snapshot.to_vec(true), diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index 8e83b099631617462983560d08ffe49a169408b9..ebdb39942ddfa4968ba5ffcaa4acf8bd8a794140 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -34,7 
+34,6 @@ message Envelope { RegisterWorktree register_worktree = 28; UnregisterWorktree unregister_worktree = 29; - ShareWorktree share_worktree = 30; UpdateWorktree update_worktree = 31; UpdateDiagnosticSummary update_diagnostic_summary = 32; DiskBasedDiagnosticsUpdating disk_based_diagnostics_updating = 33; @@ -132,6 +131,7 @@ message RegisterWorktree { uint64 worktree_id = 2; string root_name = 3; repeated string authorized_logins = 4; + bool weak = 5; } message UnregisterWorktree { @@ -139,18 +139,12 @@ message UnregisterWorktree { uint64 worktree_id = 2; } -message ShareWorktree { - uint64 project_id = 1; - Worktree worktree = 2; -} - message UpdateWorktree { - uint64 id = 1; - uint64 project_id = 2; - uint64 worktree_id = 3; - string root_name = 4; - repeated Entry updated_entries = 5; - repeated uint64 removed_entries = 6; + uint64 project_id = 1; + uint64 worktree_id = 2; + string root_name = 3; + repeated Entry updated_entries = 4; + repeated uint64 removed_entries = 5; } message AddProjectCollaborator { @@ -494,7 +488,6 @@ message Worktree { repeated Entry entries = 3; repeated DiagnosticSummary diagnostic_summaries = 4; bool weak = 5; - uint64 next_update_id = 6; } message File { diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index b6d5c4b43815c0b053f3ff142d8b2a563655d3ce..5ec46eb353a59652eff919b9f8fce9fdb11aebb3 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -37,6 +37,7 @@ pub trait AnyTypedEnvelope: 'static + Send + Sync { fn as_any(&self) -> &dyn Any; fn into_any(self: Box) -> Box; fn is_background(&self) -> bool; + fn original_sender_id(&self) -> Option; } pub enum MessagePriority { @@ -64,6 +65,10 @@ impl AnyTypedEnvelope for TypedEnvelope { fn is_background(&self) -> bool { matches!(T::PRIORITY, MessagePriority::Background) } + + fn original_sender_id(&self) -> Option { + self.original_sender_id + } } macro_rules! 
messages { @@ -157,8 +162,8 @@ messages!( (GetCompletionsResponse, Foreground), (GetDefinition, Foreground), (GetDefinitionResponse, Foreground), - (GetDocumentHighlights, Foreground), - (GetDocumentHighlightsResponse, Foreground), + (GetDocumentHighlights, Background), + (GetDocumentHighlightsResponse, Background), (GetReferences, Foreground), (GetReferencesResponse, Foreground), (GetProjectSymbols, Background), @@ -188,7 +193,6 @@ messages!( (SendChannelMessage, Foreground), (SendChannelMessageResponse, Foreground), (ShareProject, Foreground), - (ShareWorktree, Foreground), (Test, Foreground), (UnregisterProject, Foreground), (UnregisterWorktree, Foreground), @@ -228,7 +232,6 @@ request_messages!( (SaveBuffer, BufferSaved), (SendChannelMessage, SendChannelMessageResponse), (ShareProject, Ack), - (ShareWorktree, Ack), (Test, Test), (UpdateBuffer, Ack), (UpdateWorktree, Ack), @@ -259,12 +262,12 @@ entity_messages!( PrepareRename, RemoveProjectCollaborator, SaveBuffer, - ShareWorktree, UnregisterWorktree, UnshareProject, UpdateBuffer, UpdateBufferFile, UpdateDiagnosticSummary, + RegisterWorktree, UpdateWorktree, ); diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index a767676fad553b4c5d42ebc439553afc2b6f4e2a..593ad32bcdad9390c4e22110347439650f4e3ccf 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -16,7 +16,7 @@ use rpc::{ Connection, ConnectionId, Peer, TypedEnvelope, }; use sha1::{Digest as _, Sha1}; -use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant}; +use std::{any::TypeId, future::Future, sync::Arc, time::Instant}; use store::{Store, Worktree}; use surf::StatusCode; use tide::log; @@ -73,7 +73,6 @@ impl Server { .add_message_handler(Server::leave_project) .add_request_handler(Server::register_worktree) .add_message_handler(Server::unregister_worktree) - .add_request_handler(Server::share_worktree) .add_request_handler(Server::update_worktree) .add_message_handler(Server::update_diagnostic_summary) .add_message_handler(Server::disk_based_diagnostics_updating) @@ -335,22 +334,21 @@ impl Server { replica_id: 0, user_id: joined.project.host_user_id.to_proto(), }); - let worktrees = joined - .project + let worktrees = share .worktrees .iter() - .filter_map(|(id, worktree)| { - worktree.share.as_ref().map(|share| proto::Worktree { + .filter_map(|(id, shared_worktree)| { + let worktree = joined.project.worktrees.get(&id)?; + Some(proto::Worktree { id: *id, root_name: worktree.root_name.clone(), - entries: share.entries.values().cloned().collect(), - diagnostic_summaries: share + entries: shared_worktree.entries.values().cloned().collect(), + diagnostic_summaries: shared_worktree .diagnostic_summaries .values() .cloned() .collect(), weak: worktree.weak, - next_update_id: share.next_update_id as u64, }) }) .collect(); @@ -420,23 +418,33 @@ impl Server { let mut contact_user_ids = HashSet::default(); contact_user_ids.insert(host_user_id); - for github_login in request.payload.authorized_logins { - let contact_user_id = self.app_state.db.create_user(&github_login, false).await?; + for github_login in &request.payload.authorized_logins { + let contact_user_id = self.app_state.db.create_user(github_login, false).await?; contact_user_ids.insert(contact_user_id); } let contact_user_ids = contact_user_ids.into_iter().collect::>(); - self.state_mut().register_worktree( - request.payload.project_id, - request.payload.worktree_id, - request.sender_id, - Worktree { - authorized_user_ids: contact_user_ids.clone(), - root_name: 
request.payload.root_name, - share: None, - weak: false, - }, - )?; + let guest_connection_ids; + { + let mut state = self.state_mut(); + guest_connection_ids = state + .read_project(request.payload.project_id, request.sender_id)? + .guest_connection_ids(); + state.register_worktree( + request.payload.project_id, + request.payload.worktree_id, + request.sender_id, + Worktree { + authorized_user_ids: contact_user_ids.clone(), + root_name: request.payload.root_name.clone(), + weak: request.payload.weak, + }, + )?; + } + broadcast(request.sender_id, guest_connection_ids, |connection_id| { + self.peer + .forward_send(request.sender_id, connection_id, request.payload.clone()) + })?; self.update_contacts_for_users(&contact_user_ids)?; Ok(proto::Ack {}) } @@ -463,48 +471,6 @@ impl Server { Ok(()) } - async fn share_worktree( - mut self: Arc, - mut request: TypedEnvelope, - ) -> tide::Result { - let worktree = request - .payload - .worktree - .as_mut() - .ok_or_else(|| anyhow!("missing worktree"))?; - let entries = worktree - .entries - .iter() - .map(|entry| (entry.id, entry.clone())) - .collect(); - let diagnostic_summaries = worktree - .diagnostic_summaries - .iter() - .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone())) - .collect(); - - let shared_worktree = self.state_mut().share_worktree( - request.payload.project_id, - worktree.id, - request.sender_id, - entries, - diagnostic_summaries, - worktree.next_update_id, - )?; - - broadcast( - request.sender_id, - shared_worktree.connection_ids, - |connection_id| { - self.peer - .forward_send(request.sender_id, connection_id, request.payload.clone()) - }, - )?; - self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?; - - Ok(proto::Ack {}) - } - async fn update_worktree( mut self: Arc, request: TypedEnvelope, @@ -513,7 +479,6 @@ impl Server { request.sender_id, request.payload.project_id, request.payload.worktree_id, - request.payload.id, &request.payload.removed_entries, &request.payload.updated_entries, )?; @@ -1198,7 +1163,7 @@ mod tests { cell::Cell, env, ops::Deref, - path::Path, + path::{Path, PathBuf}, rc::Rc, sync::{ atomic::{AtomicBool, Ordering::SeqCst}, @@ -1218,7 +1183,7 @@ mod tests { fs::{FakeFs, Fs as _}, language::{ tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language, - LanguageConfig, LanguageRegistry, LanguageServerConfig, Point, + LanguageConfig, LanguageRegistry, LanguageServerConfig, Point, ToLspPosition, }, lsp, project::{DiagnosticSummary, Project, ProjectPath}, @@ -2149,16 +2114,14 @@ mod tests { let worktree = store .project(project_id) .unwrap() + .share + .as_ref() + .unwrap() .worktrees .get(&worktree_id.to_proto()) .unwrap(); - !worktree - .share - .as_ref() - .unwrap() - .diagnostic_summaries - .is_empty() + !worktree.diagnostic_summaries.is_empty() }) .await; @@ -2389,7 +2352,7 @@ mod tests { // Return some completions from the host's language server. cx_a.foreground().start_waiting(); fake_language_server - .handle_request::(|params| { + .handle_request::(|params, _| { assert_eq!( params.text_document_position.text_document.uri, lsp::Url::from_file_path("/a/main.rs").unwrap(), @@ -2455,23 +2418,28 @@ mod tests { // Return a resolved completion from the host's language server. // The resolved completion has an additional text edit. 
- fake_language_server.handle_request::(|params| { - assert_eq!(params.label, "first_method(…)"); - lsp::CompletionItem { - label: "first_method(…)".into(), - detail: Some("fn(&mut self, B) -> C".into()), - text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit { - new_text: "first_method($1)".to_string(), - range: lsp::Range::new(lsp::Position::new(0, 14), lsp::Position::new(0, 14)), - })), - additional_text_edits: Some(vec![lsp::TextEdit { - new_text: "use d::SomeTrait;\n".to_string(), - range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)), - }]), - insert_text_format: Some(lsp::InsertTextFormat::SNIPPET), - ..Default::default() - } - }); + fake_language_server.handle_request::( + |params, _| { + assert_eq!(params.label, "first_method(…)"); + lsp::CompletionItem { + label: "first_method(…)".into(), + detail: Some("fn(&mut self, B) -> C".into()), + text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit { + new_text: "first_method($1)".to_string(), + range: lsp::Range::new( + lsp::Position::new(0, 14), + lsp::Position::new(0, 14), + ), + })), + additional_text_edits: Some(vec![lsp::TextEdit { + new_text: "use d::SomeTrait;\n".to_string(), + range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)), + }]), + insert_text_format: Some(lsp::InsertTextFormat::SNIPPET), + ..Default::default() + } + }, + ); // The additional edit is applied. buffer_a @@ -2568,7 +2536,7 @@ mod tests { }); let mut fake_language_server = fake_language_servers.next().await.unwrap(); - fake_language_server.handle_request::(|_| { + fake_language_server.handle_request::(|_, _| { Some(vec![ lsp::TextEdit { range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)), @@ -2677,7 +2645,7 @@ mod tests { let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx)); let mut fake_language_server = fake_language_servers.next().await.unwrap(); - fake_language_server.handle_request::(|_| { + fake_language_server.handle_request::(|_, _| { Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new( lsp::Url::from_file_path("/root-2/b.rs").unwrap(), lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)), @@ -2702,7 +2670,7 @@ mod tests { // Try getting more definitions for the same buffer, ensuring the buffer gets reused from // the previous call to `definition`. let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx)); - fake_language_server.handle_request::(|_| { + fake_language_server.handle_request::(|_, _| { Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new( lsp::Url::from_file_path("/root-2/b.rs").unwrap(), lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)), @@ -2826,7 +2794,7 @@ mod tests { let references = project_b.update(&mut cx_b, |p, cx| p.references(&buffer_b, 7, cx)); let mut fake_language_server = fake_language_servers.next().await.unwrap(); - fake_language_server.handle_request::(|params| { + fake_language_server.handle_request::(|params, _| { assert_eq!( params.text_document_position.text_document.uri.as_str(), "file:///root-1/one.rs" @@ -2954,7 +2922,7 @@ mod tests { let mut fake_language_server = fake_language_servers.next().await.unwrap(); fake_language_server.handle_request::( - |params| { + |params, _| { assert_eq!( params .text_document_position_params @@ -3103,7 +3071,7 @@ mod tests { // Request the definition of a symbol as the guest. 
let symbols = project_b.update(&mut cx_b, |p, cx| p.symbols("two", cx)); let mut fake_language_server = fake_language_servers.next().await.unwrap(); - fake_language_server.handle_request::(|_| { + fake_language_server.handle_request::(|_, _| { #[allow(deprecated)] Some(vec![lsp::SymbolInformation { name: "TWO".into(), @@ -3245,7 +3213,7 @@ mod tests { } let mut fake_language_server = fake_language_servers.next().await.unwrap(); - fake_language_server.handle_request::(|_| { + fake_language_server.handle_request::(|_, _| { Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new( lsp::Url::from_file_path("/root/b.rs").unwrap(), lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)), @@ -3353,7 +3321,7 @@ mod tests { let mut fake_language_server = fake_language_servers.next().await.unwrap(); fake_language_server - .handle_request::(|params| { + .handle_request::(|params, _| { assert_eq!( params.text_document.uri, lsp::Url::from_file_path("/a/main.rs").unwrap(), @@ -3372,7 +3340,7 @@ mod tests { }); fake_language_server - .handle_request::(|params| { + .handle_request::(|params, _| { assert_eq!( params.text_document.uri, lsp::Url::from_file_path("/a/main.rs").unwrap(), @@ -3443,7 +3411,7 @@ mod tests { Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx) }) .unwrap(); - fake_language_server.handle_request::(|_| { + fake_language_server.handle_request::(|_, _| { lsp::CodeAction { title: "Inline into all callers".to_string(), edit: Some(lsp::WorkspaceEdit { @@ -3598,7 +3566,7 @@ mod tests { }); fake_language_server - .handle_request::(|params| { + .handle_request::(|params, _| { assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs"); assert_eq!(params.position, lsp::Position::new(0, 7)); Some(lsp::PrepareRenameResponse::Range(lsp::Range::new( @@ -3628,7 +3596,7 @@ mod tests { Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap() }); fake_language_server - .handle_request::(|params| { + .handle_request::(|params, _| { assert_eq!( params.text_document_position.text_document.uri.as_str(), "file:///dir/one.rs" @@ -4412,12 +4380,6 @@ mod tests { .worktrees(cx) .map(|worktree| { let worktree = worktree.read(cx); - assert!( - !worktree.as_remote().unwrap().has_pending_updates(), - "Guest {} worktree {:?} contains deferred updates", - guest_id, - worktree.id() - ); (worktree.id(), worktree.snapshot()) }) .collect::>() @@ -4472,9 +4434,11 @@ mod tests { assert_eq!( guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()), host_buffer.read_with(&host_cx, |buffer, _| buffer.text()), - "guest {} buffer {} differs from the host's buffer", + "guest {}, buffer {}, path {:?}, differs from the host's buffer", guest_id, buffer_id, + host_buffer + .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx)) ); } } @@ -4683,8 +4647,9 @@ mod tests { language_server_config.set_fake_initializer({ let rng = rng.clone(); let files = files.clone(); + let project = project.clone(); move |fake_server| { - fake_server.handle_request::(|_| { + fake_server.handle_request::(|_, _| { Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem { text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit { range: lsp::Range::new( @@ -4697,7 +4662,7 @@ mod tests { }])) }); - fake_server.handle_request::(|_| { + fake_server.handle_request::(|_, _| { Some(vec![lsp::CodeActionOrCommand::CodeAction( lsp::CodeAction { title: "the-code-action".to_string(), @@ -4706,33 +4671,75 @@ mod tests { )]) }); - fake_server.handle_request::(|params| { - 
Some(lsp::PrepareRenameResponse::Range(lsp::Range::new( - params.position, - params.position, - ))) - }); + fake_server.handle_request::( + |params, _| { + Some(lsp::PrepareRenameResponse::Range(lsp::Range::new( + params.position, + params.position, + ))) + }, + ); fake_server.handle_request::({ let files = files.clone(); let rng = rng.clone(); - move |_| { + move |_, _| { let files = files.lock(); let mut rng = rng.lock(); let count = rng.gen_range::(1..3); + let files = (0..count) + .map(|_| files.choose(&mut *rng).unwrap()) + .collect::>(); + log::info!("LSP: Returning definitions in files {:?}", &files); Some(lsp::GotoDefinitionResponse::Array( - (0..count) - .map(|_| { - let file = files.choose(&mut *rng).unwrap().as_path(); - lsp::Location { - uri: lsp::Url::from_file_path(file).unwrap(), - range: Default::default(), - } + files + .into_iter() + .map(|file| lsp::Location { + uri: lsp::Url::from_file_path(file).unwrap(), + range: Default::default(), }) .collect(), )) } }); + + fake_server.handle_request::({ + let rng = rng.clone(); + let project = project.clone(); + move |params, mut cx| { + project.update(&mut cx, |project, cx| { + let path = params + .text_document_position_params + .text_document + .uri + .to_file_path() + .unwrap(); + let (worktree, relative_path) = + project.find_local_worktree(&path, cx)?; + let project_path = + ProjectPath::from((worktree.read(cx).id(), relative_path)); + let buffer = project.get_open_buffer(&project_path, cx)?.read(cx); + + let mut highlights = Vec::new(); + let highlight_count = rng.lock().gen_range(1..=5); + let mut prev_end = 0; + for _ in 0..highlight_count { + let range = + buffer.random_byte_range(prev_end, &mut *rng.lock()); + let start = + buffer.offset_to_point_utf16(range.start).to_lsp_position(); + let end = + buffer.offset_to_point_utf16(range.end).to_lsp_position(); + highlights.push(lsp::DocumentHighlight { + range: lsp::Range::new(start, end), + kind: Some(lsp::DocumentHighlightKind::READ), + }); + prev_end = range.end; + } + Some(highlights) + }) + } + }); } }); @@ -4778,13 +4785,17 @@ mod tests { let file = files.lock().choose(&mut *rng.lock()).unwrap().clone(); let (worktree, path) = project .update(&mut cx, |project, cx| { - project.find_or_create_local_worktree(file, false, cx) + project.find_or_create_local_worktree( + file.clone(), + false, + cx, + ) }) .await .unwrap(); let project_path = worktree.read_with(&cx, |worktree, _| (worktree.id(), path)); - log::info!("Host: opening path {:?}", project_path); + log::info!("Host: opening path {:?}, {:?}", file, project_path); let buffer = project .update(&mut cx, |project, cx| { project.open_buffer(project_path, cx) @@ -4847,6 +4858,8 @@ mod tests { cx.background().simulate_random_delay().await; } + log::info!("Host done"); + self.project = Some(project); (self, cx) } @@ -4867,7 +4880,8 @@ mod tests { project .worktrees(&cx) .filter(|worktree| { - worktree.read(cx).entries(false).any(|e| e.is_file()) + let worktree = worktree.read(cx); + !worktree.is_weak() && worktree.entries(false).any(|e| e.is_file()) }) .choose(&mut *rng.lock()) }) { @@ -4878,15 +4892,25 @@ mod tests { }; operations.set(operations.get() + 1); - let project_path = worktree.read_with(&cx, |worktree, _| { - let entry = worktree - .entries(false) - .filter(|e| e.is_file()) - .choose(&mut *rng.lock()) - .unwrap(); - (worktree.id(), entry.path.clone()) - }); - log::info!("Guest {}: opening path {:?}", guest_id, project_path); + let (worktree_root_name, project_path) = + worktree.read_with(&cx, |worktree, _| { + 
@@ -4778,13 +4785,17 @@ mod tests {
                     let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
                     let (worktree, path) = project
                         .update(&mut cx, |project, cx| {
-                            project.find_or_create_local_worktree(file, false, cx)
+                            project.find_or_create_local_worktree(
+                                file.clone(),
+                                false,
+                                cx,
+                            )
                         })
                         .await
                         .unwrap();
                     let project_path =
                         worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
-                    log::info!("Host: opening path {:?}", project_path);
+                    log::info!("Host: opening path {:?}, {:?}", file, project_path);
                     let buffer = project
                         .update(&mut cx, |project, cx| {
                             project.open_buffer(project_path, cx)
@@ -4847,6 +4858,8 @@ mod tests {
             cx.background().simulate_random_delay().await;
         }
 
+        log::info!("Host done");
+
         self.project = Some(project);
         (self, cx)
     }
@@ -4867,7 +4880,8 @@ mod tests {
                 project
                     .worktrees(&cx)
                     .filter(|worktree| {
-                        worktree.read(cx).entries(false).any(|e| e.is_file())
+                        let worktree = worktree.read(cx);
+                        !worktree.is_weak() && worktree.entries(false).any(|e| e.is_file())
                     })
                     .choose(&mut *rng.lock())
             }) {
@@ -4878,15 +4892,25 @@ mod tests {
                 };
                 operations.set(operations.get() + 1);
 
-                let project_path = worktree.read_with(&cx, |worktree, _| {
-                    let entry = worktree
-                        .entries(false)
-                        .filter(|e| e.is_file())
-                        .choose(&mut *rng.lock())
-                        .unwrap();
-                    (worktree.id(), entry.path.clone())
-                });
-                log::info!("Guest {}: opening path {:?}", guest_id, project_path);
+                let (worktree_root_name, project_path) =
+                    worktree.read_with(&cx, |worktree, _| {
+                        let entry = worktree
+                            .entries(false)
+                            .filter(|e| e.is_file())
+                            .choose(&mut *rng.lock())
+                            .unwrap();
+                        (
+                            worktree.root_name().to_string(),
+                            (worktree.id(), entry.path.clone()),
+                        )
+                    });
+                log::info!(
+                    "Guest {}: opening path in worktree {:?} {:?} {:?}",
+                    guest_id,
+                    project_path.0,
+                    worktree_root_name,
+                    project_path.1
+                );
                 let buffer = project
                     .update(&mut cx, |project, cx| project.open_buffer(project_path, cx))
                     .await
@@ -5010,13 +5034,34 @@ mod tests {
                             project.definition(&buffer, offset, cx)
                         });
                         let definitions = cx.background().spawn(async move {
-                            definitions.await.expect("definitions request failed");
+                            definitions.await.expect("definitions request failed")
                        });
                         if rng.lock().gen_bool(0.3) {
                             log::info!("Guest {}: detaching definitions request", guest_id);
                             definitions.detach();
                         } else {
-                            definitions.await;
+                            self.buffers
+                                .extend(definitions.await.into_iter().map(|loc| loc.buffer));
+                        }
+                    }
+                    50..=55 => {
+                        let highlights = project.update(&mut cx, |project, cx| {
+                            log::info!(
+                                "Guest {}: requesting highlights for buffer {:?}",
+                                guest_id,
+                                buffer.read(cx).file().unwrap().full_path(cx)
+                            );
+                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
+                            project.document_highlights(&buffer, offset, cx)
+                        });
+                        let highlights = cx.background().spawn(async move {
+                            highlights.await.expect("highlights request failed");
+                        });
+                        if rng.lock().gen_bool(0.3) {
+                            log::info!("Guest {}: detaching highlights request", guest_id);
+                            highlights.detach();
+                        } else {
+                            highlights.await;
                         }
                     }
                     _ => {
@@ -5033,6 +5078,8 @@ mod tests {
             cx.background().simulate_random_delay().await;
         }
 
+        log::info!("Guest {} done", guest_id);
+
         self.project = Some(project);
         (self, cx)
     }
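In the guest loop above, definition and highlight requests are spawned and then, 30% of the time, detached instead of awaited, so the randomized test also exercises requests that are dropped mid-flight while the host keeps mutating. A minimal stand-alone sketch of that detach-or-await pattern, using `smol` and `rand` purely as stand-ins for the gpui background executor and the test's shared rng:

```rust
use rand::Rng;

fn main() {
    let mut rng = rand::thread_rng();
    smol::block_on(async {
        for i in 0..10 {
            // Stand-in for project.definition(..) or project.document_highlights(..).
            let request = smol::spawn(async move { format!("response {}", i) });
            if rng.gen_bool(0.3) {
                // Drop the task handle without awaiting; the request is abandoned.
                println!("detaching request {}", i);
                request.detach();
            } else {
                println!("awaited: {}", request.await);
            }
        }
    });
}
```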
diff --git a/crates/server/src/rpc/store.rs b/crates/server/src/rpc/store.rs
index 41c611a0972229261c7a80884ccceb309ee97ff4..e6c4429b6960a749175f4ee9fac25a792e715fbc 100644
--- a/crates/server/src/rpc/store.rs
+++ b/crates/server/src/rpc/store.rs
@@ -30,7 +30,6 @@ pub struct Project {
 pub struct Worktree {
     pub authorized_user_ids: Vec,
     pub root_name: String,
-    pub share: Option,
     pub weak: bool,
 }
 
@@ -38,12 +37,13 @@ pub struct Worktree {
 pub struct ProjectShare {
     pub guests: HashMap,
     pub active_replica_ids: HashSet,
+    pub worktrees: HashMap,
 }
 
+#[derive(Default)]
 pub struct WorktreeShare {
     pub entries: HashMap,
     pub diagnostic_summaries: BTreeMap,
-    pub next_update_id: u64,
 }
 
 #[derive(Default)]
@@ -75,11 +75,6 @@ pub struct LeftProject {
     pub authorized_user_ids: Vec,
 }
 
-pub struct SharedWorktree {
-    pub authorized_user_ids: Vec,
-    pub connection_ids: Vec,
-}
-
 impl Store {
     pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId) {
         self.connections.insert(
@@ -273,6 +268,9 @@ impl Store {
             connection.projects.insert(project_id);
         }
         project.worktrees.insert(worktree_id, worktree);
+        if let Ok(share) = project.share_mut() {
+            share.worktrees.insert(worktree_id, Default::default());
+        }
 
         #[cfg(test)]
         self.check_invariants();
@@ -327,8 +325,9 @@ impl Store {
             .ok_or_else(|| anyhow!("no such worktree"))?;
 
         let mut guest_connection_ids = Vec::new();
-        if let Some(share) = &project.share {
+        if let Ok(share) = project.share_mut() {
             guest_connection_ids.extend(share.guests.keys());
+            share.worktrees.remove(&worktree_id);
         }
 
         for authorized_user_id in &worktree.authorized_user_ids {
@@ -350,7 +349,11 @@ impl Store {
     pub fn share_project(&mut self, project_id: u64, connection_id: ConnectionId) -> bool {
         if let Some(project) = self.projects.get_mut(&project_id) {
             if project.host_connection_id == connection_id {
-                project.share = Some(ProjectShare::default());
+                let mut share = ProjectShare::default();
+                for worktree_id in project.worktrees.keys() {
+                    share.worktrees.insert(*worktree_id, Default::default());
+                }
+                project.share = Some(share);
                 return true;
             }
         }
@@ -381,10 +384,6 @@ impl Store {
             }
         }
 
-        for worktree in project.worktrees.values_mut() {
-            worktree.share.take();
-        }
-
         #[cfg(test)]
         self.check_invariants();
 
@@ -397,38 +396,6 @@ impl Store {
         }
     }
 
-    pub fn share_worktree(
-        &mut self,
-        project_id: u64,
-        worktree_id: u64,
-        connection_id: ConnectionId,
-        entries: HashMap,
-        diagnostic_summaries: BTreeMap,
-        next_update_id: u64,
-    ) -> tide::Result {
-        let project = self
-            .projects
-            .get_mut(&project_id)
-            .ok_or_else(|| anyhow!("no such project"))?;
-        let worktree = project
-            .worktrees
-            .get_mut(&worktree_id)
-            .ok_or_else(|| anyhow!("no such worktree"))?;
-        if project.host_connection_id == connection_id && project.share.is_some() {
-            worktree.share = Some(WorktreeShare {
-                entries,
-                diagnostic_summaries,
-                next_update_id,
-            });
-            Ok(SharedWorktree {
-                authorized_user_ids: project.authorized_user_ids(),
-                connection_ids: project.guest_connection_ids(),
-            })
-        } else {
-            Err(anyhow!("no such worktree"))?
-        }
-    }
-
     pub fn update_diagnostic_summary(
         &mut self,
         project_id: u64,
@@ -440,17 +407,16 @@ impl Store {
             .projects
             .get_mut(&project_id)
             .ok_or_else(|| anyhow!("no such project"))?;
-        let worktree = project
-            .worktrees
-            .get_mut(&worktree_id)
-            .ok_or_else(|| anyhow!("no such worktree"))?;
         if project.host_connection_id == connection_id {
-            if let Some(share) = worktree.share.as_mut() {
-                share
-                    .diagnostic_summaries
-                    .insert(summary.path.clone().into(), summary);
-                return Ok(project.connection_ids());
-            }
+            let worktree = project
+                .share_mut()?
+                .worktrees
+                .get_mut(&worktree_id)
+                .ok_or_else(|| anyhow!("no such worktree"))?;
+            worktree
+                .diagnostic_summaries
+                .insert(summary.path.clone().into(), summary);
+            return Ok(project.connection_ids());
         }
 
         Err(anyhow!("no such worktree"))?
@@ -537,28 +503,20 @@ impl Store {
         connection_id: ConnectionId,
         project_id: u64,
         worktree_id: u64,
-        update_id: u64,
         removed_entries: &[u64],
         updated_entries: &[proto::Entry],
     ) -> tide::Result> {
         let project = self.write_project(project_id, connection_id)?;
-        let share = project
+        let worktree = project
+            .share_mut()?
             .worktrees
             .get_mut(&worktree_id)
-            .ok_or_else(|| anyhow!("no such worktree"))?
-            .share
-            .as_mut()
-            .ok_or_else(|| anyhow!("worktree is not shared"))?;
-        if share.next_update_id != update_id {
-            return Err(anyhow!("received worktree updates out-of-order"))?;
-        }
-
-        share.next_update_id = update_id + 1;
+            .ok_or_else(|| anyhow!("no such worktree"))?;
         for entry_id in removed_entries {
-            share.entries.remove(&entry_id);
+            worktree.entries.remove(&entry_id);
         }
         for entry in updated_entries {
-            share.entries.insert(entry.id, entry.clone());
+            worktree.entries.insert(entry.id, entry.clone());
         }
         Ok(project.connection_ids())
     }
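The store.rs hunks above route all per-worktree share state through `project.share_mut()`, whose definition is not part of this diff. A minimal sketch of a helper with that shape, assuming it converts the optional `ProjectShare` into a `Result` so call sites can use `?`; the stand-in types, the `anyhow::Result` return (the real helper presumably returns `tide::Result`), and the error message are assumptions, not the actual implementation:

```rust
use anyhow::anyhow;

// Minimal stand-ins so the sketch compiles on its own.
#[derive(Default)]
struct ProjectShare;
struct Project {
    share: Option<ProjectShare>,
}

impl Project {
    // Hypothetical helper: expose the share as a Result so call sites like
    // `project.share_mut()?.worktrees` can use `?` instead of matching on Option.
    fn share_mut(&mut self) -> anyhow::Result<&mut ProjectShare> {
        self.share
            .as_mut()
            .ok_or_else(|| anyhow!("project is not shared"))
    }
}

fn main() -> anyhow::Result<()> {
    let mut project = Project {
        share: Some(ProjectShare::default()),
    };
    project.share_mut()?; // succeeds because the project is shared
    Ok(())
}
```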