diff --git a/Cargo.lock b/Cargo.lock index 29c2228e7d31dc70d08b61e57e05f43c4dd726a9..dc7b6921a7ab67126cfbeab322582b9ded3d6f0a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2128,6 +2128,7 @@ dependencies = [ "block", "cc", "cocoa", + "collections", "core-foundation", "core-graphics", "core-text", diff --git a/crates/client/src/test.rs b/crates/client/src/test.rs index 1630a454b79296e27ec9eb1545aeb8f438b010e6..7402417196250affb845185454a657894075f542 100644 --- a/crates/client/src/test.rs +++ b/crates/client/src/test.rs @@ -94,7 +94,7 @@ impl FakeServer { Err(EstablishConnectionError::Unauthorized)? } - let (client_conn, server_conn, _) = Connection::in_memory(); + let (client_conn, server_conn, _) = Connection::in_memory(cx.background()); let (connection_id, io, incoming) = self.peer.add_connection(server_conn).await; cx.background().spawn(io).detach(); *self.incoming.lock() = Some(incoming); diff --git a/crates/gpui/Cargo.toml b/crates/gpui/Cargo.toml index 4414936c9e3b81c8e46cd02f04f321007daab374..197a5ca12eb4bd79e86423c78c2107acbe828b09 100644 --- a/crates/gpui/Cargo.toml +++ b/crates/gpui/Cargo.toml @@ -8,9 +8,10 @@ version = "0.1.0" path = "src/gpui.rs" [features] -test-support = ["env_logger"] +test-support = ["env_logger", "collections/test-support"] [dependencies] +collections = { path = "../collections" } gpui_macros = { path = "../gpui_macros" } sum_tree = { path = "../sum_tree" } async-task = "4.0.3" @@ -47,6 +48,7 @@ bindgen = "0.58.1" cc = "1.0.67" [dev-dependencies] +collections = { path = "../collections", features = ["test-support"] } env_logger = "0.8" png = "0.16" simplelog = "0.9" diff --git a/crates/gpui/src/executor.rs b/crates/gpui/src/executor.rs index 84efd2c6e0b6b3ee9d81736821b27f581a460e10..5e37acbc1d7eef65f1f9e3def62c35b07e19fdfd 100644 --- a/crates/gpui/src/executor.rs +++ b/crates/gpui/src/executor.rs @@ -1,10 +1,11 @@ use anyhow::{anyhow, Result}; use async_task::Runnable; use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString}; +use collections::HashMap; use parking_lot::Mutex; use postage::{barrier, prelude::Stream as _}; use rand::prelude::*; -use smol::{channel, prelude::*, Executor, Timer}; +use smol::{channel, future::yield_now, prelude::*, Executor, Timer}; use std::{ any::Any, fmt::{self, Debug, Display}, @@ -33,8 +34,10 @@ pub enum Foreground { dispatcher: Arc, _not_send_or_sync: PhantomData>, }, - Test(smol::LocalExecutor<'static>), - Deterministic(Arc), + Deterministic { + cx_id: usize, + executor: Arc, + }, } pub enum Background { @@ -70,9 +73,8 @@ unsafe impl Send for Task {} struct DeterministicState { rng: StdRng, seed: u64, - scheduled_from_foreground: Vec<(Runnable, Backtrace)>, - scheduled_from_background: Vec<(Runnable, Backtrace)>, - spawned_from_foreground: Vec<(Runnable, Backtrace)>, + scheduled_from_foreground: HashMap>, + scheduled_from_background: Vec, forbid_parking: bool, block_on_ticks: RangeInclusive, now: Instant, @@ -80,20 +82,24 @@ struct DeterministicState { waiting_backtrace: Option, } +struct ForegroundRunnable { + runnable: Runnable, + main: bool, +} + pub struct Deterministic { state: Arc>, parker: Mutex, } impl Deterministic { - fn new(seed: u64) -> Self { - Self { + pub fn new(seed: u64) -> Arc { + Arc::new(Self { state: Arc::new(Mutex::new(DeterministicState { rng: StdRng::seed_from_u64(seed), seed, scheduled_from_foreground: Default::default(), scheduled_from_background: Default::default(), - spawned_from_foreground: Default::default(), forbid_parking: false, block_on_ticks: 0..=1000, now: Instant::now(), @@ -101,22 +107,37 @@ impl Deterministic { waiting_backtrace: None, })), parker: Default::default(), - } + }) + } + + pub fn build_background(self: &Arc) -> Arc { + Arc::new(Background::Deterministic { + executor: self.clone(), + }) } - fn spawn_from_foreground(&self, future: AnyLocalFuture) -> AnyLocalTask { - let backtrace = Backtrace::new_unresolved(); - let scheduled_once = AtomicBool::new(false); + pub fn build_foreground(self: &Arc, id: usize) -> Rc { + Rc::new(Foreground::Deterministic { + cx_id: id, + executor: self.clone(), + }) + } + + fn spawn_from_foreground( + &self, + cx_id: usize, + future: AnyLocalFuture, + main: bool, + ) -> AnyLocalTask { let state = self.state.clone(); let unparker = self.parker.lock().unparker(); let (runnable, task) = async_task::spawn_local(future, move |runnable| { let mut state = state.lock(); - let backtrace = backtrace.clone(); - if scheduled_once.fetch_or(true, SeqCst) { - state.scheduled_from_foreground.push((runnable, backtrace)); - } else { - state.spawned_from_foreground.push((runnable, backtrace)); - } + state + .scheduled_from_foreground + .entry(cx_id) + .or_default() + .push(ForegroundRunnable { runnable, main }); unparker.unpark(); }); runnable.schedule(); @@ -124,24 +145,23 @@ impl Deterministic { } fn spawn(&self, future: AnyFuture) -> AnyTask { - let backtrace = Backtrace::new_unresolved(); let state = self.state.clone(); let unparker = self.parker.lock().unparker(); let (runnable, task) = async_task::spawn(future, move |runnable| { let mut state = state.lock(); - state - .scheduled_from_background - .push((runnable, backtrace.clone())); + state.scheduled_from_background.push(runnable); unparker.unpark(); }); runnable.schedule(); task } - fn run(&self, mut future: AnyLocalFuture) -> Box { + fn run(&self, cx_id: usize, main_future: AnyLocalFuture) -> Box { let woken = Arc::new(AtomicBool::new(false)); + let mut main_task = self.spawn_from_foreground(cx_id, main_future, true); + loop { - if let Some(result) = self.run_internal(woken.clone(), &mut future) { + if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) { return result; } @@ -156,14 +176,13 @@ impl Deterministic { fn run_until_parked(&self) { let woken = Arc::new(AtomicBool::new(false)); - let mut future = any_local_future(std::future::pending::<()>()); - self.run_internal(woken, &mut future); + self.run_internal(woken, None); } fn run_internal( &self, woken: Arc, - future: &mut AnyLocalFuture, + mut main_task: Option<&mut AnyLocalTask>, ) -> Option> { let unparker = self.parker.lock().unparker(); let waker = waker_fn(move || { @@ -172,48 +191,46 @@ impl Deterministic { }); let mut cx = Context::from_waker(&waker); - let mut trace = Trace::default(); loop { let mut state = self.state.lock(); - let runnable_count = state.scheduled_from_foreground.len() - + state.scheduled_from_background.len() - + state.spawned_from_foreground.len(); - let ix = state.rng.gen_range(0..=runnable_count); - if ix < state.scheduled_from_foreground.len() { - let (_, backtrace) = &state.scheduled_from_foreground[ix]; - trace.record(&state, backtrace.clone()); - let runnable = state.scheduled_from_foreground.remove(ix).0; - drop(state); - runnable.run(); - } else if ix - state.scheduled_from_foreground.len() - < state.scheduled_from_background.len() + if state.scheduled_from_foreground.is_empty() + && state.scheduled_from_background.is_empty() { - let ix = ix - state.scheduled_from_foreground.len(); - let (_, backtrace) = &state.scheduled_from_background[ix]; - trace.record(&state, backtrace.clone()); - let runnable = state.scheduled_from_background.remove(ix).0; - drop(state); - runnable.run(); - } else if ix < runnable_count { - let (_, backtrace) = &state.spawned_from_foreground[0]; - trace.record(&state, backtrace.clone()); - let runnable = state.spawned_from_foreground.remove(0).0; + return None; + } + + if !state.scheduled_from_background.is_empty() && state.rng.gen() { + let background_len = state.scheduled_from_background.len(); + let ix = state.rng.gen_range(0..background_len); + let runnable = state.scheduled_from_background.remove(ix); drop(state); runnable.run(); - } else { - drop(state); - if let Poll::Ready(result) = future.poll(&mut cx) { - return Some(result); + } else if !state.scheduled_from_foreground.is_empty() { + let available_cx_ids = state + .scheduled_from_foreground + .keys() + .copied() + .collect::>(); + let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap(); + let scheduled_from_cx = state + .scheduled_from_foreground + .get_mut(&cx_id_to_run) + .unwrap(); + let foreground_runnable = scheduled_from_cx.remove(0); + if scheduled_from_cx.is_empty() { + state.scheduled_from_foreground.remove(&cx_id_to_run); } - let state = self.state.lock(); + drop(state); - if state.scheduled_from_foreground.is_empty() - && state.scheduled_from_background.is_empty() - && state.spawned_from_foreground.is_empty() - { - return None; + foreground_runnable.runnable.run(); + if let Some(main_task) = main_task.as_mut() { + if foreground_runnable.main { + if let Poll::Ready(result) = main_task.poll(&mut cx) { + return Some(result); + } + } } } } @@ -231,15 +248,12 @@ impl Deterministic { }; let mut cx = Context::from_waker(&waker); - let mut trace = Trace::default(); for _ in 0..max_ticks { let mut state = self.state.lock(); let runnable_count = state.scheduled_from_background.len(); let ix = state.rng.gen_range(0..=runnable_count); if ix < state.scheduled_from_background.len() { - let (_, backtrace) = &state.scheduled_from_background[ix]; - trace.record(&state, backtrace.clone()); - let runnable = state.scheduled_from_background.remove(ix).0; + let runnable = state.scheduled_from_background.remove(ix); drop(state); runnable.run(); } else { @@ -282,69 +296,13 @@ impl DeterministicState { } } -#[derive(Default)] -struct Trace { - executed: Vec, - scheduled: Vec>, - spawned_from_foreground: Vec>, -} - -impl Trace { - fn record(&mut self, state: &DeterministicState, executed: Backtrace) { - self.scheduled.push( - state - .scheduled_from_foreground - .iter() - .map(|(_, backtrace)| backtrace.clone()) - .collect(), - ); - self.spawned_from_foreground.push( - state - .spawned_from_foreground - .iter() - .map(|(_, backtrace)| backtrace.clone()) - .collect(), - ); - self.executed.push(executed); - } - - fn resolve(&mut self) { - for backtrace in &mut self.executed { - backtrace.resolve(); - } - - for backtraces in &mut self.scheduled { - for backtrace in backtraces { - backtrace.resolve(); - } - } - - for backtraces in &mut self.spawned_from_foreground { - for backtrace in backtraces { - backtrace.resolve(); - } - } - } -} - struct CwdBacktrace<'a> { backtrace: &'a Backtrace, - first_frame_only: bool, } impl<'a> CwdBacktrace<'a> { fn new(backtrace: &'a Backtrace) -> Self { - Self { - backtrace, - first_frame_only: false, - } - } - - fn first_frame(backtrace: &'a Backtrace) -> Self { - Self { - backtrace, - first_frame_only: true, - } + Self { backtrace } } } @@ -363,69 +321,12 @@ impl<'a> Debug for CwdBacktrace<'a> { .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd))) { formatted_frame.backtrace_frame(frame)?; - if self.first_frame_only { - break; - } } } fmt.finish() } } -impl Debug for Trace { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - for ((backtrace, scheduled), spawned_from_foreground) in self - .executed - .iter() - .zip(&self.scheduled) - .zip(&self.spawned_from_foreground) - { - writeln!(f, "Scheduled")?; - for backtrace in scheduled { - writeln!(f, "- {:?}", CwdBacktrace::first_frame(backtrace))?; - } - if scheduled.is_empty() { - writeln!(f, "None")?; - } - writeln!(f, "==========")?; - - writeln!(f, "Spawned from foreground")?; - for backtrace in spawned_from_foreground { - writeln!(f, "- {:?}", CwdBacktrace::first_frame(backtrace))?; - } - if spawned_from_foreground.is_empty() { - writeln!(f, "None")?; - } - writeln!(f, "==========")?; - - writeln!(f, "Run: {:?}", CwdBacktrace::first_frame(backtrace))?; - writeln!(f, "+++++++++++++++++++")?; - } - - Ok(()) - } -} - -impl Drop for Trace { - fn drop(&mut self) { - let trace_on_panic = if let Ok(trace_on_panic) = std::env::var("EXECUTOR_TRACE_ON_PANIC") { - trace_on_panic == "1" || trace_on_panic == "true" - } else { - false - }; - let trace_always = if let Ok(trace_always) = std::env::var("EXECUTOR_TRACE_ALWAYS") { - trace_always == "1" || trace_always == "true" - } else { - false - }; - - if trace_always || (trace_on_panic && thread::panicking()) { - self.resolve(); - dbg!(self); - } - } -} - impl Foreground { pub fn platform(dispatcher: Arc) -> Result { if dispatcher.is_main_thread() { @@ -438,14 +339,12 @@ impl Foreground { } } - pub fn test() -> Self { - Self::Test(smol::LocalExecutor::new()) - } - pub fn spawn(&self, future: impl Future + 'static) -> Task { let future = any_local_future(future); let any_task = match self { - Self::Deterministic(executor) => executor.spawn_from_foreground(future), + Self::Deterministic { cx_id, executor } => { + executor.spawn_from_foreground(*cx_id, future, false) + } Self::Platform { dispatcher, .. } => { fn spawn_inner( future: AnyLocalFuture, @@ -460,7 +359,6 @@ impl Foreground { } spawn_inner(future, dispatcher) } - Self::Test(executor) => executor.spawn(future), }; Task::local(any_task) } @@ -468,23 +366,22 @@ impl Foreground { pub fn run(&self, future: impl 'static + Future) -> T { let future = any_local_future(future); let any_value = match self { - Self::Deterministic(executor) => executor.run(future), + Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future), Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"), - Self::Test(executor) => smol::block_on(executor.run(future)), }; *any_value.downcast().unwrap() } pub fn parking_forbidden(&self) -> bool { match self { - Self::Deterministic(executor) => executor.state.lock().forbid_parking, + Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking, _ => panic!("this method can only be called on a deterministic executor"), } } pub fn start_waiting(&self) { match self { - Self::Deterministic(executor) => { + Self::Deterministic { executor, .. } => { executor.state.lock().waiting_backtrace = Some(Backtrace::new_unresolved()); } _ => panic!("this method can only be called on a deterministic executor"), @@ -493,7 +390,7 @@ impl Foreground { pub fn finish_waiting(&self) { match self { - Self::Deterministic(executor) => { + Self::Deterministic { executor, .. } => { executor.state.lock().waiting_backtrace.take(); } _ => panic!("this method can only be called on a deterministic executor"), @@ -502,7 +399,7 @@ impl Foreground { pub fn forbid_parking(&self) { match self { - Self::Deterministic(executor) => { + Self::Deterministic { executor, .. } => { let mut state = executor.state.lock(); state.forbid_parking = true; state.rng = StdRng::seed_from_u64(state.seed); @@ -513,7 +410,7 @@ impl Foreground { pub async fn timer(&self, duration: Duration) { match self { - Self::Deterministic(executor) => { + Self::Deterministic { executor, .. } => { let (tx, mut rx) = barrier::channel(); { let mut state = executor.state.lock(); @@ -530,7 +427,7 @@ impl Foreground { pub fn advance_clock(&self, duration: Duration) { match self { - Self::Deterministic(executor) => { + Self::Deterministic { executor, .. } => { executor.run_until_parked(); let mut state = executor.state.lock(); @@ -548,7 +445,7 @@ impl Foreground { pub fn set_block_on_ticks(&self, range: RangeInclusive) { match self { - Self::Deterministic(executor) => executor.state.lock().block_on_ticks = range, + Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range, _ => panic!("this method can only be called on a deterministic executor"), } } @@ -586,7 +483,7 @@ impl Background { let future = any_future(future); let any_task = match self { Self::Production { executor, .. } => executor.spawn(future), - Self::Deterministic { executor, .. } => executor.spawn(future), + Self::Deterministic { executor } => executor.spawn(future), }; Task::send(any_task) } @@ -631,6 +528,17 @@ impl Background { task.await; } } + + pub async fn simulate_random_delay(&self) { + match self { + Self::Deterministic { executor, .. } => { + if executor.state.lock().rng.gen_range(0..100) < 20 { + yield_now().await; + } + } + _ => panic!("this method can only be called on a deterministic executor"), + } + } } pub struct Scope<'a> { @@ -653,14 +561,6 @@ impl<'a> Scope<'a> { } } -pub fn deterministic(seed: u64) -> (Rc, Arc) { - let executor = Arc::new(Deterministic::new(seed)); - ( - Rc::new(Foreground::Deterministic(executor.clone())), - Arc::new(Background::Deterministic { executor }), - ) -} - impl Task { pub fn ready(value: T) -> Self { Self::Ready(Some(value)) diff --git a/crates/gpui/src/test.rs b/crates/gpui/src/test.rs index ef95ea435ac34ea31d53d37d61e169094b3efde5..b4b4e621ac313828da5c8a9185d43a501986bd30 100644 --- a/crates/gpui/src/test.rs +++ b/crates/gpui/src/test.rs @@ -28,7 +28,12 @@ pub fn run_test( mut starting_seed: u64, max_retries: usize, test_fn: &mut (dyn RefUnwindSafe - + Fn(&mut MutableAppContext, Rc, u64)), + + Fn( + &mut MutableAppContext, + Rc, + Arc, + u64, + )), ) { let is_randomized = num_iterations > 1; if is_randomized { @@ -60,16 +65,16 @@ pub fn run_test( dbg!(seed); } - let (foreground, background) = executor::deterministic(seed); + let deterministic = executor::Deterministic::new(seed); let mut cx = TestAppContext::new( foreground_platform.clone(), platform.clone(), - foreground.clone(), - background.clone(), + deterministic.build_foreground(usize::MAX), + deterministic.build_background(), font_cache.clone(), 0, ); - cx.update(|cx| test_fn(cx, foreground_platform.clone(), seed)); + cx.update(|cx| test_fn(cx, foreground_platform.clone(), deterministic, seed)); atomic_seed.fetch_add(1, SeqCst); } diff --git a/crates/gpui_macros/src/gpui_macros.rs b/crates/gpui_macros/src/gpui_macros.rs index e94318172ea4d4407e5a5b0edf5041d749b068d4..21d978d9fb3e56a6a212ee8d5123cf095266f452 100644 --- a/crates/gpui_macros/src/gpui_macros.rs +++ b/crates/gpui_macros/src/gpui_macros.rs @@ -77,8 +77,8 @@ pub fn test(args: TokenStream, function: TokenStream) -> TokenStream { #namespace::TestAppContext::new( foreground_platform.clone(), cx.platform().clone(), - cx.foreground().clone(), - cx.background().clone(), + deterministic.build_foreground(#ix), + deterministic.build_background(), cx.font_cache().clone(), #first_entity_id, ), @@ -115,7 +115,7 @@ pub fn test(args: TokenStream, function: TokenStream) -> TokenStream { #num_iterations as u64, #starting_seed as u64, #max_retries, - &mut |cx, foreground_platform, seed| cx.foreground().run(#inner_fn_name(#inner_fn_args)) + &mut |cx, foreground_platform, deterministic, seed| cx.foreground().run(#inner_fn_name(#inner_fn_args)) ); } } @@ -147,7 +147,7 @@ pub fn test(args: TokenStream, function: TokenStream) -> TokenStream { #num_iterations as u64, #starting_seed as u64, #max_retries, - &mut |cx, _, seed| #inner_fn_name(#inner_fn_args) + &mut |cx, _, _, seed| #inner_fn_name(#inner_fn_args) ); } } diff --git a/crates/language/src/buffer.rs b/crates/language/src/buffer.rs index bd361e9731d2dc1bd8f9bb78b29e16ae992cbac1..eae15fe5af5c7dd524e8fa6ba6cb1ac84cf7fdf7 100644 --- a/crates/language/src/buffer.rs +++ b/crates/language/src/buffer.rs @@ -305,7 +305,7 @@ impl Buffer { pub fn from_proto( replica_id: ReplicaId, - message: proto::Buffer, + message: proto::BufferState, file: Option>, cx: &mut ModelContext, ) -> Result { @@ -359,8 +359,8 @@ impl Buffer { Ok(this) } - pub fn to_proto(&self) -> proto::Buffer { - proto::Buffer { + pub fn to_proto(&self) -> proto::BufferState { + proto::BufferState { id: self.remote_id(), file: self.file.as_ref().map(|f| f.to_proto()), visible_text: self.text.text(), diff --git a/crates/language/src/proto.rs b/crates/language/src/proto.rs index 771d8b7fd3e9a4303dfdf69ceb9f6df03479dffc..0f9ee69956910083f9145955ddba37cd47174749 100644 --- a/crates/language/src/proto.rs +++ b/crates/language/src/proto.rs @@ -7,7 +7,7 @@ use rpc::proto; use std::sync::Arc; use text::*; -pub use proto::{Buffer, SelectionSet}; +pub use proto::{Buffer, BufferState, SelectionSet}; pub fn serialize_operation(operation: &Operation) -> proto::Operation { proto::Operation { @@ -155,7 +155,7 @@ pub fn serialize_diagnostics<'a>( .collect() } -fn serialize_anchor(anchor: &Anchor) -> proto::Anchor { +pub fn serialize_anchor(anchor: &Anchor) -> proto::Anchor { proto::Anchor { replica_id: anchor.timestamp.replica_id as u32, local_timestamp: anchor.timestamp.value, @@ -352,7 +352,7 @@ pub fn deserialize_diagnostics( .collect() } -fn deserialize_anchor(anchor: proto::Anchor) -> Option { +pub fn deserialize_anchor(anchor: proto::Anchor) -> Option { Some(Anchor { timestamp: clock::Local { replica_id: anchor.replica_id as ReplicaId, diff --git a/crates/project/src/fs.rs b/crates/project/src/fs.rs index 895d7d4cc1a0b440a4ca92f63ebed1284f025365..a26b5ed8f72d4fa21998f32c69e5e5648b9f2636 100644 --- a/crates/project/src/fs.rs +++ b/crates/project/src/fs.rs @@ -181,11 +181,12 @@ impl FakeFsState { pub struct FakeFs { // Use an unfair lock to ensure tests are deterministic. state: futures::lock::Mutex, + executor: std::sync::Arc, } #[cfg(any(test, feature = "test-support"))] impl FakeFs { - pub fn new() -> Self { + pub fn new(executor: std::sync::Arc) -> Self { let (events_tx, _) = postage::broadcast::channel(2048); let mut entries = std::collections::BTreeMap::new(); entries.insert( @@ -201,6 +202,7 @@ impl FakeFs { }, ); Self { + executor, state: futures::lock::Mutex::new(FakeFsState { entries, next_inode: 1, @@ -330,6 +332,7 @@ impl FakeFs { #[async_trait::async_trait] impl Fs for FakeFs { async fn load(&self, path: &Path) -> Result { + self.executor.simulate_random_delay().await; let state = self.state.lock().await; let text = state .entries @@ -340,6 +343,7 @@ impl Fs for FakeFs { } async fn save(&self, path: &Path, text: &Rope) -> Result<()> { + self.executor.simulate_random_delay().await; let mut state = self.state.lock().await; state.validate_path(path)?; if let Some(entry) = state.entries.get_mut(path) { @@ -370,10 +374,12 @@ impl Fs for FakeFs { } async fn canonicalize(&self, path: &Path) -> Result { + self.executor.simulate_random_delay().await; Ok(path.to_path_buf()) } async fn is_file(&self, path: &Path) -> bool { + self.executor.simulate_random_delay().await; let state = self.state.lock().await; state .entries @@ -382,6 +388,7 @@ impl Fs for FakeFs { } async fn metadata(&self, path: &Path) -> Result> { + self.executor.simulate_random_delay().await; let state = self.state.lock().await; Ok(state.entries.get(path).map(|entry| entry.metadata.clone())) } @@ -391,6 +398,7 @@ impl Fs for FakeFs { abs_path: &Path, ) -> Result>>>> { use futures::{future, stream}; + self.executor.simulate_random_delay().await; let state = self.state.lock().await; let abs_path = abs_path.to_path_buf(); Ok(Box::pin(stream::iter(state.entries.clone()).filter_map( @@ -410,6 +418,7 @@ impl Fs for FakeFs { _: Duration, ) -> Pin>>> { let state = self.state.lock().await; + self.executor.simulate_random_delay().await; let rx = state.events_tx.subscribe(); let path = path.to_path_buf(); Box::pin(futures::StreamExt::filter(rx, move |events| { diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 04f8aa511aae340b03ad7c6e2753b8ed9361803b..8cba48132cb46744013ccb8be1c783985a40277c 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -13,8 +13,9 @@ use gpui::{ WeakModelHandle, }; use language::{ + proto::{deserialize_anchor, serialize_anchor}, range_from_lsp, Bias, Buffer, Diagnostic, DiagnosticEntry, File as _, Language, - LanguageRegistry, Operation, PointUtf16, ToOffset, ToPointUtf16, + LanguageRegistry, PointUtf16, ToOffset, ToPointUtf16, }; use lsp::{DiagnosticSeverity, LanguageServer}; use postage::{prelude::Stream, watch}; @@ -42,7 +43,7 @@ pub struct Project { collaborators: HashMap, subscriptions: Vec, language_servers_with_diagnostics_running: isize, - open_buffers: HashMap, + open_buffers: HashMap>, loading_buffers: HashMap< ProjectPath, postage::watch::Receiver, Arc>>>, @@ -50,11 +51,6 @@ pub struct Project { shared_buffers: HashMap>>, } -enum OpenBuffer { - Operations(Vec), - Loaded(WeakModelHandle), -} - enum WorktreeHandle { Strong(ModelHandle), Weak(WeakModelHandle), @@ -246,8 +242,10 @@ impl Project { let mut worktrees = Vec::new(); for worktree in response.worktrees { - worktrees - .push(Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx).await?); + let (worktree, load_task) = cx + .update(|cx| Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx)); + worktrees.push(worktree); + load_task.detach(); } let user_ids = response @@ -336,6 +334,7 @@ impl Project { client.subscribe_to_entity(remote_id, cx, Self::handle_save_buffer), client.subscribe_to_entity(remote_id, cx, Self::handle_buffer_saved), client.subscribe_to_entity(remote_id, cx, Self::handle_format_buffer), + client.subscribe_to_entity(remote_id, cx, Self::handle_get_definition), ]); } } @@ -458,6 +457,7 @@ impl Project { rpc.send(proto::UnshareProject { project_id }).await?; this.update(&mut cx, |this, cx| { this.collaborators.clear(); + this.shared_buffers.clear(); for worktree in this.worktrees(cx).collect::>() { worktree.update(cx, |worktree, _| { worktree.as_local_mut().unwrap().unshare(); @@ -514,21 +514,18 @@ impl Project { let (mut tx, rx) = postage::watch::channel(); entry.insert(rx.clone()); - let load_buffer = worktree.update(cx, |worktree, cx| { - worktree.load_buffer(&project_path.path, cx) - }); + let load_buffer = if worktree.read(cx).is_local() { + self.open_local_buffer(&project_path.path, &worktree, cx) + } else { + self.open_remote_buffer(&project_path.path, &worktree, cx) + }; cx.spawn(move |this, mut cx| async move { let load_result = load_buffer.await; - *tx.borrow_mut() = Some(this.update(&mut cx, |this, cx| { + *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| { // Record the fact that the buffer is no longer loading. this.loading_buffers.remove(&project_path); let buffer = load_result.map_err(Arc::new)?; - this.open_buffers.insert( - buffer.read(cx).remote_id() as usize, - OpenBuffer::Loaded(buffer.downgrade()), - ); - this.assign_language_to_buffer(&worktree, &buffer, cx); Ok(buffer) })); }) @@ -550,6 +547,55 @@ impl Project { }) } + fn open_local_buffer( + &mut self, + path: &Arc, + worktree: &ModelHandle, + cx: &mut ModelContext, + ) -> Task>> { + let load_buffer = worktree.update(cx, |worktree, cx| { + let worktree = worktree.as_local_mut().unwrap(); + worktree.load_buffer(path, cx) + }); + let worktree = worktree.downgrade(); + cx.spawn(|this, mut cx| async move { + let buffer = load_buffer.await?; + let worktree = worktree + .upgrade(&cx) + .ok_or_else(|| anyhow!("worktree was removed"))?; + this.update(&mut cx, |this, cx| { + this.register_buffer(&buffer, Some(&worktree), cx) + })?; + Ok(buffer) + }) + } + + fn open_remote_buffer( + &mut self, + path: &Arc, + worktree: &ModelHandle, + cx: &mut ModelContext, + ) -> Task>> { + let rpc = self.client.clone(); + let project_id = self.remote_id().unwrap(); + let remote_worktree_id = worktree.read(cx).id(); + let path = path.clone(); + let path_string = path.to_string_lossy().to_string(); + cx.spawn(|this, mut cx| async move { + let response = rpc + .request(proto::OpenBuffer { + project_id, + worktree_id: remote_worktree_id.to_proto(), + path: path_string, + }) + .await?; + let buffer = response.buffer.ok_or_else(|| anyhow!("missing buffer"))?; + this.update(&mut cx, |this, cx| { + this.deserialize_remote_buffer(buffer, cx) + }) + }) + } + pub fn save_buffer_as( &self, buffer: ModelHandle, @@ -568,9 +614,7 @@ impl Project { }) .await?; this.update(&mut cx, |this, cx| { - this.open_buffers - .insert(buffer.id(), OpenBuffer::Loaded(buffer.downgrade())); - this.assign_language_to_buffer(&worktree, &buffer, cx); + this.assign_language_to_buffer(&buffer, Some(&worktree), cx); }); Ok(()) }) @@ -603,25 +647,41 @@ impl Project { let mut result = None; let worktree = self.worktree_for_id(path.worktree_id, cx)?; self.open_buffers.retain(|_, buffer| { - if let OpenBuffer::Loaded(buffer) = buffer { - if let Some(buffer) = buffer.upgrade(cx) { - if let Some(file) = File::from_dyn(buffer.read(cx).file()) { - if file.worktree == worktree && file.path() == &path.path { - result = Some(buffer); - } + if let Some(buffer) = buffer.upgrade(cx) { + if let Some(file) = File::from_dyn(buffer.read(cx).file()) { + if file.worktree == worktree && file.path() == &path.path { + result = Some(buffer); } - return true; } + true + } else { + false } - false }); result } + fn register_buffer( + &mut self, + buffer: &ModelHandle, + worktree: Option<&ModelHandle>, + cx: &mut ModelContext, + ) -> Result<()> { + if self + .open_buffers + .insert(buffer.read(cx).remote_id() as usize, buffer.downgrade()) + .is_some() + { + return Err(anyhow!("registered the same buffer twice")); + } + self.assign_language_to_buffer(&buffer, worktree, cx); + Ok(()) + } + fn assign_language_to_buffer( &mut self, - worktree: &ModelHandle, buffer: &ModelHandle, + worktree: Option<&ModelHandle>, cx: &mut ModelContext, ) -> Option<()> { let (path, full_path) = { @@ -637,7 +697,7 @@ impl Project { // For local worktrees, start a language server if needed. // Also assign the language server and any previously stored diagnostics to the buffer. - if let Some(local_worktree) = worktree.read(cx).as_local() { + if let Some(local_worktree) = worktree.and_then(|w| w.read(cx).as_local()) { let worktree_id = local_worktree.id(); let worktree_abs_path = local_worktree.abs_path().clone(); @@ -661,7 +721,7 @@ impl Project { } } - if let Some(local_worktree) = worktree.read(cx).as_local() { + if let Some(local_worktree) = worktree.and_then(|w| w.read(cx).as_local()) { if let Some(diagnostics) = local_worktree.diagnostics_for_path(&path) { buffer.update(cx, |buffer, cx| { buffer.update_diagnostics(None, diagnostics, cx).log_err(); @@ -935,10 +995,10 @@ impl Project { cx: &mut ModelContext, ) -> Task>> { let source_buffer_handle = source_buffer_handle.clone(); - let buffer = source_buffer_handle.read(cx); + let source_buffer = source_buffer_handle.read(cx); let worktree; let buffer_abs_path; - if let Some(file) = File::from_dyn(buffer.file()) { + if let Some(file) = File::from_dyn(source_buffer.file()) { worktree = file.worktree.clone(); buffer_abs_path = file.as_local().map(|f| f.abs_path(cx)); } else { @@ -946,11 +1006,11 @@ impl Project { }; if worktree.read(cx).as_local().is_some() { - let point = buffer.offset_to_point_utf16(position.to_offset(buffer)); + let point = source_buffer.offset_to_point_utf16(position.to_offset(source_buffer)); let buffer_abs_path = buffer_abs_path.unwrap(); let lang_name; let lang_server; - if let Some(lang) = buffer.language() { + if let Some(lang) = source_buffer.language() { lang_name = lang.name().to_string(); if let Some(server) = self .language_servers @@ -1045,9 +1105,41 @@ impl Project { Ok(definitions) }) + } else if let Some(project_id) = self.remote_id() { + let client = self.client.clone(); + let request = proto::GetDefinition { + project_id, + buffer_id: source_buffer.remote_id(), + position: Some(serialize_anchor(&source_buffer.anchor_before(position))), + }; + cx.spawn(|this, mut cx| async move { + let response = client.request(request).await?; + this.update(&mut cx, |this, cx| { + let mut definitions = Vec::new(); + for definition in response.definitions { + let target_buffer = this.deserialize_remote_buffer( + definition.buffer.ok_or_else(|| anyhow!("missing buffer"))?, + cx, + )?; + let target_start = definition + .target_start + .and_then(deserialize_anchor) + .ok_or_else(|| anyhow!("missing target start"))?; + let target_end = definition + .target_end + .and_then(deserialize_anchor) + .ok_or_else(|| anyhow!("missing target end"))?; + definitions.push(Definition { + target_buffer, + target_range: target_start..target_end, + }) + } + + Ok(definitions) + }) + }) } else { - log::info!("go to definition is not yet implemented for guests"); - Task::ready(Ok(Default::default())) + Task::ready(Err(anyhow!("project does not have a remote id"))) } } @@ -1173,62 +1265,60 @@ impl Project { let snapshot = worktree_handle.read(cx).snapshot(); let mut buffers_to_delete = Vec::new(); for (buffer_id, buffer) in &self.open_buffers { - if let OpenBuffer::Loaded(buffer) = buffer { - if let Some(buffer) = buffer.upgrade(cx) { - buffer.update(cx, |buffer, cx| { - if let Some(old_file) = File::from_dyn(buffer.file()) { - if old_file.worktree != worktree_handle { - return; + if let Some(buffer) = buffer.upgrade(cx) { + buffer.update(cx, |buffer, cx| { + if let Some(old_file) = File::from_dyn(buffer.file()) { + if old_file.worktree != worktree_handle { + return; + } + + let new_file = if let Some(entry) = old_file + .entry_id + .and_then(|entry_id| snapshot.entry_for_id(entry_id)) + { + File { + is_local: true, + entry_id: Some(entry.id), + mtime: entry.mtime, + path: entry.path.clone(), + worktree: worktree_handle.clone(), } + } else if let Some(entry) = + snapshot.entry_for_path(old_file.path().as_ref()) + { + File { + is_local: true, + entry_id: Some(entry.id), + mtime: entry.mtime, + path: entry.path.clone(), + worktree: worktree_handle.clone(), + } + } else { + File { + is_local: true, + entry_id: None, + path: old_file.path().clone(), + mtime: old_file.mtime(), + worktree: worktree_handle.clone(), + } + }; - let new_file = if let Some(entry) = old_file - .entry_id - .and_then(|entry_id| snapshot.entry_for_id(entry_id)) - { - File { - is_local: true, - entry_id: Some(entry.id), - mtime: entry.mtime, - path: entry.path.clone(), - worktree: worktree_handle.clone(), - } - } else if let Some(entry) = - snapshot.entry_for_path(old_file.path().as_ref()) - { - File { - is_local: true, - entry_id: Some(entry.id), - mtime: entry.mtime, - path: entry.path.clone(), - worktree: worktree_handle.clone(), - } - } else { - File { - is_local: true, - entry_id: None, - path: old_file.path().clone(), - mtime: old_file.mtime(), - worktree: worktree_handle.clone(), - } + if let Some(project_id) = self.remote_id() { + let client = self.client.clone(); + let message = proto::UpdateBufferFile { + project_id, + buffer_id: *buffer_id as u64, + file: Some(new_file.to_proto()), }; - - if let Some(project_id) = self.remote_id() { - let client = self.client.clone(); - let message = proto::UpdateBufferFile { - project_id, - buffer_id: *buffer_id as u64, - file: Some(new_file.to_proto()), - }; - cx.foreground() - .spawn(async move { client.send(message).await }) - .detach_and_log_err(cx); - } - buffer.file_updated(Box::new(new_file), cx).detach(); + cx.foreground() + .spawn(async move { client.send(message).await }) + .detach_and_log_err(cx); } - }); - } else { - buffers_to_delete.push(*buffer_id); - } + buffer.file_updated(Box::new(new_file), cx).detach(); + } + }); + } else { + buffers_to_delete.push(*buffer_id); } } @@ -1366,10 +1456,8 @@ impl Project { .replica_id; self.shared_buffers.remove(&peer_id); for (_, buffer) in &self.open_buffers { - if let OpenBuffer::Loaded(buffer) = buffer { - if let Some(buffer) = buffer.upgrade(cx) { - buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx)); - } + if let Some(buffer) = buffer.upgrade(cx) { + buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx)); } } cx.notify(); @@ -1388,16 +1476,9 @@ impl Project { .payload .worktree .ok_or_else(|| anyhow!("invalid worktree"))?; - cx.spawn(|this, mut cx| { - async move { - let worktree = - Worktree::remote(remote_id, replica_id, worktree, client, &mut cx).await?; - this.update(&mut cx, |this, cx| this.add_worktree(&worktree, cx)); - Ok(()) - } - .log_err() - }) - .detach(); + let (worktree, load_task) = Worktree::remote(remote_id, replica_id, worktree, client, cx); + self.add_worktree(&worktree, cx); + load_task.detach(); Ok(()) } @@ -1486,19 +1567,9 @@ impl Project { .into_iter() .map(|op| language::proto::deserialize_operation(op)) .collect::, _>>()?; - match self.open_buffers.get_mut(&buffer_id) { - Some(OpenBuffer::Operations(pending_ops)) => pending_ops.extend(ops), - Some(OpenBuffer::Loaded(buffer)) => { - if let Some(buffer) = buffer.upgrade(cx) { - buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?; - } else { - self.open_buffers - .insert(buffer_id, OpenBuffer::Operations(ops)); - } - } - None => { - self.open_buffers - .insert(buffer_id, OpenBuffer::Operations(ops)); + if let Some(buffer) = self.open_buffers.get_mut(&buffer_id) { + if let Some(buffer) = buffer.upgrade(cx) { + buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?; } } Ok(()) @@ -1611,6 +1682,53 @@ impl Project { Ok(()) } + pub fn handle_get_definition( + &mut self, + envelope: TypedEnvelope, + rpc: Arc, + cx: &mut ModelContext, + ) -> Result<()> { + let receipt = envelope.receipt(); + let sender_id = envelope.original_sender_id()?; + let source_buffer = self + .shared_buffers + .get(&sender_id) + .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned()) + .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?; + let position = envelope + .payload + .position + .and_then(deserialize_anchor) + .ok_or_else(|| anyhow!("invalid position"))?; + if !source_buffer.read(cx).can_resolve(&position) { + return Err(anyhow!("cannot resolve position")); + } + + let definitions = self.definition(&source_buffer, position, cx); + cx.spawn(|this, mut cx| async move { + let definitions = definitions.await?; + let mut response = proto::GetDefinitionResponse { + definitions: Default::default(), + }; + this.update(&mut cx, |this, cx| { + for definition in definitions { + let buffer = + this.serialize_buffer_for_peer(&definition.target_buffer, sender_id, cx); + response.definitions.push(proto::Definition { + target_start: Some(serialize_anchor(&definition.target_range.start)), + target_end: Some(serialize_anchor(&definition.target_range.end)), + buffer: Some(buffer), + }); + } + }); + rpc.respond(receipt, response).await?; + Ok::<_, anyhow::Error>(()) + }) + .detach_and_log_err(cx); + + Ok(()) + } + pub fn handle_open_buffer( &mut self, envelope: TypedEnvelope, @@ -1630,17 +1748,13 @@ impl Project { cx.spawn(|this, mut cx| { async move { let buffer = open_buffer.await?; - this.update(&mut cx, |this, _| { - this.shared_buffers - .entry(peer_id) - .or_default() - .insert(buffer.id() as u64, buffer.clone()); + let buffer = this.update(&mut cx, |this, cx| { + this.serialize_buffer_for_peer(&buffer, peer_id, cx) }); - let message = buffer.read_with(&cx, |buffer, _| buffer.to_proto()); rpc.respond( receipt, proto::OpenBufferResponse { - buffer: Some(message), + buffer: Some(buffer), }, ) .await @@ -1651,6 +1765,60 @@ impl Project { Ok(()) } + fn serialize_buffer_for_peer( + &mut self, + buffer: &ModelHandle, + peer_id: PeerId, + cx: &AppContext, + ) -> proto::Buffer { + let buffer_id = buffer.read(cx).remote_id(); + let shared_buffers = self.shared_buffers.entry(peer_id).or_default(); + match shared_buffers.entry(buffer_id) { + hash_map::Entry::Occupied(_) => proto::Buffer { + variant: Some(proto::buffer::Variant::Id(buffer_id)), + }, + hash_map::Entry::Vacant(entry) => { + entry.insert(buffer.clone()); + proto::Buffer { + variant: Some(proto::buffer::Variant::State(buffer.read(cx).to_proto())), + } + } + } + } + + fn deserialize_remote_buffer( + &mut self, + buffer: proto::Buffer, + cx: &mut ModelContext, + ) -> Result> { + match buffer.variant.ok_or_else(|| anyhow!("missing buffer"))? { + proto::buffer::Variant::Id(id) => self + .open_buffers + .get(&(id as usize)) + .and_then(|buffer| buffer.upgrade(cx)) + .ok_or_else(|| anyhow!("no buffer exists for id {}", id)), + proto::buffer::Variant::State(mut buffer) => { + let mut buffer_worktree = None; + let mut buffer_file = None; + if let Some(file) = buffer.file.take() { + let worktree_id = WorktreeId::from_proto(file.worktree_id); + let worktree = self + .worktree_for_id(worktree_id, cx) + .ok_or_else(|| anyhow!("no worktree found for id {}", file.worktree_id))?; + buffer_file = Some(Box::new(File::from_proto(file, worktree.clone(), cx)?) + as Box); + buffer_worktree = Some(worktree); + } + + let buffer = cx.add_model(|cx| { + Buffer::from_proto(self.replica_id(), buffer, buffer_file, cx).unwrap() + }); + self.register_buffer(&buffer, buffer_worktree.as_ref(), cx)?; + Ok(buffer) + } + } + } + pub fn handle_close_buffer( &mut self, envelope: TypedEnvelope, @@ -1674,13 +1842,7 @@ impl Project { let buffer = self .open_buffers .get(&(payload.buffer_id as usize)) - .and_then(|buf| { - if let OpenBuffer::Loaded(buffer) = buf { - buffer.upgrade(cx) - } else { - None - } - }); + .and_then(|buffer| buffer.upgrade(cx)); if let Some(buffer) = buffer { buffer.update(cx, |buffer, cx| { let version = payload.version.try_into()?; @@ -1705,13 +1867,7 @@ impl Project { let buffer = self .open_buffers .get(&(payload.buffer_id as usize)) - .and_then(|buf| { - if let OpenBuffer::Loaded(buffer) = buf { - buffer.upgrade(cx) - } else { - None - } - }); + .and_then(|buffer| buffer.upgrade(cx)); if let Some(buffer) = buffer { buffer.update(cx, |buffer, cx| { let version = payload.version.try_into()?; @@ -1910,15 +2066,6 @@ impl> From<(WorktreeId, P)> for ProjectPath { } } -impl OpenBuffer { - fn upgrade(&self, cx: &AppContext) -> Option> { - match self { - OpenBuffer::Loaded(buffer) => buffer.upgrade(cx), - OpenBuffer::Operations(_) => None, - } - } -} - #[cfg(test)] mod tests { use super::{Event, *}; @@ -2292,7 +2439,7 @@ mod tests { #[gpui::test] async fn test_save_file(mut cx: gpui::TestAppContext) { - let fs = Arc::new(FakeFs::new()); + let fs = Arc::new(FakeFs::new(cx.background())); fs.insert_tree( "/dir", json!({ @@ -2330,7 +2477,7 @@ mod tests { #[gpui::test] async fn test_save_in_single_file_worktree(mut cx: gpui::TestAppContext) { - let fs = Arc::new(FakeFs::new()); + let fs = Arc::new(FakeFs::new(cx.background())); fs.insert_tree( "/dir", json!({ @@ -2419,15 +2566,16 @@ mod tests { // Create a remote copy of this worktree. let initial_snapshot = tree.read_with(&cx, |tree, _| tree.snapshot()); - let remote = Worktree::remote( - 1, - 1, - initial_snapshot.to_proto(&Default::default(), Default::default()), - rpc.clone(), - &mut cx.to_async(), - ) - .await - .unwrap(); + let (remote, load_task) = cx.update(|cx| { + Worktree::remote( + 1, + 1, + initial_snapshot.to_proto(&Default::default(), Default::default()), + rpc.clone(), + cx, + ) + }); + load_task.await; cx.read(|cx| { assert!(!buffer2.read(cx).is_dirty()); @@ -2516,7 +2664,7 @@ mod tests { #[gpui::test] async fn test_buffer_deduping(mut cx: gpui::TestAppContext) { - let fs = Arc::new(FakeFs::new()); + let fs = Arc::new(FakeFs::new(cx.background())); fs.insert_tree( "/the-dir", json!({ @@ -2805,7 +2953,7 @@ mod tests { #[gpui::test] async fn test_grouped_diagnostics(mut cx: gpui::TestAppContext) { - let fs = Arc::new(FakeFs::new()); + let fs = Arc::new(FakeFs::new(cx.background())); fs.insert_tree( "/the-dir", json!({ diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index fd33386ffcc5793c2572ba7815d8c370ef19c822..11006f32baf6ec1ef06ad0688518b75c604693fe 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -190,13 +190,13 @@ impl Worktree { Ok(tree) } - pub async fn remote( + pub fn remote( project_remote_id: u64, replica_id: ReplicaId, worktree: proto::Worktree, client: Arc, - cx: &mut AsyncAppContext, - ) -> Result> { + cx: &mut MutableAppContext, + ) -> (ModelHandle, Task<()>) { let remote_id = worktree.id; let root_char_bag: CharBag = worktree .root_name @@ -205,32 +205,26 @@ impl Worktree { .collect(); let root_name = worktree.root_name.clone(); let weak = worktree.weak; - let (entries_by_path, entries_by_id, diagnostic_summaries) = cx - .background() - .spawn(async move { - let mut entries_by_path_edits = Vec::new(); - let mut entries_by_id_edits = Vec::new(); - for entry in worktree.entries { - match Entry::try_from((&root_char_bag, entry)) { - Ok(entry) => { - entries_by_id_edits.push(Edit::Insert(PathEntry { - id: entry.id, - path: entry.path.clone(), - is_ignored: entry.is_ignored, - scan_id: 0, - })); - entries_by_path_edits.push(Edit::Insert(entry)); - } - Err(err) => log::warn!("error for remote worktree entry {:?}", err), - } - } - - let mut entries_by_path = SumTree::new(); - let mut entries_by_id = SumTree::new(); - entries_by_path.edit(entries_by_path_edits, &()); - entries_by_id.edit(entries_by_id_edits, &()); + let snapshot = Snapshot { + id: WorktreeId(remote_id as usize), + root_name, + root_char_bag, + entries_by_path: Default::default(), + entries_by_id: Default::default(), + }; - let diagnostic_summaries = TreeMap::from_ordered_entries( + let (updates_tx, mut updates_rx) = postage::mpsc::channel(64); + let (mut snapshot_tx, snapshot_rx) = watch::channel_with(snapshot.clone()); + let worktree_handle = cx.add_model(|_: &mut ModelContext| { + Worktree::Remote(RemoteWorktree { + project_id: project_remote_id, + replica_id, + snapshot: snapshot.clone(), + snapshot_rx: snapshot_rx.clone(), + updates_tx, + client: client.clone(), + queued_operations: Default::default(), + diagnostic_summaries: TreeMap::from_ordered_entries( worktree.diagnostic_summaries.into_iter().map(|summary| { ( PathKey(PathBuf::from(summary.path).into()), @@ -242,24 +236,48 @@ impl Worktree { }, ) }), - ); - - (entries_by_path, entries_by_id, diagnostic_summaries) + ), + weak, }) - .await; + }); - let worktree = cx.update(|cx| { - cx.add_model(|cx: &mut ModelContext| { - let snapshot = Snapshot { - id: WorktreeId(remote_id as usize), - root_name, - root_char_bag, - entries_by_path, - entries_by_id, - }; + let deserialize_task = cx.spawn({ + let worktree_handle = worktree_handle.clone(); + |cx| async move { + let (entries_by_path, entries_by_id) = cx + .background() + .spawn(async move { + let mut entries_by_path_edits = Vec::new(); + let mut entries_by_id_edits = Vec::new(); + for entry in worktree.entries { + match Entry::try_from((&root_char_bag, entry)) { + Ok(entry) => { + entries_by_id_edits.push(Edit::Insert(PathEntry { + id: entry.id, + path: entry.path.clone(), + is_ignored: entry.is_ignored, + scan_id: 0, + })); + entries_by_path_edits.push(Edit::Insert(entry)); + } + Err(err) => log::warn!("error for remote worktree entry {:?}", err), + } + } + + let mut entries_by_path = SumTree::new(); + let mut entries_by_id = SumTree::new(); + entries_by_path.edit(entries_by_path_edits, &()); + entries_by_id.edit(entries_by_id_edits, &()); + + (entries_by_path, entries_by_id) + }) + .await; - let (updates_tx, mut updates_rx) = postage::mpsc::channel(64); - let (mut snapshot_tx, snapshot_rx) = watch::channel_with(snapshot.clone()); + { + let mut snapshot = snapshot_tx.borrow_mut(); + snapshot.entries_by_path = entries_by_path; + snapshot.entries_by_id = entries_by_id; + } cx.background() .spawn(async move { @@ -275,7 +293,8 @@ impl Worktree { { let mut snapshot_rx = snapshot_rx.clone(); - cx.spawn_weak(|this, mut cx| async move { + let this = worktree_handle.downgrade(); + cx.spawn(|mut cx| async move { while let Some(_) = snapshot_rx.recv().await { if let Some(this) = cx.read(|cx| this.upgrade(cx)) { this.update(&mut cx, |this, cx| this.poll_snapshot(cx)); @@ -286,22 +305,9 @@ impl Worktree { }) .detach(); } - - Worktree::Remote(RemoteWorktree { - project_id: project_remote_id, - replica_id, - snapshot, - snapshot_rx, - updates_tx, - client: client.clone(), - queued_operations: Default::default(), - diagnostic_summaries, - weak, - }) - }) + } }); - - Ok(worktree) + (worktree_handle, deserialize_task) } pub fn as_local(&self) -> Option<&LocalWorktree> { @@ -361,17 +367,6 @@ impl Worktree { } } - pub fn load_buffer( - &mut self, - path: &Path, - cx: &mut ModelContext, - ) -> Task>> { - match self { - Worktree::Local(worktree) => worktree.load_buffer(path, cx), - Worktree::Remote(worktree) => worktree.load_buffer(path, cx), - } - } - pub fn diagnostic_summaries<'a>( &'a self, ) -> impl Iterator, DiagnosticSummary)> + 'a { @@ -828,41 +823,6 @@ impl LocalWorktree { } impl RemoteWorktree { - pub(crate) fn load_buffer( - &mut self, - path: &Path, - cx: &mut ModelContext, - ) -> Task>> { - let rpc = self.client.clone(); - let replica_id = self.replica_id; - let project_id = self.project_id; - let remote_worktree_id = self.id(); - let path: Arc = Arc::from(path); - let path_string = path.to_string_lossy().to_string(); - cx.spawn_weak(move |this, mut cx| async move { - let response = rpc - .request(proto::OpenBuffer { - project_id, - worktree_id: remote_worktree_id.to_proto(), - path: path_string, - }) - .await?; - - let this = this - .upgrade(&cx) - .ok_or_else(|| anyhow!("worktree was closed"))?; - let mut remote_buffer = response.buffer.ok_or_else(|| anyhow!("empty buffer"))?; - let file = remote_buffer - .file - .take() - .map(|proto| cx.read(|cx| File::from_proto(proto, this.clone(), cx))) - .transpose()? - .map(|file| Box::new(file) as Box); - - Ok(cx.add_model(|cx| Buffer::from_proto(replica_id, remote_buffer, file, cx).unwrap())) - }) - } - fn snapshot(&self) -> Snapshot { self.snapshot.clone() } @@ -2472,7 +2432,7 @@ mod tests { #[gpui::test] async fn test_traversal(cx: gpui::TestAppContext) { - let fs = FakeFs::new(); + let fs = FakeFs::new(cx.background()); fs.insert_tree( "/root", json!({ diff --git a/crates/rpc/Cargo.toml b/crates/rpc/Cargo.toml index 4be612eec77ae902db19b04cd04dcd3b19adf527..c7f701662e99f3a8a7d418c3f587e90d83ae8ecb 100644 --- a/crates/rpc/Cargo.toml +++ b/crates/rpc/Cargo.toml @@ -8,7 +8,7 @@ version = "0.1.0" path = "src/rpc.rs" [features] -test-support = [] +test-support = ["gpui"] [dependencies] anyhow = "1.0" @@ -25,6 +25,7 @@ rsa = "0.4" serde = { version = "1", features = ["derive"] } smol-timeout = "0.6" zstd = "0.9" +gpui = { path = "../gpui", features = ["test-support"], optional = true } [build-dependencies] prost-build = "0.8" diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index cbe7ca5d1adf80c6844186efbccd2396a4f48884..5580d2a724a3aa1d74bbe10cb1905f4ae6861075 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -20,40 +20,42 @@ message Envelope { LeaveProject leave_project = 14; AddProjectCollaborator add_project_collaborator = 15; RemoveProjectCollaborator remove_project_collaborator = 16; - - RegisterWorktree register_worktree = 17; - UnregisterWorktree unregister_worktree = 18; - ShareWorktree share_worktree = 19; - UpdateWorktree update_worktree = 20; - UpdateDiagnosticSummary update_diagnostic_summary = 21; - DiskBasedDiagnosticsUpdating disk_based_diagnostics_updating = 22; - DiskBasedDiagnosticsUpdated disk_based_diagnostics_updated = 23; - - OpenBuffer open_buffer = 24; - OpenBufferResponse open_buffer_response = 25; - CloseBuffer close_buffer = 26; - UpdateBuffer update_buffer = 27; - UpdateBufferFile update_buffer_file = 28; - SaveBuffer save_buffer = 29; - BufferSaved buffer_saved = 30; - BufferReloaded buffer_reloaded = 31; - FormatBuffer format_buffer = 32; - - GetChannels get_channels = 33; - GetChannelsResponse get_channels_response = 34; - JoinChannel join_channel = 35; - JoinChannelResponse join_channel_response = 36; - LeaveChannel leave_channel = 37; - SendChannelMessage send_channel_message = 38; - SendChannelMessageResponse send_channel_message_response = 39; - ChannelMessageSent channel_message_sent = 40; - GetChannelMessages get_channel_messages = 41; - GetChannelMessagesResponse get_channel_messages_response = 42; - - UpdateContacts update_contacts = 43; - - GetUsers get_users = 44; - GetUsersResponse get_users_response = 45; + GetDefinition get_definition = 17; + GetDefinitionResponse get_definition_response = 18; + + RegisterWorktree register_worktree = 19; + UnregisterWorktree unregister_worktree = 20; + ShareWorktree share_worktree = 21; + UpdateWorktree update_worktree = 22; + UpdateDiagnosticSummary update_diagnostic_summary = 23; + DiskBasedDiagnosticsUpdating disk_based_diagnostics_updating = 24; + DiskBasedDiagnosticsUpdated disk_based_diagnostics_updated = 25; + + OpenBuffer open_buffer = 26; + OpenBufferResponse open_buffer_response = 27; + CloseBuffer close_buffer = 28; + UpdateBuffer update_buffer = 29; + UpdateBufferFile update_buffer_file = 30; + SaveBuffer save_buffer = 31; + BufferSaved buffer_saved = 32; + BufferReloaded buffer_reloaded = 33; + FormatBuffer format_buffer = 34; + + GetChannels get_channels = 35; + GetChannelsResponse get_channels_response = 36; + JoinChannel join_channel = 37; + JoinChannelResponse join_channel_response = 38; + LeaveChannel leave_channel = 39; + SendChannelMessage send_channel_message = 40; + SendChannelMessageResponse send_channel_message_response = 41; + ChannelMessageSent channel_message_sent = 42; + GetChannelMessages get_channel_messages = 43; + GetChannelMessagesResponse get_channel_messages_response = 44; + + UpdateContacts update_contacts = 45; + + GetUsers get_users = 46; + GetUsersResponse get_users_response = 47; } } @@ -134,6 +136,22 @@ message RemoveProjectCollaborator { uint32 peer_id = 2; } +message GetDefinition { + uint64 project_id = 1; + uint64 buffer_id = 2; + Anchor position = 3; + } + +message GetDefinitionResponse { + repeated Definition definitions = 1; +} + +message Definition { + Buffer buffer = 1; + Anchor target_start = 2; + Anchor target_end = 3; +} + message OpenBuffer { uint64 project_id = 1; uint64 worktree_id = 2; @@ -303,6 +321,13 @@ message Entry { } message Buffer { + oneof variant { + uint64 id = 1; + BufferState state = 2; + } +} + +message BufferState { uint64 id = 1; optional File file = 2; string visible_text = 3; diff --git a/crates/rpc/src/conn.rs b/crates/rpc/src/conn.rs index 5ca845d13f1d489861fe076b2258a8d84bf8d615..a0db93287688d91899a482aa1e90508987eaee3c 100644 --- a/crates/rpc/src/conn.rs +++ b/crates/rpc/src/conn.rs @@ -34,12 +34,14 @@ impl Connection { } #[cfg(any(test, feature = "test-support"))] - pub fn in_memory() -> (Self, Self, postage::watch::Sender>) { + pub fn in_memory( + executor: std::sync::Arc, + ) -> (Self, Self, postage::watch::Sender>) { let (kill_tx, mut kill_rx) = postage::watch::channel_with(None); postage::stream::Stream::try_recv(&mut kill_rx).unwrap(); - let (a_tx, a_rx) = Self::channel(kill_rx.clone()); - let (b_tx, b_rx) = Self::channel(kill_rx); + let (a_tx, a_rx) = Self::channel(kill_rx.clone(), executor.clone()); + let (b_tx, b_rx) = Self::channel(kill_rx, executor); ( Self { tx: a_tx, rx: b_rx }, Self { tx: b_tx, rx: a_rx }, @@ -50,11 +52,12 @@ impl Connection { #[cfg(any(test, feature = "test-support"))] fn channel( kill_rx: postage::watch::Receiver>, + executor: std::sync::Arc, ) -> ( Box>, Box>>, ) { - use futures::{future, SinkExt as _}; + use futures::SinkExt as _; use io::{Error, ErrorKind}; let (tx, rx) = mpsc::unbounded::(); @@ -62,26 +65,39 @@ impl Connection { .sink_map_err(|e| WebSocketError::from(Error::new(ErrorKind::Other, e))) .with({ let kill_rx = kill_rx.clone(); + let executor = executor.clone(); move |msg| { - if kill_rx.borrow().is_none() { - future::ready(Ok(msg)) - } else { - future::ready(Err(Error::new(ErrorKind::Other, "connection killed").into())) - } + let kill_rx = kill_rx.clone(); + let executor = executor.clone(); + Box::pin(async move { + executor.simulate_random_delay().await; + if kill_rx.borrow().is_none() { + Ok(msg) + } else { + Err(Error::new(ErrorKind::Other, "connection killed").into()) + } + }) } }); + let rx = rx.then(move |msg| { + let executor = executor.clone(); + Box::pin(async move { + executor.simulate_random_delay().await; + msg + }) + }); let rx = KillableReceiver { kill_rx, rx }; (Box::new(tx), Box::new(rx)) } } -struct KillableReceiver { - rx: mpsc::UnboundedReceiver, +struct KillableReceiver { + rx: S, kill_rx: postage::watch::Receiver>, } -impl Stream for KillableReceiver { +impl> Stream for KillableReceiver { type Item = Result; fn poll_next( diff --git a/crates/rpc/src/peer.rs b/crates/rpc/src/peer.rs index ce9680173311ecb42dfef999c6fef7dee09e606f..dcfcd2530c31f4e1a3959d79c8c131ed68b3beb0 100644 --- a/crates/rpc/src/peer.rs +++ b/crates/rpc/src/peer.rs @@ -353,12 +353,14 @@ mod tests { let client1 = Peer::new(); let client2 = Peer::new(); - let (client1_to_server_conn, server_to_client_1_conn, _) = Connection::in_memory(); + let (client1_to_server_conn, server_to_client_1_conn, _) = + Connection::in_memory(cx.background()); let (client1_conn_id, io_task1, client1_incoming) = client1.add_connection(client1_to_server_conn).await; let (_, io_task2, server_incoming1) = server.add_connection(server_to_client_1_conn).await; - let (client2_to_server_conn, server_to_client_2_conn, _) = Connection::in_memory(); + let (client2_to_server_conn, server_to_client_2_conn, _) = + Connection::in_memory(cx.background()); let (client2_conn_id, io_task3, client2_incoming) = client2.add_connection(client2_to_server_conn).await; let (_, io_task4, server_incoming2) = server.add_connection(server_to_client_2_conn).await; @@ -410,9 +412,7 @@ mod tests { .unwrap(), proto::OpenBufferResponse { buffer: Some(proto::Buffer { - id: 101, - visible_text: "path/one content".to_string(), - ..Default::default() + variant: Some(proto::buffer::Variant::Id(0)) }), } ); @@ -431,10 +431,8 @@ mod tests { .unwrap(), proto::OpenBufferResponse { buffer: Some(proto::Buffer { - id: 102, - visible_text: "path/two content".to_string(), - ..Default::default() - }), + variant: Some(proto::buffer::Variant::Id(1)) + }) } ); @@ -460,9 +458,7 @@ mod tests { assert_eq!(message.worktree_id, 1); proto::OpenBufferResponse { buffer: Some(proto::Buffer { - id: 101, - visible_text: "path/one content".to_string(), - ..Default::default() + variant: Some(proto::buffer::Variant::Id(0)), }), } } @@ -470,9 +466,7 @@ mod tests { assert_eq!(message.worktree_id, 2); proto::OpenBufferResponse { buffer: Some(proto::Buffer { - id: 102, - visible_text: "path/two content".to_string(), - ..Default::default() + variant: Some(proto::buffer::Variant::Id(1)), }), } } @@ -497,7 +491,8 @@ mod tests { let server = Peer::new(); let client = Peer::new(); - let (client_to_server_conn, server_to_client_conn, _) = Connection::in_memory(); + let (client_to_server_conn, server_to_client_conn, _) = + Connection::in_memory(cx.background()); let (client_to_server_conn_id, io_task1, mut client_incoming) = client.add_connection(client_to_server_conn).await; let (server_to_client_conn_id, io_task2, mut server_incoming) = @@ -597,7 +592,7 @@ mod tests { async fn test_disconnect(cx: TestAppContext) { let executor = cx.foreground(); - let (client_conn, mut server_conn, _) = Connection::in_memory(); + let (client_conn, mut server_conn, _) = Connection::in_memory(cx.background()); let client = Peer::new(); let (connection_id, io_handler, mut incoming) = client.add_connection(client_conn).await; @@ -631,7 +626,7 @@ mod tests { #[gpui::test(iterations = 10)] async fn test_io_error(cx: TestAppContext) { let executor = cx.foreground(); - let (client_conn, mut server_conn, _) = Connection::in_memory(); + let (client_conn, mut server_conn, _) = Connection::in_memory(cx.background()); let client = Peer::new(); let (connection_id, io_handler, mut incoming) = client.add_connection(client_conn).await; diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index cebe3504e9b3ce99d554163fbee026b4e9f50a14..509cefd46c912d6310b727c5e3654ed7d1c712fc 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -134,6 +134,8 @@ messages!( GetChannelMessagesResponse, GetChannels, GetChannelsResponse, + GetDefinition, + GetDefinitionResponse, GetUsers, GetUsersResponse, JoinChannel, @@ -168,6 +170,7 @@ request_messages!( (FormatBuffer, Ack), (GetChannelMessages, GetChannelMessagesResponse), (GetChannels, GetChannelsResponse), + (GetDefinition, GetDefinitionResponse), (GetUsers, GetUsersResponse), (JoinChannel, JoinChannelResponse), (JoinProject, JoinProjectResponse), @@ -191,6 +194,7 @@ entity_messages!( DiskBasedDiagnosticsUpdated, DiskBasedDiagnosticsUpdating, FormatBuffer, + GetDefinition, JoinProject, LeaveProject, OpenBuffer, diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index c8af5ce1fd934bc10cf7f378317ca2a407e91f93..8d5adfb3c5ab9b23a402c2dbdcc760b68b66fb29 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -17,7 +17,7 @@ use rpc::{ Connection, ConnectionId, Peer, TypedEnvelope, }; use sha1::{Digest as _, Sha1}; -use std::{any::TypeId, future::Future, mem, path::PathBuf, sync::Arc, time::Instant}; +use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant}; use store::{Store, Worktree}; use surf::StatusCode; use tide::log; @@ -74,6 +74,7 @@ impl Server { .add_handler(Server::update_diagnostic_summary) .add_handler(Server::disk_based_diagnostics_updating) .add_handler(Server::disk_based_diagnostics_updated) + .add_handler(Server::get_definition) .add_handler(Server::open_buffer) .add_handler(Server::close_buffer) .add_handler(Server::update_buffer) @@ -479,26 +480,40 @@ impl Server { .worktree .as_mut() .ok_or_else(|| anyhow!("missing worktree"))?; - let entries = mem::take(&mut worktree.entries) - .into_iter() - .map(|entry| (entry.id, entry)) + let entries = worktree + .entries + .iter() + .map(|entry| (entry.id, entry.clone())) .collect(); - - let diagnostic_summaries = mem::take(&mut worktree.diagnostic_summaries) - .into_iter() - .map(|summary| (PathBuf::from(summary.path.clone()), summary)) + let diagnostic_summaries = worktree + .diagnostic_summaries + .iter() + .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone())) .collect(); - let contact_user_ids = self.state_mut().share_worktree( + let shared_worktree = self.state_mut().share_worktree( request.payload.project_id, worktree.id, request.sender_id, entries, diagnostic_summaries, ); - if let Some(contact_user_ids) = contact_user_ids { + if let Some(shared_worktree) = shared_worktree { + broadcast( + request.sender_id, + shared_worktree.connection_ids, + |connection_id| { + self.peer.forward_send( + request.sender_id, + connection_id, + request.payload.clone(), + ) + }, + ) + .await?; self.peer.respond(request.receipt(), proto::Ack {}).await?; - self.update_contacts_for_users(&contact_user_ids).await?; + self.update_contacts_for_users(&shared_worktree.authorized_user_ids) + .await?; } else { self.peer .respond_with_error( @@ -594,6 +609,24 @@ impl Server { Ok(()) } + async fn get_definition( + self: Arc, + request: TypedEnvelope, + ) -> tide::Result<()> { + let receipt = request.receipt(); + let host_connection_id = self + .state() + .read_project(request.payload.project_id, request.sender_id) + .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))? + .host_connection_id; + let response = self + .peer + .forward_request(request.sender_id, host_connection_id, request.payload) + .await?; + self.peer.respond(receipt, response).await?; + Ok(()) + } + async fn open_buffer( self: Arc, request: TypedEnvelope, @@ -1135,6 +1168,7 @@ mod tests { use gpui::{executor, ModelHandle, TestAppContext}; use parking_lot::Mutex; use postage::{mpsc, watch}; + use rand::prelude::*; use rpc::PeerId; use serde_json::json; use sqlx::types::time::OffsetDateTime; @@ -1156,8 +1190,8 @@ mod tests { editor::{Editor, EditorSettings, Input, MultiBuffer}, fs::{FakeFs, Fs as _}, language::{ - tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig, - LanguageRegistry, LanguageServerConfig, Point, + tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language, + LanguageConfig, LanguageRegistry, LanguageServerConfig, Point, }, lsp, project::{DiagnosticSummary, Project, ProjectPath}, @@ -1167,7 +1201,7 @@ mod tests { async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) { let (window_b, _) = cx_b.add_window(|_| EmptyView); let lang_registry = Arc::new(LanguageRegistry::new()); - let fs = Arc::new(FakeFs::new()); + let fs = Arc::new(FakeFs::new(cx_a.background())); cx_a.foreground().forbid_parking(); // Connect to a server as 2 clients. @@ -1305,7 +1339,7 @@ mod tests { #[gpui::test] async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) { let lang_registry = Arc::new(LanguageRegistry::new()); - let fs = Arc::new(FakeFs::new()); + let fs = Arc::new(FakeFs::new(cx_a.background())); cx_a.foreground().forbid_parking(); // Connect to a server as 2 clients. @@ -1406,7 +1440,7 @@ mod tests { mut cx_c: TestAppContext, ) { let lang_registry = Arc::new(LanguageRegistry::new()); - let fs = Arc::new(FakeFs::new()); + let fs = Arc::new(FakeFs::new(cx_a.background())); cx_a.foreground().forbid_parking(); // Connect to a server as 3 clients. @@ -1589,7 +1623,7 @@ mod tests { async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) { cx_a.foreground().forbid_parking(); let lang_registry = Arc::new(LanguageRegistry::new()); - let fs = Arc::new(FakeFs::new()); + let fs = Arc::new(FakeFs::new(cx_a.background())); // Connect to a server as 2 clients. let mut server = TestServer::start(cx_a.foreground()).await; @@ -1682,7 +1716,7 @@ mod tests { async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) { cx_a.foreground().forbid_parking(); let lang_registry = Arc::new(LanguageRegistry::new()); - let fs = Arc::new(FakeFs::new()); + let fs = Arc::new(FakeFs::new(cx_a.background())); // Connect to a server as 2 clients. let mut server = TestServer::start(cx_a.foreground()).await; @@ -1760,14 +1794,14 @@ mod tests { }); } - #[gpui::test] + #[gpui::test(iterations = 100)] async fn test_editing_while_guest_opens_buffer( mut cx_a: TestAppContext, mut cx_b: TestAppContext, ) { cx_a.foreground().forbid_parking(); let lang_registry = Arc::new(LanguageRegistry::new()); - let fs = Arc::new(FakeFs::new()); + let fs = Arc::new(FakeFs::new(cx_a.background())); // Connect to a server as 2 clients. let mut server = TestServer::start(cx_a.foreground()).await; @@ -1847,7 +1881,7 @@ mod tests { ) { cx_a.foreground().forbid_parking(); let lang_registry = Arc::new(LanguageRegistry::new()); - let fs = Arc::new(FakeFs::new()); + let fs = Arc::new(FakeFs::new(cx_a.background())); // Connect to a server as 2 clients. let mut server = TestServer::start(cx_a.foreground()).await; @@ -1922,7 +1956,7 @@ mod tests { async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) { cx_a.foreground().forbid_parking(); let lang_registry = Arc::new(LanguageRegistry::new()); - let fs = Arc::new(FakeFs::new()); + let fs = Arc::new(FakeFs::new(cx_a.background())); // Connect to a server as 2 clients. let mut server = TestServer::start(cx_a.foreground()).await; @@ -1996,7 +2030,7 @@ mod tests { ) { cx_a.foreground().forbid_parking(); let mut lang_registry = Arc::new(LanguageRegistry::new()); - let fs = Arc::new(FakeFs::new()); + let fs = Arc::new(FakeFs::new(cx_a.background())); // Set up a fake language server. let (language_server_config, mut fake_language_server) = @@ -2217,7 +2251,7 @@ mod tests { async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) { cx_a.foreground().forbid_parking(); let mut lang_registry = Arc::new(LanguageRegistry::new()); - let fs = Arc::new(FakeFs::new()); + let fs = Arc::new(FakeFs::new(cx_a.background())); // Set up a fake language server. let (language_server_config, mut fake_language_server) = @@ -2285,7 +2319,6 @@ mod tests { .await .unwrap(); - // Open the file to be formatted on client B. let buffer_b = cx_b .background() .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))) @@ -2318,6 +2351,277 @@ mod tests { ); } + #[gpui::test] + async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) { + cx_a.foreground().forbid_parking(); + let mut lang_registry = Arc::new(LanguageRegistry::new()); + let fs = Arc::new(FakeFs::new(cx_a.background())); + fs.insert_tree( + "/root-1", + json!({ + ".zed.toml": r#"collaborators = ["user_b"]"#, + "a.rs": "const ONE: usize = b::TWO + b::THREE;", + }), + ) + .await; + fs.insert_tree( + "/root-2", + json!({ + "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;", + }), + ) + .await; + + // Set up a fake language server. + let (language_server_config, mut fake_language_server) = + LanguageServerConfig::fake(cx_a.background()).await; + Arc::get_mut(&mut lang_registry) + .unwrap() + .add(Arc::new(Language::new( + LanguageConfig { + name: "Rust".to_string(), + path_suffixes: vec!["rs".to_string()], + language_server: Some(language_server_config), + ..Default::default() + }, + Some(tree_sitter_rust::language()), + ))); + + // Connect to a server as 2 clients. + let mut server = TestServer::start(cx_a.foreground()).await; + let client_a = server.create_client(&mut cx_a, "user_a").await; + let client_b = server.create_client(&mut cx_b, "user_b").await; + + // Share a project as client A + let project_a = cx_a.update(|cx| { + Project::local( + client_a.clone(), + client_a.user_store.clone(), + lang_registry.clone(), + fs.clone(), + cx, + ) + }); + let (worktree_a, _) = project_a + .update(&mut cx_a, |p, cx| { + p.find_or_create_local_worktree("/root-1", false, cx) + }) + .await + .unwrap(); + worktree_a + .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete()) + .await; + let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await; + let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id()); + project_a + .update(&mut cx_a, |p, cx| p.share(cx)) + .await + .unwrap(); + + // Join the worktree as client B. + let project_b = Project::remote( + project_id, + client_b.clone(), + client_b.user_store.clone(), + lang_registry.clone(), + fs.clone(), + &mut cx_b.to_async(), + ) + .await + .unwrap(); + + // Open the file to be formatted on client B. + let buffer_b = cx_b + .background() + .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))) + .await + .unwrap(); + + let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx)); + let (request_id, _) = fake_language_server + .receive_request::() + .await; + fake_language_server + .respond( + request_id, + Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new( + lsp::Url::from_file_path("/root-2/b.rs").unwrap(), + lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)), + ))), + ) + .await; + let definitions_1 = definitions_1.await.unwrap(); + cx_b.read(|cx| { + assert_eq!(definitions_1.len(), 1); + assert_eq!(project_b.read(cx).worktrees(cx).count(), 2); + let target_buffer = definitions_1[0].target_buffer.read(cx); + assert_eq!( + target_buffer.text(), + "const TWO: usize = 2;\nconst THREE: usize = 3;" + ); + assert_eq!( + definitions_1[0].target_range.to_point(target_buffer), + Point::new(0, 6)..Point::new(0, 9) + ); + }); + + // Try getting more definitions for the same buffer, ensuring the buffer gets reused from + // the previous call to `definition`. + let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx)); + let (request_id, _) = fake_language_server + .receive_request::() + .await; + fake_language_server + .respond( + request_id, + Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new( + lsp::Url::from_file_path("/root-2/b.rs").unwrap(), + lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)), + ))), + ) + .await; + let definitions_2 = definitions_2.await.unwrap(); + cx_b.read(|cx| { + assert_eq!(definitions_2.len(), 1); + assert_eq!(project_b.read(cx).worktrees(cx).count(), 2); + let target_buffer = definitions_2[0].target_buffer.read(cx); + assert_eq!( + target_buffer.text(), + "const TWO: usize = 2;\nconst THREE: usize = 3;" + ); + assert_eq!( + definitions_2[0].target_range.to_point(target_buffer), + Point::new(1, 6)..Point::new(1, 11) + ); + }); + assert_eq!( + definitions_1[0].target_buffer, + definitions_2[0].target_buffer + ); + + cx_b.update(|_| { + drop(definitions_1); + drop(definitions_2); + }); + project_b + .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1) + .await; + } + + #[gpui::test] + async fn test_open_buffer_while_getting_definition_pointing_to_it( + mut cx_a: TestAppContext, + mut cx_b: TestAppContext, + mut rng: StdRng, + ) { + cx_a.foreground().forbid_parking(); + let mut lang_registry = Arc::new(LanguageRegistry::new()); + let fs = Arc::new(FakeFs::new(cx_a.background())); + fs.insert_tree( + "/root", + json!({ + ".zed.toml": r#"collaborators = ["user_b"]"#, + "a.rs": "const ONE: usize = b::TWO;", + "b.rs": "const TWO: usize = 2", + }), + ) + .await; + + // Set up a fake language server. + let (language_server_config, mut fake_language_server) = + LanguageServerConfig::fake(cx_a.background()).await; + Arc::get_mut(&mut lang_registry) + .unwrap() + .add(Arc::new(Language::new( + LanguageConfig { + name: "Rust".to_string(), + path_suffixes: vec!["rs".to_string()], + language_server: Some(language_server_config), + ..Default::default() + }, + Some(tree_sitter_rust::language()), + ))); + + // Connect to a server as 2 clients. + let mut server = TestServer::start(cx_a.foreground()).await; + let client_a = server.create_client(&mut cx_a, "user_a").await; + let client_b = server.create_client(&mut cx_b, "user_b").await; + + // Share a project as client A + let project_a = cx_a.update(|cx| { + Project::local( + client_a.clone(), + client_a.user_store.clone(), + lang_registry.clone(), + fs.clone(), + cx, + ) + }); + let (worktree_a, _) = project_a + .update(&mut cx_a, |p, cx| { + p.find_or_create_local_worktree("/root", false, cx) + }) + .await + .unwrap(); + worktree_a + .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete()) + .await; + let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await; + let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id()); + project_a + .update(&mut cx_a, |p, cx| p.share(cx)) + .await + .unwrap(); + + // Join the worktree as client B. + let project_b = Project::remote( + project_id, + client_b.clone(), + client_b.user_store.clone(), + lang_registry.clone(), + fs.clone(), + &mut cx_b.to_async(), + ) + .await + .unwrap(); + + let buffer_b1 = cx_b + .background() + .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))) + .await + .unwrap(); + + let definitions; + let buffer_b2; + if rng.gen() { + definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx)); + buffer_b2 = + project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx)); + } else { + buffer_b2 = + project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx)); + definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx)); + } + + let (request_id, _) = fake_language_server + .receive_request::() + .await; + fake_language_server + .respond( + request_id, + Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new( + lsp::Url::from_file_path("/root/b.rs").unwrap(), + lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)), + ))), + ) + .await; + + let buffer_b2 = buffer_b2.await.unwrap(); + let definitions = definitions.await.unwrap(); + assert_eq!(definitions.len(), 1); + assert_eq!(definitions[0].target_buffer, buffer_b2); + } + #[gpui::test] async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) { cx_a.foreground().forbid_parking(); @@ -2738,7 +3042,7 @@ mod tests { ) { cx_a.foreground().forbid_parking(); let lang_registry = Arc::new(LanguageRegistry::new()); - let fs = Arc::new(FakeFs::new()); + let fs = Arc::new(FakeFs::new(cx_a.background())); // Connect to a server as 3 clients. let mut server = TestServer::start(cx_a.foreground()).await; @@ -2938,7 +3242,8 @@ mod tests { "server is forbidding connections" ))) } else { - let (client_conn, server_conn, kill_conn) = Connection::in_memory(); + let (client_conn, server_conn, kill_conn) = + Connection::in_memory(cx.background()); connection_killers.lock().insert(user_id, kill_conn); cx.background() .spawn(server.handle_connection( diff --git a/crates/server/src/rpc/store.rs b/crates/server/src/rpc/store.rs index b7aec2689bd5b575ee335775f881356a8fa09207..6e11f431aca72c2ee44c255bdf36103cf2d1d744 100644 --- a/crates/server/src/rpc/store.rs +++ b/crates/server/src/rpc/store.rs @@ -74,6 +74,11 @@ pub struct LeftProject { pub authorized_user_ids: Vec, } +pub struct SharedWorktree { + pub authorized_user_ids: Vec, + pub connection_ids: Vec, +} + impl Store { pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId) { self.connections.insert( @@ -393,7 +398,7 @@ impl Store { connection_id: ConnectionId, entries: HashMap, diagnostic_summaries: BTreeMap, - ) -> Option> { + ) -> Option { let project = self.projects.get_mut(&project_id)?; let worktree = project.worktrees.get_mut(&worktree_id)?; if project.host_connection_id == connection_id && project.share.is_some() { @@ -401,7 +406,10 @@ impl Store { entries, diagnostic_summaries, }); - Some(project.authorized_user_ids()) + Some(SharedWorktree { + authorized_user_ids: project.authorized_user_ids(), + connection_ids: project.guest_connection_ids(), + }) } else { None } diff --git a/crates/workspace/src/workspace.rs b/crates/workspace/src/workspace.rs index 8dc98c852bd17bca7bb1f2839bf2b83e0b93c47d..86f399a86a7dfed62a524a88a8af172f461fde5d 100644 --- a/crates/workspace/src/workspace.rs +++ b/crates/workspace/src/workspace.rs @@ -490,7 +490,7 @@ pub struct WorkspaceParams { impl WorkspaceParams { #[cfg(any(test, feature = "test-support"))] pub fn test(cx: &mut MutableAppContext) -> Self { - let fs = Arc::new(project::FakeFs::new()); + let fs = Arc::new(project::FakeFs::new(cx.background().clone())); let languages = Arc::new(LanguageRegistry::new()); let http_client = client::test::FakeHttpClient::new(|_| async move { Ok(client::http::ServerResponse::new(404)) diff --git a/crates/zed/src/test.rs b/crates/zed/src/test.rs index 4f685415d087f078e1d3f9ef3c9d969b3df31439..a365fdc6f46c7632f302c8a6fdbe25919c0edc1e 100644 --- a/crates/zed/src/test.rs +++ b/crates/zed/src/test.rs @@ -40,7 +40,7 @@ pub fn test_app_state(cx: &mut MutableAppContext) -> Arc { channel_list: cx.add_model(|cx| ChannelList::new(user_store.clone(), client.clone(), cx)), client, user_store, - fs: Arc::new(FakeFs::new()), + fs: Arc::new(FakeFs::new(cx.background().clone())), path_openers: Arc::from(path_openers), build_window_options: &build_window_options, build_workspace: &build_workspace,