Max Brunsfeld
Make go-to-definition work for guests
Cargo.lock | 1
crates/client/src/test.rs | 2
crates/gpui/Cargo.toml | 4
crates/gpui/src/executor.rs | 306 ++++++------------
crates/gpui/src/test.rs | 15
crates/gpui_macros/src/gpui_macros.rs | 8
crates/language/src/buffer.rs | 6
crates/language/src/proto.rs | 6
crates/project/src/fs.rs | 11
crates/project/src/project.rs | 468 +++++++++++++++++++---------
crates/project/src/worktree.rs | 172 ++++------
crates/rpc/Cargo.toml | 3
crates/rpc/proto/zed.proto | 93 +++--
crates/rpc/src/conn.rs | 40 +
crates/rpc/src/peer.rs | 31 -
crates/rpc/src/proto.rs | 4
crates/server/src/rpc.rs | 359 ++++++++++++++++++++-
crates/server/src/rpc/store.rs | 12
crates/workspace/src/workspace.rs | 2
crates/zed/src/test.rs | 2
20 files changed, 962 insertions(+), 583 deletions(-)
@@ -2128,6 +2128,7 @@ dependencies = [
"block",
"cc",
"cocoa",
+ "collections",
"core-foundation",
"core-graphics",
"core-text",
@@ -94,7 +94,7 @@ impl FakeServer {
Err(EstablishConnectionError::Unauthorized)?
}
- let (client_conn, server_conn, _) = Connection::in_memory();
+ let (client_conn, server_conn, _) = Connection::in_memory(cx.background());
let (connection_id, io, incoming) = self.peer.add_connection(server_conn).await;
cx.background().spawn(io).detach();
*self.incoming.lock() = Some(incoming);
@@ -8,9 +8,10 @@ version = "0.1.0"
path = "src/gpui.rs"
[features]
-test-support = ["env_logger"]
+test-support = ["env_logger", "collections/test-support"]
[dependencies]
+collections = { path = "../collections" }
gpui_macros = { path = "../gpui_macros" }
sum_tree = { path = "../sum_tree" }
async-task = "4.0.3"
@@ -47,6 +48,7 @@ bindgen = "0.58.1"
cc = "1.0.67"
[dev-dependencies]
+collections = { path = "../collections", features = ["test-support"] }
env_logger = "0.8"
png = "0.16"
simplelog = "0.9"
@@ -1,10 +1,11 @@
use anyhow::{anyhow, Result};
use async_task::Runnable;
use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
+use collections::HashMap;
use parking_lot::Mutex;
use postage::{barrier, prelude::Stream as _};
use rand::prelude::*;
-use smol::{channel, prelude::*, Executor, Timer};
+use smol::{channel, future::yield_now, prelude::*, Executor, Timer};
use std::{
any::Any,
fmt::{self, Debug, Display},
@@ -33,8 +34,10 @@ pub enum Foreground {
dispatcher: Arc<dyn platform::Dispatcher>,
_not_send_or_sync: PhantomData<Rc<()>>,
},
- Test(smol::LocalExecutor<'static>),
- Deterministic(Arc<Deterministic>),
+ Deterministic {
+ cx_id: usize,
+ executor: Arc<Deterministic>,
+ },
}
pub enum Background {
@@ -70,9 +73,8 @@ unsafe impl<T: Send> Send for Task<T> {}
struct DeterministicState {
rng: StdRng,
seed: u64,
- scheduled_from_foreground: Vec<(Runnable, Backtrace)>,
- scheduled_from_background: Vec<(Runnable, Backtrace)>,
- spawned_from_foreground: Vec<(Runnable, Backtrace)>,
+ scheduled_from_foreground: HashMap<usize, Vec<ForegroundRunnable>>,
+ scheduled_from_background: Vec<Runnable>,
forbid_parking: bool,
block_on_ticks: RangeInclusive<usize>,
now: Instant,
@@ -80,20 +82,24 @@ struct DeterministicState {
waiting_backtrace: Option<Backtrace>,
}
+struct ForegroundRunnable {
+ runnable: Runnable,
+ main: bool,
+}
+
pub struct Deterministic {
state: Arc<Mutex<DeterministicState>>,
parker: Mutex<parking::Parker>,
}
impl Deterministic {
- fn new(seed: u64) -> Self {
- Self {
+ pub fn new(seed: u64) -> Arc<Self> {
+ Arc::new(Self {
state: Arc::new(Mutex::new(DeterministicState {
rng: StdRng::seed_from_u64(seed),
seed,
scheduled_from_foreground: Default::default(),
scheduled_from_background: Default::default(),
- spawned_from_foreground: Default::default(),
forbid_parking: false,
block_on_ticks: 0..=1000,
now: Instant::now(),
@@ -101,22 +107,37 @@ impl Deterministic {
waiting_backtrace: None,
})),
parker: Default::default(),
- }
+ })
+ }
+
+ pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
+ Arc::new(Background::Deterministic {
+ executor: self.clone(),
+ })
}
- fn spawn_from_foreground(&self, future: AnyLocalFuture) -> AnyLocalTask {
- let backtrace = Backtrace::new_unresolved();
- let scheduled_once = AtomicBool::new(false);
+ pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
+ Rc::new(Foreground::Deterministic {
+ cx_id: id,
+ executor: self.clone(),
+ })
+ }
+
+ fn spawn_from_foreground(
+ &self,
+ cx_id: usize,
+ future: AnyLocalFuture,
+ main: bool,
+ ) -> AnyLocalTask {
let state = self.state.clone();
let unparker = self.parker.lock().unparker();
let (runnable, task) = async_task::spawn_local(future, move |runnable| {
let mut state = state.lock();
- let backtrace = backtrace.clone();
- if scheduled_once.fetch_or(true, SeqCst) {
- state.scheduled_from_foreground.push((runnable, backtrace));
- } else {
- state.spawned_from_foreground.push((runnable, backtrace));
- }
+ state
+ .scheduled_from_foreground
+ .entry(cx_id)
+ .or_default()
+ .push(ForegroundRunnable { runnable, main });
unparker.unpark();
});
runnable.schedule();
@@ -124,24 +145,23 @@ impl Deterministic {
}
fn spawn(&self, future: AnyFuture) -> AnyTask {
- let backtrace = Backtrace::new_unresolved();
let state = self.state.clone();
let unparker = self.parker.lock().unparker();
let (runnable, task) = async_task::spawn(future, move |runnable| {
let mut state = state.lock();
- state
- .scheduled_from_background
- .push((runnable, backtrace.clone()));
+ state.scheduled_from_background.push(runnable);
unparker.unpark();
});
runnable.schedule();
task
}
- fn run(&self, mut future: AnyLocalFuture) -> Box<dyn Any> {
+ fn run(&self, cx_id: usize, main_future: AnyLocalFuture) -> Box<dyn Any> {
let woken = Arc::new(AtomicBool::new(false));
+ let mut main_task = self.spawn_from_foreground(cx_id, main_future, true);
+
loop {
- if let Some(result) = self.run_internal(woken.clone(), &mut future) {
+ if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
return result;
}
@@ -156,14 +176,13 @@ impl Deterministic {
fn run_until_parked(&self) {
let woken = Arc::new(AtomicBool::new(false));
- let mut future = any_local_future(std::future::pending::<()>());
- self.run_internal(woken, &mut future);
+ self.run_internal(woken, None);
}
fn run_internal(
&self,
woken: Arc<AtomicBool>,
- future: &mut AnyLocalFuture,
+ mut main_task: Option<&mut AnyLocalTask>,
) -> Option<Box<dyn Any>> {
let unparker = self.parker.lock().unparker();
let waker = waker_fn(move || {
@@ -172,48 +191,46 @@ impl Deterministic {
});
let mut cx = Context::from_waker(&waker);
- let mut trace = Trace::default();
loop {
let mut state = self.state.lock();
- let runnable_count = state.scheduled_from_foreground.len()
- + state.scheduled_from_background.len()
- + state.spawned_from_foreground.len();
- let ix = state.rng.gen_range(0..=runnable_count);
- if ix < state.scheduled_from_foreground.len() {
- let (_, backtrace) = &state.scheduled_from_foreground[ix];
- trace.record(&state, backtrace.clone());
- let runnable = state.scheduled_from_foreground.remove(ix).0;
- drop(state);
- runnable.run();
- } else if ix - state.scheduled_from_foreground.len()
- < state.scheduled_from_background.len()
+ if state.scheduled_from_foreground.is_empty()
+ && state.scheduled_from_background.is_empty()
{
- let ix = ix - state.scheduled_from_foreground.len();
- let (_, backtrace) = &state.scheduled_from_background[ix];
- trace.record(&state, backtrace.clone());
- let runnable = state.scheduled_from_background.remove(ix).0;
- drop(state);
- runnable.run();
- } else if ix < runnable_count {
- let (_, backtrace) = &state.spawned_from_foreground[0];
- trace.record(&state, backtrace.clone());
- let runnable = state.spawned_from_foreground.remove(0).0;
+ return None;
+ }
+
+ if !state.scheduled_from_background.is_empty() && state.rng.gen() {
+ let background_len = state.scheduled_from_background.len();
+ let ix = state.rng.gen_range(0..background_len);
+ let runnable = state.scheduled_from_background.remove(ix);
drop(state);
runnable.run();
- } else {
- drop(state);
- if let Poll::Ready(result) = future.poll(&mut cx) {
- return Some(result);
+ } else if !state.scheduled_from_foreground.is_empty() {
+ let available_cx_ids = state
+ .scheduled_from_foreground
+ .keys()
+ .copied()
+ .collect::<Vec<_>>();
+ let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
+ let scheduled_from_cx = state
+ .scheduled_from_foreground
+ .get_mut(&cx_id_to_run)
+ .unwrap();
+ let foreground_runnable = scheduled_from_cx.remove(0);
+ if scheduled_from_cx.is_empty() {
+ state.scheduled_from_foreground.remove(&cx_id_to_run);
}
- let state = self.state.lock();
+ drop(state);
- if state.scheduled_from_foreground.is_empty()
- && state.scheduled_from_background.is_empty()
- && state.spawned_from_foreground.is_empty()
- {
- return None;
+ foreground_runnable.runnable.run();
+ if let Some(main_task) = main_task.as_mut() {
+ if foreground_runnable.main {
+ if let Poll::Ready(result) = main_task.poll(&mut cx) {
+ return Some(result);
+ }
+ }
}
}
}
@@ -231,15 +248,12 @@ impl Deterministic {
};
let mut cx = Context::from_waker(&waker);
- let mut trace = Trace::default();
for _ in 0..max_ticks {
let mut state = self.state.lock();
let runnable_count = state.scheduled_from_background.len();
let ix = state.rng.gen_range(0..=runnable_count);
if ix < state.scheduled_from_background.len() {
- let (_, backtrace) = &state.scheduled_from_background[ix];
- trace.record(&state, backtrace.clone());
- let runnable = state.scheduled_from_background.remove(ix).0;
+ let runnable = state.scheduled_from_background.remove(ix);
drop(state);
runnable.run();
} else {
@@ -282,69 +296,13 @@ impl DeterministicState {
}
}
-#[derive(Default)]
-struct Trace {
- executed: Vec<Backtrace>,
- scheduled: Vec<Vec<Backtrace>>,
- spawned_from_foreground: Vec<Vec<Backtrace>>,
-}
-
-impl Trace {
- fn record(&mut self, state: &DeterministicState, executed: Backtrace) {
- self.scheduled.push(
- state
- .scheduled_from_foreground
- .iter()
- .map(|(_, backtrace)| backtrace.clone())
- .collect(),
- );
- self.spawned_from_foreground.push(
- state
- .spawned_from_foreground
- .iter()
- .map(|(_, backtrace)| backtrace.clone())
- .collect(),
- );
- self.executed.push(executed);
- }
-
- fn resolve(&mut self) {
- for backtrace in &mut self.executed {
- backtrace.resolve();
- }
-
- for backtraces in &mut self.scheduled {
- for backtrace in backtraces {
- backtrace.resolve();
- }
- }
-
- for backtraces in &mut self.spawned_from_foreground {
- for backtrace in backtraces {
- backtrace.resolve();
- }
- }
- }
-}
-
struct CwdBacktrace<'a> {
backtrace: &'a Backtrace,
- first_frame_only: bool,
}
impl<'a> CwdBacktrace<'a> {
fn new(backtrace: &'a Backtrace) -> Self {
- Self {
- backtrace,
- first_frame_only: false,
- }
- }
-
- fn first_frame(backtrace: &'a Backtrace) -> Self {
- Self {
- backtrace,
- first_frame_only: true,
- }
+ Self { backtrace }
}
}
@@ -363,69 +321,12 @@ impl<'a> Debug for CwdBacktrace<'a> {
.any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
{
formatted_frame.backtrace_frame(frame)?;
- if self.first_frame_only {
- break;
- }
}
}
fmt.finish()
}
}
-impl Debug for Trace {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- for ((backtrace, scheduled), spawned_from_foreground) in self
- .executed
- .iter()
- .zip(&self.scheduled)
- .zip(&self.spawned_from_foreground)
- {
- writeln!(f, "Scheduled")?;
- for backtrace in scheduled {
- writeln!(f, "- {:?}", CwdBacktrace::first_frame(backtrace))?;
- }
- if scheduled.is_empty() {
- writeln!(f, "None")?;
- }
- writeln!(f, "==========")?;
-
- writeln!(f, "Spawned from foreground")?;
- for backtrace in spawned_from_foreground {
- writeln!(f, "- {:?}", CwdBacktrace::first_frame(backtrace))?;
- }
- if spawned_from_foreground.is_empty() {
- writeln!(f, "None")?;
- }
- writeln!(f, "==========")?;
-
- writeln!(f, "Run: {:?}", CwdBacktrace::first_frame(backtrace))?;
- writeln!(f, "+++++++++++++++++++")?;
- }
-
- Ok(())
- }
-}
-
-impl Drop for Trace {
- fn drop(&mut self) {
- let trace_on_panic = if let Ok(trace_on_panic) = std::env::var("EXECUTOR_TRACE_ON_PANIC") {
- trace_on_panic == "1" || trace_on_panic == "true"
- } else {
- false
- };
- let trace_always = if let Ok(trace_always) = std::env::var("EXECUTOR_TRACE_ALWAYS") {
- trace_always == "1" || trace_always == "true"
- } else {
- false
- };
-
- if trace_always || (trace_on_panic && thread::panicking()) {
- self.resolve();
- dbg!(self);
- }
- }
-}
-
impl Foreground {
pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
if dispatcher.is_main_thread() {
@@ -438,14 +339,12 @@ impl Foreground {
}
}
- pub fn test() -> Self {
- Self::Test(smol::LocalExecutor::new())
- }
-
pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
let future = any_local_future(future);
let any_task = match self {
- Self::Deterministic(executor) => executor.spawn_from_foreground(future),
+ Self::Deterministic { cx_id, executor } => {
+ executor.spawn_from_foreground(*cx_id, future, false)
+ }
Self::Platform { dispatcher, .. } => {
fn spawn_inner(
future: AnyLocalFuture,
@@ -460,7 +359,6 @@ impl Foreground {
}
spawn_inner(future, dispatcher)
}
- Self::Test(executor) => executor.spawn(future),
};
Task::local(any_task)
}
@@ -468,23 +366,22 @@ impl Foreground {
pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
let future = any_local_future(future);
let any_value = match self {
- Self::Deterministic(executor) => executor.run(future),
+ Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
- Self::Test(executor) => smol::block_on(executor.run(future)),
};
*any_value.downcast().unwrap()
}
pub fn parking_forbidden(&self) -> bool {
match self {
- Self::Deterministic(executor) => executor.state.lock().forbid_parking,
+ Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
_ => panic!("this method can only be called on a deterministic executor"),
}
}
pub fn start_waiting(&self) {
match self {
- Self::Deterministic(executor) => {
+ Self::Deterministic { executor, .. } => {
executor.state.lock().waiting_backtrace = Some(Backtrace::new_unresolved());
}
_ => panic!("this method can only be called on a deterministic executor"),
@@ -493,7 +390,7 @@ impl Foreground {
pub fn finish_waiting(&self) {
match self {
- Self::Deterministic(executor) => {
+ Self::Deterministic { executor, .. } => {
executor.state.lock().waiting_backtrace.take();
}
_ => panic!("this method can only be called on a deterministic executor"),
@@ -502,7 +399,7 @@ impl Foreground {
pub fn forbid_parking(&self) {
match self {
- Self::Deterministic(executor) => {
+ Self::Deterministic { executor, .. } => {
let mut state = executor.state.lock();
state.forbid_parking = true;
state.rng = StdRng::seed_from_u64(state.seed);
@@ -513,7 +410,7 @@ impl Foreground {
pub async fn timer(&self, duration: Duration) {
match self {
- Self::Deterministic(executor) => {
+ Self::Deterministic { executor, .. } => {
let (tx, mut rx) = barrier::channel();
{
let mut state = executor.state.lock();
@@ -530,7 +427,7 @@ impl Foreground {
pub fn advance_clock(&self, duration: Duration) {
match self {
- Self::Deterministic(executor) => {
+ Self::Deterministic { executor, .. } => {
executor.run_until_parked();
let mut state = executor.state.lock();
@@ -548,7 +445,7 @@ impl Foreground {
pub fn set_block_on_ticks(&self, range: RangeInclusive<usize>) {
match self {
- Self::Deterministic(executor) => executor.state.lock().block_on_ticks = range,
+ Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
_ => panic!("this method can only be called on a deterministic executor"),
}
}
@@ -586,7 +483,7 @@ impl Background {
let future = any_future(future);
let any_task = match self {
Self::Production { executor, .. } => executor.spawn(future),
- Self::Deterministic { executor, .. } => executor.spawn(future),
+ Self::Deterministic { executor } => executor.spawn(future),
};
Task::send(any_task)
}
@@ -631,6 +528,17 @@ impl Background {
task.await;
}
}
+
+ pub async fn simulate_random_delay(&self) {
+ match self {
+ Self::Deterministic { executor, .. } => {
+ if executor.state.lock().rng.gen_range(0..100) < 20 {
+ yield_now().await;
+ }
+ }
+ _ => panic!("this method can only be called on a deterministic executor"),
+ }
+ }
}
pub struct Scope<'a> {
@@ -653,14 +561,6 @@ impl<'a> Scope<'a> {
}
}
-pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {
- let executor = Arc::new(Deterministic::new(seed));
- (
- Rc::new(Foreground::Deterministic(executor.clone())),
- Arc::new(Background::Deterministic { executor }),
- )
-}
-
impl<T> Task<T> {
pub fn ready(value: T) -> Self {
Self::Ready(Some(value))
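
The standalone `deterministic(seed)` helper removed above is replaced by explicit construction: tests build one shared `Deterministic` executor, then derive a foreground executor per simulated context (keyed by `cx_id`) and a single background executor from it, and the background half gains `simulate_random_delay` for injecting yields. A minimal sketch, assuming only the APIs introduced in this file:

use gpui::executor;

// One Deterministic executor drives every simulated context, so tasks from
// different contexts interleave under a single seeded scheduler.
let seed = 0u64;
let deterministic = executor::Deterministic::new(seed);
let host_foreground = deterministic.build_foreground(0); // cx_id 0
let guest_foreground = deterministic.build_foreground(1); // cx_id 1
let background = deterministic.build_background();
// (guest_foreground and background would be handed to a second TestAppContext,
// as run_test and the gpui_macros changes below do.)

// Running a future on a foreground executor pumps all scheduled foreground
// and background work in a randomized but seed-reproducible order.
let four: i32 = host_foreground.run(async { 2 + 2 });
assert_eq!(four, 4);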
@@ -28,7 +28,12 @@ pub fn run_test(
mut starting_seed: u64,
max_retries: usize,
test_fn: &mut (dyn RefUnwindSafe
- + Fn(&mut MutableAppContext, Rc<platform::test::ForegroundPlatform>, u64)),
+ + Fn(
+ &mut MutableAppContext,
+ Rc<platform::test::ForegroundPlatform>,
+ Arc<executor::Deterministic>,
+ u64,
+ )),
) {
let is_randomized = num_iterations > 1;
if is_randomized {
@@ -60,16 +65,16 @@ pub fn run_test(
dbg!(seed);
}
- let (foreground, background) = executor::deterministic(seed);
+ let deterministic = executor::Deterministic::new(seed);
let mut cx = TestAppContext::new(
foreground_platform.clone(),
platform.clone(),
- foreground.clone(),
- background.clone(),
+ deterministic.build_foreground(usize::MAX),
+ deterministic.build_background(),
font_cache.clone(),
0,
);
- cx.update(|cx| test_fn(cx, foreground_platform.clone(), seed));
+ cx.update(|cx| test_fn(cx, foreground_platform.clone(), deterministic, seed));
atomic_seed.fetch_add(1, SeqCst);
}
@@ -77,8 +77,8 @@ pub fn test(args: TokenStream, function: TokenStream) -> TokenStream {
#namespace::TestAppContext::new(
foreground_platform.clone(),
cx.platform().clone(),
- cx.foreground().clone(),
- cx.background().clone(),
+ deterministic.build_foreground(#ix),
+ deterministic.build_background(),
cx.font_cache().clone(),
#first_entity_id,
),
@@ -115,7 +115,7 @@ pub fn test(args: TokenStream, function: TokenStream) -> TokenStream {
#num_iterations as u64,
#starting_seed as u64,
#max_retries,
- &mut |cx, foreground_platform, seed| cx.foreground().run(#inner_fn_name(#inner_fn_args))
+ &mut |cx, foreground_platform, deterministic, seed| cx.foreground().run(#inner_fn_name(#inner_fn_args))
);
}
}
@@ -147,7 +147,7 @@ pub fn test(args: TokenStream, function: TokenStream) -> TokenStream {
#num_iterations as u64,
#starting_seed as u64,
#max_retries,
- &mut |cx, _, seed| #inner_fn_name(#inner_fn_args)
+ &mut |cx, _, _, seed| #inner_fn_name(#inner_fn_args)
);
}
}
@@ -305,7 +305,7 @@ impl Buffer {
pub fn from_proto(
replica_id: ReplicaId,
- message: proto::Buffer,
+ message: proto::BufferState,
file: Option<Box<dyn File>>,
cx: &mut ModelContext<Self>,
) -> Result<Self> {
@@ -359,8 +359,8 @@ impl Buffer {
Ok(this)
}
- pub fn to_proto(&self) -> proto::Buffer {
- proto::Buffer {
+ pub fn to_proto(&self) -> proto::BufferState {
+ proto::BufferState {
id: self.remote_id(),
file: self.file.as_ref().map(|f| f.to_proto()),
visible_text: self.text.text(),
@@ -7,7 +7,7 @@ use rpc::proto;
use std::sync::Arc;
use text::*;
-pub use proto::{Buffer, SelectionSet};
+pub use proto::{Buffer, BufferState, SelectionSet};
pub fn serialize_operation(operation: &Operation) -> proto::Operation {
proto::Operation {
@@ -155,7 +155,7 @@ pub fn serialize_diagnostics<'a>(
.collect()
}
-fn serialize_anchor(anchor: &Anchor) -> proto::Anchor {
+pub fn serialize_anchor(anchor: &Anchor) -> proto::Anchor {
proto::Anchor {
replica_id: anchor.timestamp.replica_id as u32,
local_timestamp: anchor.timestamp.value,
@@ -352,7 +352,7 @@ pub fn deserialize_diagnostics(
.collect()
}
-fn deserialize_anchor(anchor: proto::Anchor) -> Option<Anchor> {
+pub fn deserialize_anchor(anchor: proto::Anchor) -> Option<Anchor> {
Some(Anchor {
timestamp: clock::Local {
replica_id: anchor.replica_id as ReplicaId,
@@ -181,11 +181,12 @@ impl FakeFsState {
pub struct FakeFs {
// Use an unfair lock to ensure tests are deterministic.
state: futures::lock::Mutex<FakeFsState>,
+ executor: std::sync::Arc<gpui::executor::Background>,
}
#[cfg(any(test, feature = "test-support"))]
impl FakeFs {
- pub fn new() -> Self {
+ pub fn new(executor: std::sync::Arc<gpui::executor::Background>) -> Self {
let (events_tx, _) = postage::broadcast::channel(2048);
let mut entries = std::collections::BTreeMap::new();
entries.insert(
@@ -201,6 +202,7 @@ impl FakeFs {
},
);
Self {
+ executor,
state: futures::lock::Mutex::new(FakeFsState {
entries,
next_inode: 1,
@@ -330,6 +332,7 @@ impl FakeFs {
#[async_trait::async_trait]
impl Fs for FakeFs {
async fn load(&self, path: &Path) -> Result<String> {
+ self.executor.simulate_random_delay().await;
let state = self.state.lock().await;
let text = state
.entries
@@ -340,6 +343,7 @@ impl Fs for FakeFs {
}
async fn save(&self, path: &Path, text: &Rope) -> Result<()> {
+ self.executor.simulate_random_delay().await;
let mut state = self.state.lock().await;
state.validate_path(path)?;
if let Some(entry) = state.entries.get_mut(path) {
@@ -370,10 +374,12 @@ impl Fs for FakeFs {
}
async fn canonicalize(&self, path: &Path) -> Result<PathBuf> {
+ self.executor.simulate_random_delay().await;
Ok(path.to_path_buf())
}
async fn is_file(&self, path: &Path) -> bool {
+ self.executor.simulate_random_delay().await;
let state = self.state.lock().await;
state
.entries
@@ -382,6 +388,7 @@ impl Fs for FakeFs {
}
async fn metadata(&self, path: &Path) -> Result<Option<Metadata>> {
+ self.executor.simulate_random_delay().await;
let state = self.state.lock().await;
Ok(state.entries.get(path).map(|entry| entry.metadata.clone()))
}
@@ -391,6 +398,7 @@ impl Fs for FakeFs {
abs_path: &Path,
) -> Result<Pin<Box<dyn Send + Stream<Item = Result<PathBuf>>>>> {
use futures::{future, stream};
+ self.executor.simulate_random_delay().await;
let state = self.state.lock().await;
let abs_path = abs_path.to_path_buf();
Ok(Box::pin(stream::iter(state.entries.clone()).filter_map(
@@ -410,6 +418,7 @@ impl Fs for FakeFs {
_: Duration,
) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>> {
let state = self.state.lock().await;
+ self.executor.simulate_random_delay().await;
let rx = state.events_tx.subscribe();
let path = path.to_path_buf();
Box::pin(futures::StreamExt::filter(rx, move |events| {
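
Because `FakeFs` now owns a handle to the deterministic background executor, every filesystem call can yield at a random point via `simulate_random_delay`, letting the scheduler explore different host/guest interleavings. A small sketch of wiring it up in a test, mirroring the updated tests later in this diff (`insert_tree` is the existing test-support helper):

// Inside a #[gpui::test] async body: cx.background() is the shared
// deterministic Background executor built above.
let fs = std::sync::Arc::new(FakeFs::new(cx.background()));
fs.insert_tree("/dir", serde_json::json!({ "a.txt": "hello" })).await;

// load() may now yield before reading state, so other tasks can run in
// between; the outcome is still reproducible for a given seed.
let text = fs.load(std::path::Path::new("/dir/a.txt")).await.unwrap();
assert_eq!(text, "hello");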
@@ -13,8 +13,9 @@ use gpui::{
WeakModelHandle,
};
use language::{
+ proto::{deserialize_anchor, serialize_anchor},
range_from_lsp, Bias, Buffer, Diagnostic, DiagnosticEntry, File as _, Language,
- LanguageRegistry, Operation, PointUtf16, ToOffset, ToPointUtf16,
+ LanguageRegistry, PointUtf16, ToOffset, ToPointUtf16,
};
use lsp::{DiagnosticSeverity, LanguageServer};
use postage::{prelude::Stream, watch};
@@ -42,7 +43,7 @@ pub struct Project {
collaborators: HashMap<PeerId, Collaborator>,
subscriptions: Vec<client::Subscription>,
language_servers_with_diagnostics_running: isize,
- open_buffers: HashMap<usize, OpenBuffer>,
+ open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
loading_buffers: HashMap<
ProjectPath,
postage::watch::Receiver<Option<Result<ModelHandle<Buffer>, Arc<anyhow::Error>>>>,
@@ -50,11 +51,6 @@ pub struct Project {
shared_buffers: HashMap<PeerId, HashMap<u64, ModelHandle<Buffer>>>,
}
-enum OpenBuffer {
- Operations(Vec<Operation>),
- Loaded(WeakModelHandle<Buffer>),
-}
-
enum WorktreeHandle {
Strong(ModelHandle<Worktree>),
Weak(WeakModelHandle<Worktree>),
@@ -246,8 +242,10 @@ impl Project {
let mut worktrees = Vec::new();
for worktree in response.worktrees {
- worktrees
- .push(Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx).await?);
+ let (worktree, load_task) = cx
+ .update(|cx| Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx));
+ worktrees.push(worktree);
+ load_task.detach();
}
let user_ids = response
@@ -336,6 +334,7 @@ impl Project {
client.subscribe_to_entity(remote_id, cx, Self::handle_save_buffer),
client.subscribe_to_entity(remote_id, cx, Self::handle_buffer_saved),
client.subscribe_to_entity(remote_id, cx, Self::handle_format_buffer),
+ client.subscribe_to_entity(remote_id, cx, Self::handle_get_definition),
]);
}
}
@@ -458,6 +457,7 @@ impl Project {
rpc.send(proto::UnshareProject { project_id }).await?;
this.update(&mut cx, |this, cx| {
this.collaborators.clear();
+ this.shared_buffers.clear();
for worktree in this.worktrees(cx).collect::<Vec<_>>() {
worktree.update(cx, |worktree, _| {
worktree.as_local_mut().unwrap().unshare();
@@ -514,21 +514,18 @@ impl Project {
let (mut tx, rx) = postage::watch::channel();
entry.insert(rx.clone());
- let load_buffer = worktree.update(cx, |worktree, cx| {
- worktree.load_buffer(&project_path.path, cx)
- });
+ let load_buffer = if worktree.read(cx).is_local() {
+ self.open_local_buffer(&project_path.path, &worktree, cx)
+ } else {
+ self.open_remote_buffer(&project_path.path, &worktree, cx)
+ };
cx.spawn(move |this, mut cx| async move {
let load_result = load_buffer.await;
- *tx.borrow_mut() = Some(this.update(&mut cx, |this, cx| {
+ *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| {
// Record the fact that the buffer is no longer loading.
this.loading_buffers.remove(&project_path);
let buffer = load_result.map_err(Arc::new)?;
- this.open_buffers.insert(
- buffer.read(cx).remote_id() as usize,
- OpenBuffer::Loaded(buffer.downgrade()),
- );
- this.assign_language_to_buffer(&worktree, &buffer, cx);
Ok(buffer)
}));
})
@@ -550,6 +547,55 @@ impl Project {
})
}
+ fn open_local_buffer(
+ &mut self,
+ path: &Arc<Path>,
+ worktree: &ModelHandle<Worktree>,
+ cx: &mut ModelContext<Self>,
+ ) -> Task<Result<ModelHandle<Buffer>>> {
+ let load_buffer = worktree.update(cx, |worktree, cx| {
+ let worktree = worktree.as_local_mut().unwrap();
+ worktree.load_buffer(path, cx)
+ });
+ let worktree = worktree.downgrade();
+ cx.spawn(|this, mut cx| async move {
+ let buffer = load_buffer.await?;
+ let worktree = worktree
+ .upgrade(&cx)
+ .ok_or_else(|| anyhow!("worktree was removed"))?;
+ this.update(&mut cx, |this, cx| {
+ this.register_buffer(&buffer, Some(&worktree), cx)
+ })?;
+ Ok(buffer)
+ })
+ }
+
+ fn open_remote_buffer(
+ &mut self,
+ path: &Arc<Path>,
+ worktree: &ModelHandle<Worktree>,
+ cx: &mut ModelContext<Self>,
+ ) -> Task<Result<ModelHandle<Buffer>>> {
+ let rpc = self.client.clone();
+ let project_id = self.remote_id().unwrap();
+ let remote_worktree_id = worktree.read(cx).id();
+ let path = path.clone();
+ let path_string = path.to_string_lossy().to_string();
+ cx.spawn(|this, mut cx| async move {
+ let response = rpc
+ .request(proto::OpenBuffer {
+ project_id,
+ worktree_id: remote_worktree_id.to_proto(),
+ path: path_string,
+ })
+ .await?;
+ let buffer = response.buffer.ok_or_else(|| anyhow!("missing buffer"))?;
+ this.update(&mut cx, |this, cx| {
+ this.deserialize_remote_buffer(buffer, cx)
+ })
+ })
+ }
+
pub fn save_buffer_as(
&self,
buffer: ModelHandle<Buffer>,
@@ -568,9 +614,7 @@ impl Project {
})
.await?;
this.update(&mut cx, |this, cx| {
- this.open_buffers
- .insert(buffer.id(), OpenBuffer::Loaded(buffer.downgrade()));
- this.assign_language_to_buffer(&worktree, &buffer, cx);
+ this.assign_language_to_buffer(&buffer, Some(&worktree), cx);
});
Ok(())
})
@@ -603,25 +647,41 @@ impl Project {
let mut result = None;
let worktree = self.worktree_for_id(path.worktree_id, cx)?;
self.open_buffers.retain(|_, buffer| {
- if let OpenBuffer::Loaded(buffer) = buffer {
- if let Some(buffer) = buffer.upgrade(cx) {
- if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
- if file.worktree == worktree && file.path() == &path.path {
- result = Some(buffer);
- }
+ if let Some(buffer) = buffer.upgrade(cx) {
+ if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
+ if file.worktree == worktree && file.path() == &path.path {
+ result = Some(buffer);
}
- return true;
}
+ true
+ } else {
+ false
}
- false
});
result
}
+ fn register_buffer(
+ &mut self,
+ buffer: &ModelHandle<Buffer>,
+ worktree: Option<&ModelHandle<Worktree>>,
+ cx: &mut ModelContext<Self>,
+ ) -> Result<()> {
+ if self
+ .open_buffers
+ .insert(buffer.read(cx).remote_id() as usize, buffer.downgrade())
+ .is_some()
+ {
+ return Err(anyhow!("registered the same buffer twice"));
+ }
+ self.assign_language_to_buffer(&buffer, worktree, cx);
+ Ok(())
+ }
+
fn assign_language_to_buffer(
&mut self,
- worktree: &ModelHandle<Worktree>,
buffer: &ModelHandle<Buffer>,
+ worktree: Option<&ModelHandle<Worktree>>,
cx: &mut ModelContext<Self>,
) -> Option<()> {
let (path, full_path) = {
@@ -637,7 +697,7 @@ impl Project {
// For local worktrees, start a language server if needed.
// Also assign the language server and any previously stored diagnostics to the buffer.
- if let Some(local_worktree) = worktree.read(cx).as_local() {
+ if let Some(local_worktree) = worktree.and_then(|w| w.read(cx).as_local()) {
let worktree_id = local_worktree.id();
let worktree_abs_path = local_worktree.abs_path().clone();
@@ -661,7 +721,7 @@ impl Project {
}
}
- if let Some(local_worktree) = worktree.read(cx).as_local() {
+ if let Some(local_worktree) = worktree.and_then(|w| w.read(cx).as_local()) {
if let Some(diagnostics) = local_worktree.diagnostics_for_path(&path) {
buffer.update(cx, |buffer, cx| {
buffer.update_diagnostics(None, diagnostics, cx).log_err();
@@ -935,10 +995,10 @@ impl Project {
cx: &mut ModelContext<Self>,
) -> Task<Result<Vec<Definition>>> {
let source_buffer_handle = source_buffer_handle.clone();
- let buffer = source_buffer_handle.read(cx);
+ let source_buffer = source_buffer_handle.read(cx);
let worktree;
let buffer_abs_path;
- if let Some(file) = File::from_dyn(buffer.file()) {
+ if let Some(file) = File::from_dyn(source_buffer.file()) {
worktree = file.worktree.clone();
buffer_abs_path = file.as_local().map(|f| f.abs_path(cx));
} else {
@@ -946,11 +1006,11 @@ impl Project {
};
if worktree.read(cx).as_local().is_some() {
- let point = buffer.offset_to_point_utf16(position.to_offset(buffer));
+ let point = source_buffer.offset_to_point_utf16(position.to_offset(source_buffer));
let buffer_abs_path = buffer_abs_path.unwrap();
let lang_name;
let lang_server;
- if let Some(lang) = buffer.language() {
+ if let Some(lang) = source_buffer.language() {
lang_name = lang.name().to_string();
if let Some(server) = self
.language_servers
@@ -1045,9 +1105,41 @@ impl Project {
Ok(definitions)
})
+ } else if let Some(project_id) = self.remote_id() {
+ let client = self.client.clone();
+ let request = proto::GetDefinition {
+ project_id,
+ buffer_id: source_buffer.remote_id(),
+ position: Some(serialize_anchor(&source_buffer.anchor_before(position))),
+ };
+ cx.spawn(|this, mut cx| async move {
+ let response = client.request(request).await?;
+ this.update(&mut cx, |this, cx| {
+ let mut definitions = Vec::new();
+ for definition in response.definitions {
+ let target_buffer = this.deserialize_remote_buffer(
+ definition.buffer.ok_or_else(|| anyhow!("missing buffer"))?,
+ cx,
+ )?;
+ let target_start = definition
+ .target_start
+ .and_then(deserialize_anchor)
+ .ok_or_else(|| anyhow!("missing target start"))?;
+ let target_end = definition
+ .target_end
+ .and_then(deserialize_anchor)
+ .ok_or_else(|| anyhow!("missing target end"))?;
+ definitions.push(Definition {
+ target_buffer,
+ target_range: target_start..target_end,
+ })
+ }
+
+ Ok(definitions)
+ })
+ })
} else {
- log::info!("go to definition is not yet implemented for guests");
- Task::ready(Ok(Default::default()))
+ Task::ready(Err(anyhow!("project does not have a remote id")))
}
}
@@ -1173,62 +1265,60 @@ impl Project {
let snapshot = worktree_handle.read(cx).snapshot();
let mut buffers_to_delete = Vec::new();
for (buffer_id, buffer) in &self.open_buffers {
- if let OpenBuffer::Loaded(buffer) = buffer {
- if let Some(buffer) = buffer.upgrade(cx) {
- buffer.update(cx, |buffer, cx| {
- if let Some(old_file) = File::from_dyn(buffer.file()) {
- if old_file.worktree != worktree_handle {
- return;
+ if let Some(buffer) = buffer.upgrade(cx) {
+ buffer.update(cx, |buffer, cx| {
+ if let Some(old_file) = File::from_dyn(buffer.file()) {
+ if old_file.worktree != worktree_handle {
+ return;
+ }
+
+ let new_file = if let Some(entry) = old_file
+ .entry_id
+ .and_then(|entry_id| snapshot.entry_for_id(entry_id))
+ {
+ File {
+ is_local: true,
+ entry_id: Some(entry.id),
+ mtime: entry.mtime,
+ path: entry.path.clone(),
+ worktree: worktree_handle.clone(),
}
+ } else if let Some(entry) =
+ snapshot.entry_for_path(old_file.path().as_ref())
+ {
+ File {
+ is_local: true,
+ entry_id: Some(entry.id),
+ mtime: entry.mtime,
+ path: entry.path.clone(),
+ worktree: worktree_handle.clone(),
+ }
+ } else {
+ File {
+ is_local: true,
+ entry_id: None,
+ path: old_file.path().clone(),
+ mtime: old_file.mtime(),
+ worktree: worktree_handle.clone(),
+ }
+ };
- let new_file = if let Some(entry) = old_file
- .entry_id
- .and_then(|entry_id| snapshot.entry_for_id(entry_id))
- {
- File {
- is_local: true,
- entry_id: Some(entry.id),
- mtime: entry.mtime,
- path: entry.path.clone(),
- worktree: worktree_handle.clone(),
- }
- } else if let Some(entry) =
- snapshot.entry_for_path(old_file.path().as_ref())
- {
- File {
- is_local: true,
- entry_id: Some(entry.id),
- mtime: entry.mtime,
- path: entry.path.clone(),
- worktree: worktree_handle.clone(),
- }
- } else {
- File {
- is_local: true,
- entry_id: None,
- path: old_file.path().clone(),
- mtime: old_file.mtime(),
- worktree: worktree_handle.clone(),
- }
+ if let Some(project_id) = self.remote_id() {
+ let client = self.client.clone();
+ let message = proto::UpdateBufferFile {
+ project_id,
+ buffer_id: *buffer_id as u64,
+ file: Some(new_file.to_proto()),
};
-
- if let Some(project_id) = self.remote_id() {
- let client = self.client.clone();
- let message = proto::UpdateBufferFile {
- project_id,
- buffer_id: *buffer_id as u64,
- file: Some(new_file.to_proto()),
- };
- cx.foreground()
- .spawn(async move { client.send(message).await })
- .detach_and_log_err(cx);
- }
- buffer.file_updated(Box::new(new_file), cx).detach();
+ cx.foreground()
+ .spawn(async move { client.send(message).await })
+ .detach_and_log_err(cx);
}
- });
- } else {
- buffers_to_delete.push(*buffer_id);
- }
+ buffer.file_updated(Box::new(new_file), cx).detach();
+ }
+ });
+ } else {
+ buffers_to_delete.push(*buffer_id);
}
}
@@ -1366,10 +1456,8 @@ impl Project {
.replica_id;
self.shared_buffers.remove(&peer_id);
for (_, buffer) in &self.open_buffers {
- if let OpenBuffer::Loaded(buffer) = buffer {
- if let Some(buffer) = buffer.upgrade(cx) {
- buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
- }
+ if let Some(buffer) = buffer.upgrade(cx) {
+ buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
}
}
cx.notify();
@@ -1388,16 +1476,9 @@ impl Project {
.payload
.worktree
.ok_or_else(|| anyhow!("invalid worktree"))?;
- cx.spawn(|this, mut cx| {
- async move {
- let worktree =
- Worktree::remote(remote_id, replica_id, worktree, client, &mut cx).await?;
- this.update(&mut cx, |this, cx| this.add_worktree(&worktree, cx));
- Ok(())
- }
- .log_err()
- })
- .detach();
+ let (worktree, load_task) = Worktree::remote(remote_id, replica_id, worktree, client, cx);
+ self.add_worktree(&worktree, cx);
+ load_task.detach();
Ok(())
}
@@ -1486,19 +1567,9 @@ impl Project {
.into_iter()
.map(|op| language::proto::deserialize_operation(op))
.collect::<Result<Vec<_>, _>>()?;
- match self.open_buffers.get_mut(&buffer_id) {
- Some(OpenBuffer::Operations(pending_ops)) => pending_ops.extend(ops),
- Some(OpenBuffer::Loaded(buffer)) => {
- if let Some(buffer) = buffer.upgrade(cx) {
- buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
- } else {
- self.open_buffers
- .insert(buffer_id, OpenBuffer::Operations(ops));
- }
- }
- None => {
- self.open_buffers
- .insert(buffer_id, OpenBuffer::Operations(ops));
+ if let Some(buffer) = self.open_buffers.get_mut(&buffer_id) {
+ if let Some(buffer) = buffer.upgrade(cx) {
+ buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
}
}
Ok(())
@@ -1611,6 +1682,53 @@ impl Project {
Ok(())
}
+ pub fn handle_get_definition(
+ &mut self,
+ envelope: TypedEnvelope<proto::GetDefinition>,
+ rpc: Arc<Client>,
+ cx: &mut ModelContext<Self>,
+ ) -> Result<()> {
+ let receipt = envelope.receipt();
+ let sender_id = envelope.original_sender_id()?;
+ let source_buffer = self
+ .shared_buffers
+ .get(&sender_id)
+ .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned())
+ .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
+ let position = envelope
+ .payload
+ .position
+ .and_then(deserialize_anchor)
+ .ok_or_else(|| anyhow!("invalid position"))?;
+ if !source_buffer.read(cx).can_resolve(&position) {
+ return Err(anyhow!("cannot resolve position"));
+ }
+
+ let definitions = self.definition(&source_buffer, position, cx);
+ cx.spawn(|this, mut cx| async move {
+ let definitions = definitions.await?;
+ let mut response = proto::GetDefinitionResponse {
+ definitions: Default::default(),
+ };
+ this.update(&mut cx, |this, cx| {
+ for definition in definitions {
+ let buffer =
+ this.serialize_buffer_for_peer(&definition.target_buffer, sender_id, cx);
+ response.definitions.push(proto::Definition {
+ target_start: Some(serialize_anchor(&definition.target_range.start)),
+ target_end: Some(serialize_anchor(&definition.target_range.end)),
+ buffer: Some(buffer),
+ });
+ }
+ });
+ rpc.respond(receipt, response).await?;
+ Ok::<_, anyhow::Error>(())
+ })
+ .detach_and_log_err(cx);
+
+ Ok(())
+ }
+
pub fn handle_open_buffer(
&mut self,
envelope: TypedEnvelope<proto::OpenBuffer>,
@@ -1630,17 +1748,13 @@ impl Project {
cx.spawn(|this, mut cx| {
async move {
let buffer = open_buffer.await?;
- this.update(&mut cx, |this, _| {
- this.shared_buffers
- .entry(peer_id)
- .or_default()
- .insert(buffer.id() as u64, buffer.clone());
+ let buffer = this.update(&mut cx, |this, cx| {
+ this.serialize_buffer_for_peer(&buffer, peer_id, cx)
});
- let message = buffer.read_with(&cx, |buffer, _| buffer.to_proto());
rpc.respond(
receipt,
proto::OpenBufferResponse {
- buffer: Some(message),
+ buffer: Some(buffer),
},
)
.await
@@ -1651,6 +1765,60 @@ impl Project {
Ok(())
}
+ fn serialize_buffer_for_peer(
+ &mut self,
+ buffer: &ModelHandle<Buffer>,
+ peer_id: PeerId,
+ cx: &AppContext,
+ ) -> proto::Buffer {
+ let buffer_id = buffer.read(cx).remote_id();
+ let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
+ match shared_buffers.entry(buffer_id) {
+ hash_map::Entry::Occupied(_) => proto::Buffer {
+ variant: Some(proto::buffer::Variant::Id(buffer_id)),
+ },
+ hash_map::Entry::Vacant(entry) => {
+ entry.insert(buffer.clone());
+ proto::Buffer {
+ variant: Some(proto::buffer::Variant::State(buffer.read(cx).to_proto())),
+ }
+ }
+ }
+ }
+
+ fn deserialize_remote_buffer(
+ &mut self,
+ buffer: proto::Buffer,
+ cx: &mut ModelContext<Self>,
+ ) -> Result<ModelHandle<Buffer>> {
+ match buffer.variant.ok_or_else(|| anyhow!("missing buffer"))? {
+ proto::buffer::Variant::Id(id) => self
+ .open_buffers
+ .get(&(id as usize))
+ .and_then(|buffer| buffer.upgrade(cx))
+ .ok_or_else(|| anyhow!("no buffer exists for id {}", id)),
+ proto::buffer::Variant::State(mut buffer) => {
+ let mut buffer_worktree = None;
+ let mut buffer_file = None;
+ if let Some(file) = buffer.file.take() {
+ let worktree_id = WorktreeId::from_proto(file.worktree_id);
+ let worktree = self
+ .worktree_for_id(worktree_id, cx)
+ .ok_or_else(|| anyhow!("no worktree found for id {}", file.worktree_id))?;
+ buffer_file = Some(Box::new(File::from_proto(file, worktree.clone(), cx)?)
+ as Box<dyn language::File>);
+ buffer_worktree = Some(worktree);
+ }
+
+ let buffer = cx.add_model(|cx| {
+ Buffer::from_proto(self.replica_id(), buffer, buffer_file, cx).unwrap()
+ });
+ self.register_buffer(&buffer, buffer_worktree.as_ref(), cx)?;
+ Ok(buffer)
+ }
+ }
+ }
+
pub fn handle_close_buffer(
&mut self,
envelope: TypedEnvelope<proto::CloseBuffer>,
@@ -1674,13 +1842,7 @@ impl Project {
let buffer = self
.open_buffers
.get(&(payload.buffer_id as usize))
- .and_then(|buf| {
- if let OpenBuffer::Loaded(buffer) = buf {
- buffer.upgrade(cx)
- } else {
- None
- }
- });
+ .and_then(|buffer| buffer.upgrade(cx));
if let Some(buffer) = buffer {
buffer.update(cx, |buffer, cx| {
let version = payload.version.try_into()?;
@@ -1705,13 +1867,7 @@ impl Project {
let buffer = self
.open_buffers
.get(&(payload.buffer_id as usize))
- .and_then(|buf| {
- if let OpenBuffer::Loaded(buffer) = buf {
- buffer.upgrade(cx)
- } else {
- None
- }
- });
+ .and_then(|buffer| buffer.upgrade(cx));
if let Some(buffer) = buffer {
buffer.update(cx, |buffer, cx| {
let version = payload.version.try_into()?;
@@ -1910,15 +2066,6 @@ impl<P: AsRef<Path>> From<(WorktreeId, P)> for ProjectPath {
}
}
-impl OpenBuffer {
- fn upgrade(&self, cx: &AppContext) -> Option<ModelHandle<Buffer>> {
- match self {
- OpenBuffer::Loaded(buffer) => buffer.upgrade(cx),
- OpenBuffer::Operations(_) => None,
- }
- }
-}
-
#[cfg(test)]
mod tests {
use super::{Event, *};
@@ -2292,7 +2439,7 @@ mod tests {
#[gpui::test]
async fn test_save_file(mut cx: gpui::TestAppContext) {
- let fs = Arc::new(FakeFs::new());
+ let fs = Arc::new(FakeFs::new(cx.background()));
fs.insert_tree(
"/dir",
json!({
@@ -2330,7 +2477,7 @@ mod tests {
#[gpui::test]
async fn test_save_in_single_file_worktree(mut cx: gpui::TestAppContext) {
- let fs = Arc::new(FakeFs::new());
+ let fs = Arc::new(FakeFs::new(cx.background()));
fs.insert_tree(
"/dir",
json!({
@@ -2419,15 +2566,16 @@ mod tests {
// Create a remote copy of this worktree.
let initial_snapshot = tree.read_with(&cx, |tree, _| tree.snapshot());
- let remote = Worktree::remote(
- 1,
- 1,
- initial_snapshot.to_proto(&Default::default(), Default::default()),
- rpc.clone(),
- &mut cx.to_async(),
- )
- .await
- .unwrap();
+ let (remote, load_task) = cx.update(|cx| {
+ Worktree::remote(
+ 1,
+ 1,
+ initial_snapshot.to_proto(&Default::default(), Default::default()),
+ rpc.clone(),
+ cx,
+ )
+ });
+ load_task.await;
cx.read(|cx| {
assert!(!buffer2.read(cx).is_dirty());
@@ -2516,7 +2664,7 @@ mod tests {
#[gpui::test]
async fn test_buffer_deduping(mut cx: gpui::TestAppContext) {
- let fs = Arc::new(FakeFs::new());
+ let fs = Arc::new(FakeFs::new(cx.background()));
fs.insert_tree(
"/the-dir",
json!({
@@ -2805,7 +2953,7 @@ mod tests {
#[gpui::test]
async fn test_grouped_diagnostics(mut cx: gpui::TestAppContext) {
- let fs = Arc::new(FakeFs::new());
+ let fs = Arc::new(FakeFs::new(cx.background()));
fs.insert_tree(
"/the-dir",
json!({
@@ -190,13 +190,13 @@ impl Worktree {
Ok(tree)
}
- pub async fn remote(
+ pub fn remote(
project_remote_id: u64,
replica_id: ReplicaId,
worktree: proto::Worktree,
client: Arc<Client>,
- cx: &mut AsyncAppContext,
- ) -> Result<ModelHandle<Self>> {
+ cx: &mut MutableAppContext,
+ ) -> (ModelHandle<Self>, Task<()>) {
let remote_id = worktree.id;
let root_char_bag: CharBag = worktree
.root_name
@@ -205,32 +205,26 @@ impl Worktree {
.collect();
let root_name = worktree.root_name.clone();
let weak = worktree.weak;
- let (entries_by_path, entries_by_id, diagnostic_summaries) = cx
- .background()
- .spawn(async move {
- let mut entries_by_path_edits = Vec::new();
- let mut entries_by_id_edits = Vec::new();
- for entry in worktree.entries {
- match Entry::try_from((&root_char_bag, entry)) {
- Ok(entry) => {
- entries_by_id_edits.push(Edit::Insert(PathEntry {
- id: entry.id,
- path: entry.path.clone(),
- is_ignored: entry.is_ignored,
- scan_id: 0,
- }));
- entries_by_path_edits.push(Edit::Insert(entry));
- }
- Err(err) => log::warn!("error for remote worktree entry {:?}", err),
- }
- }
-
- let mut entries_by_path = SumTree::new();
- let mut entries_by_id = SumTree::new();
- entries_by_path.edit(entries_by_path_edits, &());
- entries_by_id.edit(entries_by_id_edits, &());
+ let snapshot = Snapshot {
+ id: WorktreeId(remote_id as usize),
+ root_name,
+ root_char_bag,
+ entries_by_path: Default::default(),
+ entries_by_id: Default::default(),
+ };
- let diagnostic_summaries = TreeMap::from_ordered_entries(
+ let (updates_tx, mut updates_rx) = postage::mpsc::channel(64);
+ let (mut snapshot_tx, snapshot_rx) = watch::channel_with(snapshot.clone());
+ let worktree_handle = cx.add_model(|_: &mut ModelContext<Worktree>| {
+ Worktree::Remote(RemoteWorktree {
+ project_id: project_remote_id,
+ replica_id,
+ snapshot: snapshot.clone(),
+ snapshot_rx: snapshot_rx.clone(),
+ updates_tx,
+ client: client.clone(),
+ queued_operations: Default::default(),
+ diagnostic_summaries: TreeMap::from_ordered_entries(
worktree.diagnostic_summaries.into_iter().map(|summary| {
(
PathKey(PathBuf::from(summary.path).into()),
@@ -242,24 +236,48 @@ impl Worktree {
},
)
}),
- );
-
- (entries_by_path, entries_by_id, diagnostic_summaries)
+ ),
+ weak,
})
- .await;
+ });
- let worktree = cx.update(|cx| {
- cx.add_model(|cx: &mut ModelContext<Worktree>| {
- let snapshot = Snapshot {
- id: WorktreeId(remote_id as usize),
- root_name,
- root_char_bag,
- entries_by_path,
- entries_by_id,
- };
+ let deserialize_task = cx.spawn({
+ let worktree_handle = worktree_handle.clone();
+ |cx| async move {
+ let (entries_by_path, entries_by_id) = cx
+ .background()
+ .spawn(async move {
+ let mut entries_by_path_edits = Vec::new();
+ let mut entries_by_id_edits = Vec::new();
+ for entry in worktree.entries {
+ match Entry::try_from((&root_char_bag, entry)) {
+ Ok(entry) => {
+ entries_by_id_edits.push(Edit::Insert(PathEntry {
+ id: entry.id,
+ path: entry.path.clone(),
+ is_ignored: entry.is_ignored,
+ scan_id: 0,
+ }));
+ entries_by_path_edits.push(Edit::Insert(entry));
+ }
+ Err(err) => log::warn!("error for remote worktree entry {:?}", err),
+ }
+ }
+
+ let mut entries_by_path = SumTree::new();
+ let mut entries_by_id = SumTree::new();
+ entries_by_path.edit(entries_by_path_edits, &());
+ entries_by_id.edit(entries_by_id_edits, &());
+
+ (entries_by_path, entries_by_id)
+ })
+ .await;
- let (updates_tx, mut updates_rx) = postage::mpsc::channel(64);
- let (mut snapshot_tx, snapshot_rx) = watch::channel_with(snapshot.clone());
+ {
+ let mut snapshot = snapshot_tx.borrow_mut();
+ snapshot.entries_by_path = entries_by_path;
+ snapshot.entries_by_id = entries_by_id;
+ }
cx.background()
.spawn(async move {
@@ -275,7 +293,8 @@ impl Worktree {
{
let mut snapshot_rx = snapshot_rx.clone();
- cx.spawn_weak(|this, mut cx| async move {
+ let this = worktree_handle.downgrade();
+ cx.spawn(|mut cx| async move {
while let Some(_) = snapshot_rx.recv().await {
if let Some(this) = cx.read(|cx| this.upgrade(cx)) {
this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
@@ -286,22 +305,9 @@ impl Worktree {
})
.detach();
}
-
- Worktree::Remote(RemoteWorktree {
- project_id: project_remote_id,
- replica_id,
- snapshot,
- snapshot_rx,
- updates_tx,
- client: client.clone(),
- queued_operations: Default::default(),
- diagnostic_summaries,
- weak,
- })
- })
+ }
});
-
- Ok(worktree)
+ (worktree_handle, deserialize_task)
}
pub fn as_local(&self) -> Option<&LocalWorktree> {
@@ -361,17 +367,6 @@ impl Worktree {
}
}
- pub fn load_buffer(
- &mut self,
- path: &Path,
- cx: &mut ModelContext<Self>,
- ) -> Task<Result<ModelHandle<Buffer>>> {
- match self {
- Worktree::Local(worktree) => worktree.load_buffer(path, cx),
- Worktree::Remote(worktree) => worktree.load_buffer(path, cx),
- }
- }
-
pub fn diagnostic_summaries<'a>(
&'a self,
) -> impl Iterator<Item = (Arc<Path>, DiagnosticSummary)> + 'a {
@@ -828,41 +823,6 @@ impl LocalWorktree {
}
impl RemoteWorktree {
- pub(crate) fn load_buffer(
- &mut self,
- path: &Path,
- cx: &mut ModelContext<Worktree>,
- ) -> Task<Result<ModelHandle<Buffer>>> {
- let rpc = self.client.clone();
- let replica_id = self.replica_id;
- let project_id = self.project_id;
- let remote_worktree_id = self.id();
- let path: Arc<Path> = Arc::from(path);
- let path_string = path.to_string_lossy().to_string();
- cx.spawn_weak(move |this, mut cx| async move {
- let response = rpc
- .request(proto::OpenBuffer {
- project_id,
- worktree_id: remote_worktree_id.to_proto(),
- path: path_string,
- })
- .await?;
-
- let this = this
- .upgrade(&cx)
- .ok_or_else(|| anyhow!("worktree was closed"))?;
- let mut remote_buffer = response.buffer.ok_or_else(|| anyhow!("empty buffer"))?;
- let file = remote_buffer
- .file
- .take()
- .map(|proto| cx.read(|cx| File::from_proto(proto, this.clone(), cx)))
- .transpose()?
- .map(|file| Box::new(file) as Box<dyn language::File>);
-
- Ok(cx.add_model(|cx| Buffer::from_proto(replica_id, remote_buffer, file, cx).unwrap()))
- })
- }
-
fn snapshot(&self) -> Snapshot {
self.snapshot.clone()
}
@@ -2472,7 +2432,7 @@ mod tests {
#[gpui::test]
async fn test_traversal(cx: gpui::TestAppContext) {
- let fs = FakeFs::new();
+ let fs = FakeFs::new(cx.background());
fs.insert_tree(
"/root",
json!({
@@ -8,7 +8,7 @@ version = "0.1.0"
path = "src/rpc.rs"
[features]
-test-support = []
+test-support = ["gpui"]
[dependencies]
anyhow = "1.0"
@@ -25,6 +25,7 @@ rsa = "0.4"
serde = { version = "1", features = ["derive"] }
smol-timeout = "0.6"
zstd = "0.9"
+gpui = { path = "../gpui", features = ["test-support"], optional = true }
[build-dependencies]
prost-build = "0.8"
@@ -20,40 +20,42 @@ message Envelope {
LeaveProject leave_project = 14;
AddProjectCollaborator add_project_collaborator = 15;
RemoveProjectCollaborator remove_project_collaborator = 16;
-
- RegisterWorktree register_worktree = 17;
- UnregisterWorktree unregister_worktree = 18;
- ShareWorktree share_worktree = 19;
- UpdateWorktree update_worktree = 20;
- UpdateDiagnosticSummary update_diagnostic_summary = 21;
- DiskBasedDiagnosticsUpdating disk_based_diagnostics_updating = 22;
- DiskBasedDiagnosticsUpdated disk_based_diagnostics_updated = 23;
-
- OpenBuffer open_buffer = 24;
- OpenBufferResponse open_buffer_response = 25;
- CloseBuffer close_buffer = 26;
- UpdateBuffer update_buffer = 27;
- UpdateBufferFile update_buffer_file = 28;
- SaveBuffer save_buffer = 29;
- BufferSaved buffer_saved = 30;
- BufferReloaded buffer_reloaded = 31;
- FormatBuffer format_buffer = 32;
-
- GetChannels get_channels = 33;
- GetChannelsResponse get_channels_response = 34;
- JoinChannel join_channel = 35;
- JoinChannelResponse join_channel_response = 36;
- LeaveChannel leave_channel = 37;
- SendChannelMessage send_channel_message = 38;
- SendChannelMessageResponse send_channel_message_response = 39;
- ChannelMessageSent channel_message_sent = 40;
- GetChannelMessages get_channel_messages = 41;
- GetChannelMessagesResponse get_channel_messages_response = 42;
-
- UpdateContacts update_contacts = 43;
-
- GetUsers get_users = 44;
- GetUsersResponse get_users_response = 45;
+ GetDefinition get_definition = 17;
+ GetDefinitionResponse get_definition_response = 18;
+
+ RegisterWorktree register_worktree = 19;
+ UnregisterWorktree unregister_worktree = 20;
+ ShareWorktree share_worktree = 21;
+ UpdateWorktree update_worktree = 22;
+ UpdateDiagnosticSummary update_diagnostic_summary = 23;
+ DiskBasedDiagnosticsUpdating disk_based_diagnostics_updating = 24;
+ DiskBasedDiagnosticsUpdated disk_based_diagnostics_updated = 25;
+
+ OpenBuffer open_buffer = 26;
+ OpenBufferResponse open_buffer_response = 27;
+ CloseBuffer close_buffer = 28;
+ UpdateBuffer update_buffer = 29;
+ UpdateBufferFile update_buffer_file = 30;
+ SaveBuffer save_buffer = 31;
+ BufferSaved buffer_saved = 32;
+ BufferReloaded buffer_reloaded = 33;
+ FormatBuffer format_buffer = 34;
+
+ GetChannels get_channels = 35;
+ GetChannelsResponse get_channels_response = 36;
+ JoinChannel join_channel = 37;
+ JoinChannelResponse join_channel_response = 38;
+ LeaveChannel leave_channel = 39;
+ SendChannelMessage send_channel_message = 40;
+ SendChannelMessageResponse send_channel_message_response = 41;
+ ChannelMessageSent channel_message_sent = 42;
+ GetChannelMessages get_channel_messages = 43;
+ GetChannelMessagesResponse get_channel_messages_response = 44;
+
+ UpdateContacts update_contacts = 45;
+
+ GetUsers get_users = 46;
+ GetUsersResponse get_users_response = 47;
}
}
@@ -134,6 +136,22 @@ message RemoveProjectCollaborator {
uint32 peer_id = 2;
}
+message GetDefinition {
+ uint64 project_id = 1;
+ uint64 buffer_id = 2;
+ Anchor position = 3;
+ }
+
+message GetDefinitionResponse {
+ repeated Definition definitions = 1;
+}
+
+message Definition {
+ Buffer buffer = 1;
+ Anchor target_start = 2;
+ Anchor target_end = 3;
+}
+
message OpenBuffer {
uint64 project_id = 1;
uint64 worktree_id = 2;
@@ -303,6 +321,13 @@ message Entry {
}
message Buffer {
+ oneof variant {
+ uint64 id = 1;
+ BufferState state = 2;
+ }
+}
+
+message BufferState {
uint64 id = 1;
optional File file = 2;
string visible_text = 3;
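
`Buffer` is now a oneof: the first time a buffer is sent to a peer it carries the full `BufferState`, and later references carry only the id (see `serialize_buffer_for_peer` and `deserialize_remote_buffer` above). A hedged sketch of the receiving side; `lookup_open_buffer` and `create_buffer_from_proto` are illustrative names, not real functions:

// Mirrors the logic of Project::deserialize_remote_buffer.
match buffer.variant.ok_or_else(|| anyhow!("missing buffer"))? {
    proto::buffer::Variant::Id(id) => {
        // The peer already sent this buffer's state once; reuse the local copy.
        lookup_open_buffer(id)?
    }
    proto::buffer::Variant::State(state) => {
        // First sighting: construct the buffer from its full serialized state.
        create_buffer_from_proto(state)?
    }
};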
@@ -34,12 +34,14 @@ impl Connection {
}
#[cfg(any(test, feature = "test-support"))]
- pub fn in_memory() -> (Self, Self, postage::watch::Sender<Option<()>>) {
+ pub fn in_memory(
+ executor: std::sync::Arc<gpui::executor::Background>,
+ ) -> (Self, Self, postage::watch::Sender<Option<()>>) {
let (kill_tx, mut kill_rx) = postage::watch::channel_with(None);
postage::stream::Stream::try_recv(&mut kill_rx).unwrap();
- let (a_tx, a_rx) = Self::channel(kill_rx.clone());
- let (b_tx, b_rx) = Self::channel(kill_rx);
+ let (a_tx, a_rx) = Self::channel(kill_rx.clone(), executor.clone());
+ let (b_tx, b_rx) = Self::channel(kill_rx, executor);
(
Self { tx: a_tx, rx: b_rx },
Self { tx: b_tx, rx: a_rx },
@@ -50,11 +52,12 @@ impl Connection {
#[cfg(any(test, feature = "test-support"))]
fn channel(
kill_rx: postage::watch::Receiver<Option<()>>,
+ executor: std::sync::Arc<gpui::executor::Background>,
) -> (
Box<dyn Send + Unpin + futures::Sink<WebSocketMessage, Error = WebSocketError>>,
Box<dyn Send + Unpin + futures::Stream<Item = Result<WebSocketMessage, WebSocketError>>>,
) {
- use futures::{future, SinkExt as _};
+ use futures::SinkExt as _;
use io::{Error, ErrorKind};
let (tx, rx) = mpsc::unbounded::<WebSocketMessage>();
@@ -62,26 +65,39 @@ impl Connection {
.sink_map_err(|e| WebSocketError::from(Error::new(ErrorKind::Other, e)))
.with({
let kill_rx = kill_rx.clone();
+ let executor = executor.clone();
move |msg| {
- if kill_rx.borrow().is_none() {
- future::ready(Ok(msg))
- } else {
- future::ready(Err(Error::new(ErrorKind::Other, "connection killed").into()))
- }
+ let kill_rx = kill_rx.clone();
+ let executor = executor.clone();
+ Box::pin(async move {
+ executor.simulate_random_delay().await;
+ if kill_rx.borrow().is_none() {
+ Ok(msg)
+ } else {
+ Err(Error::new(ErrorKind::Other, "connection killed").into())
+ }
+ })
}
});
+ let rx = rx.then(move |msg| {
+ let executor = executor.clone();
+ Box::pin(async move {
+ executor.simulate_random_delay().await;
+ msg
+ })
+ });
let rx = KillableReceiver { kill_rx, rx };
(Box::new(tx), Box::new(rx))
}
}
-struct KillableReceiver {
- rx: mpsc::UnboundedReceiver<WebSocketMessage>,
+struct KillableReceiver<S> {
+ rx: S,
kill_rx: postage::watch::Receiver<Option<()>>,
}
-impl Stream for KillableReceiver {
+impl<S: Unpin + Stream<Item = WebSocketMessage>> Stream for KillableReceiver<S> {
type Item = Result<WebSocketMessage, WebSocketError>;
fn poll_next(
@@ -353,12 +353,14 @@ mod tests {
let client1 = Peer::new();
let client2 = Peer::new();
- let (client1_to_server_conn, server_to_client_1_conn, _) = Connection::in_memory();
+ let (client1_to_server_conn, server_to_client_1_conn, _) =
+ Connection::in_memory(cx.background());
let (client1_conn_id, io_task1, client1_incoming) =
client1.add_connection(client1_to_server_conn).await;
let (_, io_task2, server_incoming1) = server.add_connection(server_to_client_1_conn).await;
- let (client2_to_server_conn, server_to_client_2_conn, _) = Connection::in_memory();
+ let (client2_to_server_conn, server_to_client_2_conn, _) =
+ Connection::in_memory(cx.background());
let (client2_conn_id, io_task3, client2_incoming) =
client2.add_connection(client2_to_server_conn).await;
let (_, io_task4, server_incoming2) = server.add_connection(server_to_client_2_conn).await;
@@ -410,9 +412,7 @@ mod tests {
.unwrap(),
proto::OpenBufferResponse {
buffer: Some(proto::Buffer {
- id: 101,
- visible_text: "path/one content".to_string(),
- ..Default::default()
+ variant: Some(proto::buffer::Variant::Id(0))
}),
}
);
@@ -431,10 +431,8 @@ mod tests {
.unwrap(),
proto::OpenBufferResponse {
buffer: Some(proto::Buffer {
- id: 102,
- visible_text: "path/two content".to_string(),
- ..Default::default()
- }),
+ variant: Some(proto::buffer::Variant::Id(1))
+ })
}
);
@@ -460,9 +458,7 @@ mod tests {
assert_eq!(message.worktree_id, 1);
proto::OpenBufferResponse {
buffer: Some(proto::Buffer {
- id: 101,
- visible_text: "path/one content".to_string(),
- ..Default::default()
+ variant: Some(proto::buffer::Variant::Id(0)),
}),
}
}
@@ -470,9 +466,7 @@ mod tests {
assert_eq!(message.worktree_id, 2);
proto::OpenBufferResponse {
buffer: Some(proto::Buffer {
- id: 102,
- visible_text: "path/two content".to_string(),
- ..Default::default()
+ variant: Some(proto::buffer::Variant::Id(1)),
}),
}
}
@@ -497,7 +491,8 @@ mod tests {
let server = Peer::new();
let client = Peer::new();
- let (client_to_server_conn, server_to_client_conn, _) = Connection::in_memory();
+ let (client_to_server_conn, server_to_client_conn, _) =
+ Connection::in_memory(cx.background());
let (client_to_server_conn_id, io_task1, mut client_incoming) =
client.add_connection(client_to_server_conn).await;
let (server_to_client_conn_id, io_task2, mut server_incoming) =
@@ -597,7 +592,7 @@ mod tests {
async fn test_disconnect(cx: TestAppContext) {
let executor = cx.foreground();
- let (client_conn, mut server_conn, _) = Connection::in_memory();
+ let (client_conn, mut server_conn, _) = Connection::in_memory(cx.background());
let client = Peer::new();
let (connection_id, io_handler, mut incoming) = client.add_connection(client_conn).await;
@@ -631,7 +626,7 @@ mod tests {
#[gpui::test(iterations = 10)]
async fn test_io_error(cx: TestAppContext) {
let executor = cx.foreground();
- let (client_conn, mut server_conn, _) = Connection::in_memory();
+ let (client_conn, mut server_conn, _) = Connection::in_memory(cx.background());
let client = Peer::new();
let (connection_id, io_handler, mut incoming) = client.add_connection(client_conn).await;
@@ -134,6 +134,8 @@ messages!(
GetChannelMessagesResponse,
GetChannels,
GetChannelsResponse,
+ GetDefinition,
+ GetDefinitionResponse,
GetUsers,
GetUsersResponse,
JoinChannel,
@@ -168,6 +170,7 @@ request_messages!(
(FormatBuffer, Ack),
(GetChannelMessages, GetChannelMessagesResponse),
(GetChannels, GetChannelsResponse),
+ (GetDefinition, GetDefinitionResponse),
(GetUsers, GetUsersResponse),
(JoinChannel, JoinChannelResponse),
(JoinProject, JoinProjectResponse),
@@ -191,6 +194,7 @@ entity_messages!(
DiskBasedDiagnosticsUpdated,
DiskBasedDiagnosticsUpdating,
FormatBuffer,
+ GetDefinition,
JoinProject,
LeaveProject,
OpenBuffer,
@@ -17,7 +17,7 @@ use rpc::{
Connection, ConnectionId, Peer, TypedEnvelope,
};
use sha1::{Digest as _, Sha1};
-use std::{any::TypeId, future::Future, mem, path::PathBuf, sync::Arc, time::Instant};
+use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
use store::{Store, Worktree};
use surf::StatusCode;
use tide::log;
@@ -74,6 +74,7 @@ impl Server {
.add_handler(Server::update_diagnostic_summary)
.add_handler(Server::disk_based_diagnostics_updating)
.add_handler(Server::disk_based_diagnostics_updated)
+ .add_handler(Server::get_definition)
.add_handler(Server::open_buffer)
.add_handler(Server::close_buffer)
.add_handler(Server::update_buffer)
@@ -479,26 +480,40 @@ impl Server {
.worktree
.as_mut()
.ok_or_else(|| anyhow!("missing worktree"))?;
- let entries = mem::take(&mut worktree.entries)
- .into_iter()
- .map(|entry| (entry.id, entry))
+ let entries = worktree
+ .entries
+ .iter()
+ .map(|entry| (entry.id, entry.clone()))
.collect();
-
- let diagnostic_summaries = mem::take(&mut worktree.diagnostic_summaries)
- .into_iter()
- .map(|summary| (PathBuf::from(summary.path.clone()), summary))
+ let diagnostic_summaries = worktree
+ .diagnostic_summaries
+ .iter()
+ .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
.collect();
- let contact_user_ids = self.state_mut().share_worktree(
+ let shared_worktree = self.state_mut().share_worktree(
request.payload.project_id,
worktree.id,
request.sender_id,
entries,
diagnostic_summaries,
);
- if let Some(contact_user_ids) = contact_user_ids {
+ if let Some(shared_worktree) = shared_worktree {
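+ // Forward the newly shared worktree to guests who have already joined the project.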
+ broadcast(
+ request.sender_id,
+ shared_worktree.connection_ids,
+ |connection_id| {
+ self.peer.forward_send(
+ request.sender_id,
+ connection_id,
+ request.payload.clone(),
+ )
+ },
+ )
+ .await?;
self.peer.respond(request.receipt(), proto::Ack {}).await?;
- self.update_contacts_for_users(&contact_user_ids).await?;
+ self.update_contacts_for_users(&shared_worktree.authorized_user_ids)
+ .await?;
} else {
self.peer
.respond_with_error(
@@ -594,6 +609,24 @@ impl Server {
Ok(())
}
+ async fn get_definition(
+ self: Arc<Server>,
+ request: TypedEnvelope<proto::GetDefinition>,
+ ) -> tide::Result<()> {
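+ // Guests rely on the host to answer language-server queries: forward the
+ // definition request to the project's host connection and relay its response.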
+ let receipt = request.receipt();
+ let host_connection_id = self
+ .state()
+ .read_project(request.payload.project_id, request.sender_id)
+ .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
+ .host_connection_id;
+ let response = self
+ .peer
+ .forward_request(request.sender_id, host_connection_id, request.payload)
+ .await?;
+ self.peer.respond(receipt, response).await?;
+ Ok(())
+ }
+
async fn open_buffer(
self: Arc<Server>,
request: TypedEnvelope<proto::OpenBuffer>,
@@ -1135,6 +1168,7 @@ mod tests {
use gpui::{executor, ModelHandle, TestAppContext};
use parking_lot::Mutex;
use postage::{mpsc, watch};
+ use rand::prelude::*;
use rpc::PeerId;
use serde_json::json;
use sqlx::types::time::OffsetDateTime;
@@ -1156,8 +1190,8 @@ mod tests {
editor::{Editor, EditorSettings, Input, MultiBuffer},
fs::{FakeFs, Fs as _},
language::{
- tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig,
- LanguageRegistry, LanguageServerConfig, Point,
+ tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
+ LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
},
lsp,
project::{DiagnosticSummary, Project, ProjectPath},
@@ -1167,7 +1201,7 @@ mod tests {
async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
let (window_b, _) = cx_b.add_window(|_| EmptyView);
let lang_registry = Arc::new(LanguageRegistry::new());
- let fs = Arc::new(FakeFs::new());
+ let fs = Arc::new(FakeFs::new(cx_a.background()));
cx_a.foreground().forbid_parking();
// Connect to a server as 2 clients.
@@ -1305,7 +1339,7 @@ mod tests {
#[gpui::test]
async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
let lang_registry = Arc::new(LanguageRegistry::new());
- let fs = Arc::new(FakeFs::new());
+ let fs = Arc::new(FakeFs::new(cx_a.background()));
cx_a.foreground().forbid_parking();
// Connect to a server as 2 clients.
@@ -1406,7 +1440,7 @@ mod tests {
mut cx_c: TestAppContext,
) {
let lang_registry = Arc::new(LanguageRegistry::new());
- let fs = Arc::new(FakeFs::new());
+ let fs = Arc::new(FakeFs::new(cx_a.background()));
cx_a.foreground().forbid_parking();
// Connect to a server as 3 clients.
@@ -1589,7 +1623,7 @@ mod tests {
async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
cx_a.foreground().forbid_parking();
let lang_registry = Arc::new(LanguageRegistry::new());
- let fs = Arc::new(FakeFs::new());
+ let fs = Arc::new(FakeFs::new(cx_a.background()));
// Connect to a server as 2 clients.
let mut server = TestServer::start(cx_a.foreground()).await;
@@ -1682,7 +1716,7 @@ mod tests {
async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
cx_a.foreground().forbid_parking();
let lang_registry = Arc::new(LanguageRegistry::new());
- let fs = Arc::new(FakeFs::new());
+ let fs = Arc::new(FakeFs::new(cx_a.background()));
// Connect to a server as 2 clients.
let mut server = TestServer::start(cx_a.foreground()).await;
@@ -1760,14 +1794,14 @@ mod tests {
});
}
- #[gpui::test]
+ #[gpui::test(iterations = 100)]
async fn test_editing_while_guest_opens_buffer(
mut cx_a: TestAppContext,
mut cx_b: TestAppContext,
) {
cx_a.foreground().forbid_parking();
let lang_registry = Arc::new(LanguageRegistry::new());
- let fs = Arc::new(FakeFs::new());
+ let fs = Arc::new(FakeFs::new(cx_a.background()));
// Connect to a server as 2 clients.
let mut server = TestServer::start(cx_a.foreground()).await;
@@ -1847,7 +1881,7 @@ mod tests {
) {
cx_a.foreground().forbid_parking();
let lang_registry = Arc::new(LanguageRegistry::new());
- let fs = Arc::new(FakeFs::new());
+ let fs = Arc::new(FakeFs::new(cx_a.background()));
// Connect to a server as 2 clients.
let mut server = TestServer::start(cx_a.foreground()).await;
@@ -1922,7 +1956,7 @@ mod tests {
async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
cx_a.foreground().forbid_parking();
let lang_registry = Arc::new(LanguageRegistry::new());
- let fs = Arc::new(FakeFs::new());
+ let fs = Arc::new(FakeFs::new(cx_a.background()));
// Connect to a server as 2 clients.
let mut server = TestServer::start(cx_a.foreground()).await;
@@ -1996,7 +2030,7 @@ mod tests {
) {
cx_a.foreground().forbid_parking();
let mut lang_registry = Arc::new(LanguageRegistry::new());
- let fs = Arc::new(FakeFs::new());
+ let fs = Arc::new(FakeFs::new(cx_a.background()));
// Set up a fake language server.
let (language_server_config, mut fake_language_server) =
@@ -2217,7 +2251,7 @@ mod tests {
async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
cx_a.foreground().forbid_parking();
let mut lang_registry = Arc::new(LanguageRegistry::new());
- let fs = Arc::new(FakeFs::new());
+ let fs = Arc::new(FakeFs::new(cx_a.background()));
// Set up a fake language server.
let (language_server_config, mut fake_language_server) =
@@ -2285,7 +2319,6 @@ mod tests {
.await
.unwrap();
- // Open the file to be formatted on client B.
let buffer_b = cx_b
.background()
.spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
@@ -2318,6 +2351,277 @@ mod tests {
);
}
+ #[gpui::test]
+ async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
+ cx_a.foreground().forbid_parking();
+ let mut lang_registry = Arc::new(LanguageRegistry::new());
+ let fs = Arc::new(FakeFs::new(cx_a.background()));
+ fs.insert_tree(
+ "/root-1",
+ json!({
+ ".zed.toml": r#"collaborators = ["user_b"]"#,
+ "a.rs": "const ONE: usize = b::TWO + b::THREE;",
+ }),
+ )
+ .await;
+ fs.insert_tree(
+ "/root-2",
+ json!({
+ "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
+ }),
+ )
+ .await;
+
+ // Set up a fake language server.
+ let (language_server_config, mut fake_language_server) =
+ LanguageServerConfig::fake(cx_a.background()).await;
+ Arc::get_mut(&mut lang_registry)
+ .unwrap()
+ .add(Arc::new(Language::new(
+ LanguageConfig {
+ name: "Rust".to_string(),
+ path_suffixes: vec!["rs".to_string()],
+ language_server: Some(language_server_config),
+ ..Default::default()
+ },
+ Some(tree_sitter_rust::language()),
+ )));
+
+ // Connect to a server as 2 clients.
+ let mut server = TestServer::start(cx_a.foreground()).await;
+ let client_a = server.create_client(&mut cx_a, "user_a").await;
+ let client_b = server.create_client(&mut cx_b, "user_b").await;
+
+ // Share a project as client A
+ let project_a = cx_a.update(|cx| {
+ Project::local(
+ client_a.clone(),
+ client_a.user_store.clone(),
+ lang_registry.clone(),
+ fs.clone(),
+ cx,
+ )
+ });
+ let (worktree_a, _) = project_a
+ .update(&mut cx_a, |p, cx| {
+ p.find_or_create_local_worktree("/root-1", false, cx)
+ })
+ .await
+ .unwrap();
+ worktree_a
+ .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
+ .await;
+ let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
+ let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
+ project_a
+ .update(&mut cx_a, |p, cx| p.share(cx))
+ .await
+ .unwrap();
+
+ // Join the worktree as client B.
+ let project_b = Project::remote(
+ project_id,
+ client_b.clone(),
+ client_b.user_store.clone(),
+ lang_registry.clone(),
+ fs.clone(),
+ &mut cx_b.to_async(),
+ )
+ .await
+ .unwrap();
+
+ // Open the file on client B that references the definitions.
+ let buffer_b = cx_b
+ .background()
+ .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
+ .await
+ .unwrap();
+
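+ // Ask for the definition of `TWO` (offset 23 in a.rs); the fake language server
+ // answers with a location in /root-2/b.rs, which isn't part of the shared project yet.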
+ let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
+ let (request_id, _) = fake_language_server
+ .receive_request::<lsp::request::GotoDefinition>()
+ .await;
+ fake_language_server
+ .respond(
+ request_id,
+ Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
+ lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
+ lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
+ ))),
+ )
+ .await;
+ let definitions_1 = definitions_1.await.unwrap();
+ cx_b.read(|cx| {
+ assert_eq!(definitions_1.len(), 1);
+ assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
+ let target_buffer = definitions_1[0].target_buffer.read(cx);
+ assert_eq!(
+ target_buffer.text(),
+ "const TWO: usize = 2;\nconst THREE: usize = 3;"
+ );
+ assert_eq!(
+ definitions_1[0].target_range.to_point(target_buffer),
+ Point::new(0, 6)..Point::new(0, 9)
+ );
+ });
+
+ // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
+ // the previous call to `definition`.
+ let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
+ let (request_id, _) = fake_language_server
+ .receive_request::<lsp::request::GotoDefinition>()
+ .await;
+ fake_language_server
+ .respond(
+ request_id,
+ Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
+ lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
+ lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
+ ))),
+ )
+ .await;
+ let definitions_2 = definitions_2.await.unwrap();
+ cx_b.read(|cx| {
+ assert_eq!(definitions_2.len(), 1);
+ assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
+ let target_buffer = definitions_2[0].target_buffer.read(cx);
+ assert_eq!(
+ target_buffer.text(),
+ "const TWO: usize = 2;\nconst THREE: usize = 3;"
+ );
+ assert_eq!(
+ definitions_2[0].target_range.to_point(target_buffer),
+ Point::new(1, 6)..Point::new(1, 11)
+ );
+ });
+ assert_eq!(
+ definitions_1[0].target_buffer,
+ definitions_2[0].target_buffer
+ );
+
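+ // After dropping all references to the definition results, the extra
+ // worktree for /root-2 should be released.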
+ cx_b.update(|_| {
+ drop(definitions_1);
+ drop(definitions_2);
+ });
+ project_b
+ .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
+ .await;
+ }
+
+ #[gpui::test]
+ async fn test_open_buffer_while_getting_definition_pointing_to_it(
+ mut cx_a: TestAppContext,
+ mut cx_b: TestAppContext,
+ mut rng: StdRng,
+ ) {
+ cx_a.foreground().forbid_parking();
+ let mut lang_registry = Arc::new(LanguageRegistry::new());
+ let fs = Arc::new(FakeFs::new(cx_a.background()));
+ fs.insert_tree(
+ "/root",
+ json!({
+ ".zed.toml": r#"collaborators = ["user_b"]"#,
+ "a.rs": "const ONE: usize = b::TWO;",
+ "b.rs": "const TWO: usize = 2",
+ }),
+ )
+ .await;
+
+ // Set up a fake language server.
+ let (language_server_config, mut fake_language_server) =
+ LanguageServerConfig::fake(cx_a.background()).await;
+ Arc::get_mut(&mut lang_registry)
+ .unwrap()
+ .add(Arc::new(Language::new(
+ LanguageConfig {
+ name: "Rust".to_string(),
+ path_suffixes: vec!["rs".to_string()],
+ language_server: Some(language_server_config),
+ ..Default::default()
+ },
+ Some(tree_sitter_rust::language()),
+ )));
+
+ // Connect to a server as 2 clients.
+ let mut server = TestServer::start(cx_a.foreground()).await;
+ let client_a = server.create_client(&mut cx_a, "user_a").await;
+ let client_b = server.create_client(&mut cx_b, "user_b").await;
+
+ // Share a project as client A
+ let project_a = cx_a.update(|cx| {
+ Project::local(
+ client_a.clone(),
+ client_a.user_store.clone(),
+ lang_registry.clone(),
+ fs.clone(),
+ cx,
+ )
+ });
+ let (worktree_a, _) = project_a
+ .update(&mut cx_a, |p, cx| {
+ p.find_or_create_local_worktree("/root", false, cx)
+ })
+ .await
+ .unwrap();
+ worktree_a
+ .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
+ .await;
+ let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
+ let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
+ project_a
+ .update(&mut cx_a, |p, cx| p.share(cx))
+ .await
+ .unwrap();
+
+ // Join the worktree as client B.
+ let project_b = Project::remote(
+ project_id,
+ client_b.clone(),
+ client_b.user_store.clone(),
+ lang_registry.clone(),
+ fs.clone(),
+ &mut cx_b.to_async(),
+ )
+ .await
+ .unwrap();
+
+ let buffer_b1 = cx_b
+ .background()
+ .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
+ .await
+ .unwrap();
+
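+ // Randomize whether the definition request or the open-buffer request is
+ // issued first, so both orderings are covered.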
+ let definitions;
+ let buffer_b2;
+ if rng.gen() {
+ definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
+ buffer_b2 =
+ project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
+ } else {
+ buffer_b2 =
+ project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
+ definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
+ }
+
+ let (request_id, _) = fake_language_server
+ .receive_request::<lsp::request::GotoDefinition>()
+ .await;
+ fake_language_server
+ .respond(
+ request_id,
+ Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
+ lsp::Url::from_file_path("/root/b.rs").unwrap(),
+ lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
+ ))),
+ )
+ .await;
+
+ let buffer_b2 = buffer_b2.await.unwrap();
+ let definitions = definitions.await.unwrap();
+ assert_eq!(definitions.len(), 1);
+ assert_eq!(definitions[0].target_buffer, buffer_b2);
+ }
+
#[gpui::test]
async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
cx_a.foreground().forbid_parking();
@@ -2738,7 +3042,7 @@ mod tests {
) {
cx_a.foreground().forbid_parking();
let lang_registry = Arc::new(LanguageRegistry::new());
- let fs = Arc::new(FakeFs::new());
+ let fs = Arc::new(FakeFs::new(cx_a.background()));
// Connect to a server as 3 clients.
let mut server = TestServer::start(cx_a.foreground()).await;
@@ -2938,7 +3242,8 @@ mod tests {
"server is forbidding connections"
)))
} else {
- let (client_conn, server_conn, kill_conn) = Connection::in_memory();
+ let (client_conn, server_conn, kill_conn) =
+ Connection::in_memory(cx.background());
connection_killers.lock().insert(user_id, kill_conn);
cx.background()
.spawn(server.handle_connection(
@@ -74,6 +74,11 @@ pub struct LeftProject {
pub authorized_user_ids: Vec<UserId>,
}
+pub struct SharedWorktree {
+ pub authorized_user_ids: Vec<UserId>,
+ pub connection_ids: Vec<ConnectionId>,
+}
+
impl Store {
pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId) {
self.connections.insert(
@@ -393,7 +398,7 @@ impl Store {
connection_id: ConnectionId,
entries: HashMap<u64, proto::Entry>,
diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
- ) -> Option<Vec<UserId>> {
+ ) -> Option<SharedWorktree> {
let project = self.projects.get_mut(&project_id)?;
let worktree = project.worktrees.get_mut(&worktree_id)?;
if project.host_connection_id == connection_id && project.share.is_some() {
@@ -401,7 +406,10 @@ impl Store {
entries,
diagnostic_summaries,
});
- Some(project.authorized_user_ids())
+ Some(SharedWorktree {
+ authorized_user_ids: project.authorized_user_ids(),
+ connection_ids: project.guest_connection_ids(),
+ })
} else {
None
}
@@ -490,7 +490,7 @@ pub struct WorkspaceParams {
impl WorkspaceParams {
#[cfg(any(test, feature = "test-support"))]
pub fn test(cx: &mut MutableAppContext) -> Self {
- let fs = Arc::new(project::FakeFs::new());
+ let fs = Arc::new(project::FakeFs::new(cx.background().clone()));
let languages = Arc::new(LanguageRegistry::new());
let http_client = client::test::FakeHttpClient::new(|_| async move {
Ok(client::http::ServerResponse::new(404))
@@ -40,7 +40,7 @@ pub fn test_app_state(cx: &mut MutableAppContext) -> Arc<AppState> {
channel_list: cx.add_model(|cx| ChannelList::new(user_store.clone(), client.clone(), cx)),
client,
user_store,
- fs: Arc::new(FakeFs::new()),
+ fs: Arc::new(FakeFs::new(cx.background().clone())),
path_openers: Arc::from(path_openers),
build_window_options: &build_window_options,
build_workspace: &build_workspace,