Detailed changes
@@ -4589,14 +4589,15 @@ checksum = "5da3b0203fd7ee5720aa0b5e790b591aa5d3f41c3ed2c34a3a393382198af2f7"
[[package]]
name = "postage"
-version = "0.4.1"
+version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a63d25391d04a097954b76aba742b6b5b74f213dfe3dbaeeb36e8ddc1c657f0b"
+checksum = "af3fb618632874fb76937c2361a7f22afd393c982a2165595407edc75b06d3c1"
dependencies = [
"atomic",
"crossbeam-queue",
"futures 0.3.25",
"log",
+ "parking_lot 0.12.1",
"pin-project",
"pollster",
"static_assertions",
@@ -75,7 +75,7 @@ serde = { version = "1.0", features = ["derive", "rc"] }
serde_derive = { version = "1.0", features = ["deserialize_in_place"] }
serde_json = { version = "1.0", features = ["preserve_order", "raw_value"] }
rand = { version = "0.8" }
-postage = { version = "0.4.1", features = ["futures-traits"] }
+postage = { version = "0.5", features = ["futures-traits"] }
[patch.crates-io]
tree-sitter = { git = "https://github.com/tree-sitter/tree-sitter", rev = "c51896d32dcc11a38e41f36e3deb1a6a9c4f4b14" }
@@ -63,10 +63,10 @@ pub fn init(http_client: Arc<dyn HttpClient>, server_url: String, cx: &mut AppCo
cx.observe_global::<Settings, _>(move |updater, cx| {
if cx.global::<Settings>().auto_update {
if update_subscription.is_none() {
- *(&mut update_subscription) = Some(updater.start_polling(cx))
+ update_subscription = Some(updater.start_polling(cx))
}
} else {
- (&mut update_subscription).take();
+ update_subscription.take();
}
})
.detach();
@@ -419,7 +419,7 @@ impl Room {
false
});
- let response = self.client.request(proto::RejoinRoom {
+ let response = self.client.request_envelope(proto::RejoinRoom {
id: self.id,
reshared_projects,
rejoined_projects,
@@ -427,6 +427,8 @@ impl Room {
cx.spawn(|this, mut cx| async move {
let response = response.await?;
+ let message_id = response.message_id;
+ let response = response.payload;
let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
this.update(&mut cx, |this, cx| {
this.status = RoomStatus::Online;
@@ -443,7 +445,7 @@ impl Room {
for rejoined_project in response.rejoined_projects {
if let Some(project) = projects.get(&rejoined_project.id) {
project.update(cx, |project, cx| {
- project.rejoined(rejoined_project, cx).log_err();
+ project.rejoined(rejoined_project, message_id, cx).log_err();
});
}
}
@@ -45,3 +45,4 @@ collections = { path = "../collections", features = ["test-support"] }
gpui = { path = "../gpui", features = ["test-support"] }
rpc = { path = "../rpc", features = ["test-support"] }
settings = { path = "../settings", features = ["test-support"] }
+util = { path = "../util", features = ["test-support"] }
@@ -10,7 +10,10 @@ use async_tungstenite::tungstenite::{
error::Error as WebsocketError,
http::{Request, StatusCode},
};
-use futures::{future::LocalBoxFuture, AsyncReadExt, FutureExt, SinkExt, StreamExt, TryStreamExt};
+use futures::{
+ future::LocalBoxFuture, AsyncReadExt, FutureExt, SinkExt, StreamExt, TryFutureExt as _,
+ TryStreamExt,
+};
use gpui::{
actions,
platform::AppVersion,
@@ -471,18 +474,22 @@ impl Client {
pub fn subscribe_to_entity<T: Entity>(
self: &Arc<Self>,
remote_id: u64,
- ) -> PendingEntitySubscription<T> {
+ ) -> Result<PendingEntitySubscription<T>> {
let id = (TypeId::of::<T>(), remote_id);
- self.state
- .write()
- .entities_by_type_and_remote_id
- .insert(id, WeakSubscriber::Pending(Default::default()));
- PendingEntitySubscription {
- client: self.clone(),
- remote_id,
- consumed: false,
- _entity_type: PhantomData,
+ let mut state = self.state.write();
+ if state.entities_by_type_and_remote_id.contains_key(&id) {
+ return Err(anyhow!("already subscribed to entity"));
+ } else {
+ state
+ .entities_by_type_and_remote_id
+ .insert(id, WeakSubscriber::Pending(Default::default()));
+ Ok(PendingEntitySubscription {
+ client: self.clone(),
+ remote_id,
+ consumed: false,
+ _entity_type: PhantomData,
+ })
}
}
@@ -1188,6 +1195,14 @@ impl Client {
&self,
request: T,
) -> impl Future<Output = Result<T::Response>> {
+ self.request_envelope(request)
+ .map_ok(|envelope| envelope.payload)
+ }
+
+ pub fn request_envelope<T: RequestMessage>(
+ &self,
+ request: T,
+ ) -> impl Future<Output = Result<TypedEnvelope<T::Response>>> {
let client_id = self.id;
log::debug!(
"rpc request start. client_id:{}. name:{}",
@@ -1196,7 +1211,7 @@ impl Client {
);
let response = self
.connection_id()
- .map(|conn_id| self.peer.request(conn_id, request));
+ .map(|conn_id| self.peer.request_envelope(conn_id, request));
async move {
let response = response?.await;
log::debug!(
@@ -1595,14 +1610,17 @@ mod tests {
let _subscription1 = client
.subscribe_to_entity(1)
+ .unwrap()
.set_model(&model1, &mut cx.to_async());
let _subscription2 = client
.subscribe_to_entity(2)
+ .unwrap()
.set_model(&model2, &mut cx.to_async());
// Ensure dropping a subscription for the same entity type still allows receiving of
// messages for other entity IDs of the same type.
let subscription3 = client
.subscribe_to_entity(3)
+ .unwrap()
.set_model(&model3, &mut cx.to_async());
drop(subscription3);
@@ -1631,11 +1649,13 @@ mod tests {
},
);
drop(subscription1);
- let _subscription2 =
- client.add_message_handler(model, move |_, _: TypedEnvelope<proto::Ping>, _, _| {
+ let _subscription2 = client.add_message_handler(
+ model.clone(),
+ move |_, _: TypedEnvelope<proto::Ping>, _, _| {
done_tx2.try_send(()).unwrap();
async { Ok(()) }
- });
+ },
+ );
server.send(proto::Ping {});
done_rx2.next().await.unwrap();
}
@@ -175,25 +175,39 @@ impl Database {
.map(|participant| participant.user_id)
.collect::<Vec<_>>();
- // Delete participants who failed to reconnect.
+ // Delete participants who failed to reconnect and cancel their calls.
+ let mut canceled_calls_to_user_ids = Vec::new();
room_participant::Entity::delete_many()
.filter(stale_participant_filter)
.exec(&*tx)
.await?;
+ let called_participants = room_participant::Entity::find()
+ .filter(
+ Condition::all()
+ .add(
+ room_participant::Column::CallingUserId
+ .is_in(stale_participant_user_ids.iter().copied()),
+ )
+ .add(room_participant::Column::AnsweringConnectionId.is_null()),
+ )
+ .all(&*tx)
+ .await?;
+ room_participant::Entity::delete_many()
+ .filter(
+ room_participant::Column::Id
+ .is_in(called_participants.iter().map(|participant| participant.id)),
+ )
+ .exec(&*tx)
+ .await?;
+ canceled_calls_to_user_ids.extend(
+ called_participants
+ .into_iter()
+ .map(|participant| participant.user_id),
+ );
let room = self.get_room(room_id, &tx).await?;
- let mut canceled_calls_to_user_ids = Vec::new();
- // Delete the room if it becomes empty and cancel pending calls.
+ // Delete the room if it becomes empty.
if room.participants.is_empty() {
- canceled_calls_to_user_ids.extend(
- room.pending_participants
- .iter()
- .map(|pending_participant| UserId::from_proto(pending_participant.user_id)),
- );
- room_participant::Entity::delete_many()
- .filter(room_participant::Column::RoomId.eq(room_id))
- .exec(&*tx)
- .await?;
project::Entity::delete_many()
.filter(project::Column::RoomId.eq(room_id))
.exec(&*tx)
@@ -228,7 +228,7 @@ impl Server {
.add_message_handler(update_buffer_file)
.add_message_handler(buffer_reloaded)
.add_message_handler(buffer_saved)
- .add_request_handler(save_buffer)
+ .add_request_handler(forward_project_request::<proto::SaveBuffer>)
.add_request_handler(get_users)
.add_request_handler(fuzzy_search_users)
.add_request_handler(request_contact)
@@ -1591,51 +1591,6 @@ where
Ok(())
}
-async fn save_buffer(
- request: proto::SaveBuffer,
- response: Response<proto::SaveBuffer>,
- session: Session,
-) -> Result<()> {
- let project_id = ProjectId::from_proto(request.project_id);
- let host_connection_id = {
- let collaborators = session
- .db()
- .await
- .project_collaborators(project_id, session.connection_id)
- .await?;
- collaborators
- .iter()
- .find(|collaborator| collaborator.is_host)
- .ok_or_else(|| anyhow!("host not found"))?
- .connection_id
- };
- let response_payload = session
- .peer
- .forward_request(session.connection_id, host_connection_id, request.clone())
- .await?;
-
- let mut collaborators = session
- .db()
- .await
- .project_collaborators(project_id, session.connection_id)
- .await?;
- collaborators.retain(|collaborator| collaborator.connection_id != session.connection_id);
- let project_connection_ids = collaborators
- .iter()
- .map(|collaborator| collaborator.connection_id);
- broadcast(
- Some(host_connection_id),
- project_connection_ids,
- |conn_id| {
- session
- .peer
- .forward_send(host_connection_id, conn_id, response_payload.clone())
- },
- );
- response.send(response_payload)?;
- Ok(())
-}
-
async fn create_buffer_for_peer(
request: proto::CreateBufferForPeer,
session: Session,
@@ -1655,23 +1610,42 @@ async fn update_buffer(
) -> Result<()> {
session.executor.record_backtrace();
let project_id = ProjectId::from_proto(request.project_id);
- let project_connection_ids = session
- .db()
- .await
- .project_connection_ids(project_id, session.connection_id)
- .await?;
+ let mut guest_connection_ids;
+ let mut host_connection_id = None;
+ {
+ let collaborators = session
+ .db()
+ .await
+ .project_collaborators(project_id, session.connection_id)
+ .await?;
+ guest_connection_ids = Vec::with_capacity(collaborators.len() - 1);
+ for collaborator in collaborators.iter() {
+ if collaborator.is_host {
+ host_connection_id = Some(collaborator.connection_id);
+ } else {
+ guest_connection_ids.push(collaborator.connection_id);
+ }
+ }
+ }
+ let host_connection_id = host_connection_id.ok_or_else(|| anyhow!("host not found"))?;
session.executor.record_backtrace();
-
broadcast(
Some(session.connection_id),
- project_connection_ids.iter().copied(),
+ guest_connection_ids,
|connection_id| {
session
.peer
.forward_send(session.connection_id, connection_id, request.clone())
},
);
+ if host_connection_id != session.connection_id {
+ session
+ .peer
+ .forward_request(session.connection_id, host_connection_id, request.clone())
+ .await?;
+ }
+
response.send(proto::Ack {})?;
Ok(())
}
@@ -18,9 +18,10 @@ use parking_lot::Mutex;
use project::{Project, WorktreeId};
use settings::Settings;
use std::{
+ cell::{Ref, RefCell, RefMut},
env,
- ops::Deref,
- path::{Path, PathBuf},
+ ops::{Deref, DerefMut},
+ path::Path,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
Arc,
@@ -209,13 +210,10 @@ impl TestServer {
let client = TestClient {
client,
username: name.to_string(),
- local_projects: Default::default(),
- remote_projects: Default::default(),
- next_root_dir_id: 0,
+ state: Default::default(),
user_store,
fs,
language_registry: Arc::new(LanguageRegistry::test()),
- buffers: Default::default(),
};
client.wait_for_current_user(cx).await;
client
@@ -314,12 +312,16 @@ impl Drop for TestServer {
struct TestClient {
client: Arc<Client>,
username: String,
- local_projects: Vec<ModelHandle<Project>>,
- remote_projects: Vec<ModelHandle<Project>>,
- next_root_dir_id: usize,
+ state: RefCell<TestClientState>,
pub user_store: ModelHandle<UserStore>,
language_registry: Arc<LanguageRegistry>,
fs: Arc<FakeFs>,
+}
+
+#[derive(Default)]
+struct TestClientState {
+ local_projects: Vec<ModelHandle<Project>>,
+ remote_projects: Vec<ModelHandle<Project>>,
buffers: HashMap<ModelHandle<Project>, HashSet<ModelHandle<language::Buffer>>>,
}
@@ -358,6 +360,38 @@ impl TestClient {
.await;
}
+ fn local_projects<'a>(&'a self) -> impl Deref<Target = Vec<ModelHandle<Project>>> + 'a {
+ Ref::map(self.state.borrow(), |state| &state.local_projects)
+ }
+
+ fn remote_projects<'a>(&'a self) -> impl Deref<Target = Vec<ModelHandle<Project>>> + 'a {
+ Ref::map(self.state.borrow(), |state| &state.remote_projects)
+ }
+
+ fn local_projects_mut<'a>(&'a self) -> impl DerefMut<Target = Vec<ModelHandle<Project>>> + 'a {
+ RefMut::map(self.state.borrow_mut(), |state| &mut state.local_projects)
+ }
+
+ fn remote_projects_mut<'a>(&'a self) -> impl DerefMut<Target = Vec<ModelHandle<Project>>> + 'a {
+ RefMut::map(self.state.borrow_mut(), |state| &mut state.remote_projects)
+ }
+
+ fn buffers_for_project<'a>(
+ &'a self,
+ project: &ModelHandle<Project>,
+ ) -> impl DerefMut<Target = HashSet<ModelHandle<language::Buffer>>> + 'a {
+ RefMut::map(self.state.borrow_mut(), |state| {
+ state.buffers.entry(project.clone()).or_default()
+ })
+ }
+
+ fn buffers<'a>(
+ &'a self,
+ ) -> impl DerefMut<Target = HashMap<ModelHandle<Project>, HashSet<ModelHandle<language::Buffer>>>> + 'a
+ {
+ RefMut::map(self.state.borrow_mut(), |state| &mut state.buffers)
+ }
+
fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
self.user_store.read_with(cx, |store, _| ContactsSummary {
current: store
@@ -431,15 +465,6 @@ impl TestClient {
let (_, root_view) = cx.add_window(|_| EmptyView);
cx.add_view(&root_view, |cx| Workspace::test_new(project.clone(), cx))
}
-
- fn create_new_root_dir(&mut self) -> PathBuf {
- format!(
- "/{}-root-{}",
- self.username,
- util::post_inc(&mut self.next_root_dir_id)
- )
- .into()
- }
}
impl Drop for TestClient {
@@ -1633,9 +1633,7 @@ async fn test_project_reconnect(
})
.await
.unwrap();
- worktree_a2
- .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
- .await;
+ deterministic.run_until_parked();
let worktree2_id = worktree_a2.read_with(cx_a, |tree, _| {
assert!(tree.as_local().unwrap().is_shared());
tree.id()
@@ -1696,11 +1694,9 @@ async fn test_project_reconnect(
.unwrap();
// While client A is disconnected, add and remove worktrees from client A's project.
- project_a1
- .update(cx_a, |project, cx| {
- project.remove_worktree(worktree2_id, cx)
- })
- .await;
+ project_a1.update(cx_a, |project, cx| {
+ project.remove_worktree(worktree2_id, cx)
+ });
let (worktree_a3, _) = project_a1
.update(cx_a, |p, cx| {
p.find_or_create_local_worktree("/root-1/dir3", true, cx)
@@ -1824,18 +1820,14 @@ async fn test_project_reconnect(
})
.await
.unwrap();
- worktree_a4
- .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
- .await;
+ deterministic.run_until_parked();
let worktree4_id = worktree_a4.read_with(cx_a, |tree, _| {
assert!(tree.as_local().unwrap().is_shared());
tree.id()
});
- project_a1
- .update(cx_a, |project, cx| {
- project.remove_worktree(worktree3_id, cx)
- })
- .await;
+ project_a1.update(cx_a, |project, cx| {
+ project.remove_worktree(worktree3_id, cx)
+ });
deterministic.run_until_parked();
// While client B is disconnected, mutate a buffer on both the host and the guest.
@@ -1,5 +1,5 @@
use crate::{
- db::{self, NewUserParams},
+ db::{self, NewUserParams, UserId},
rpc::{CLEANUP_TIMEOUT, RECONNECT_TIMEOUT},
tests::{TestClient, TestServer},
};
@@ -7,38 +7,50 @@ use anyhow::{anyhow, Result};
use call::ActiveCall;
use client::RECEIVE_TIMEOUT;
use collections::BTreeMap;
+use editor::Bias;
use fs::{FakeFs, Fs as _};
use futures::StreamExt as _;
-use gpui::{executor::Deterministic, ModelHandle, TestAppContext};
-use language::{range_to_lsp, FakeLspAdapter, Language, LanguageConfig, PointUtf16, Rope};
+use gpui::{executor::Deterministic, ModelHandle, Task, TestAppContext};
+use language::{range_to_lsp, FakeLspAdapter, Language, LanguageConfig, PointUtf16};
use lsp::FakeLanguageServer;
use parking_lot::Mutex;
-use project::{search::SearchQuery, Project};
+use project::{search::SearchQuery, Project, ProjectPath};
use rand::{
distributions::{Alphanumeric, DistString},
prelude::*,
};
+use serde::{Deserialize, Serialize};
use settings::Settings;
use std::{
env,
- ffi::OsStr,
+ ops::Range,
path::{Path, PathBuf},
- sync::Arc,
+ rc::Rc,
+ sync::{
+ atomic::{AtomicBool, Ordering::SeqCst},
+ Arc,
+ },
};
+use util::ResultExt;
-#[gpui::test(iterations = 100)]
+lazy_static::lazy_static! {
+ static ref PLAN_LOAD_PATH: Option<PathBuf> = path_env_var("LOAD_PLAN");
+ static ref PLAN_SAVE_PATH: Option<PathBuf> = path_env_var("SAVE_PLAN");
+ static ref LOADED_PLAN_JSON: Mutex<Option<Vec<u8>>> = Default::default();
+ static ref PLAN: Mutex<Option<Arc<Mutex<TestPlan>>>> = Default::default();
+}
+
+#[gpui::test(iterations = 100, on_failure = "on_failure")]
async fn test_random_collaboration(
cx: &mut TestAppContext,
deterministic: Arc<Deterministic>,
rng: StdRng,
) {
deterministic.forbid_parking();
- let rng = Arc::new(Mutex::new(rng));
let max_peers = env::var("MAX_PEERS")
.map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
- .unwrap_or(5);
-
+ .unwrap_or(3);
let max_operations = env::var("OPERATIONS")
.map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
.unwrap_or(10);
@@ -46,7 +58,7 @@ async fn test_random_collaboration(
let mut server = TestServer::start(&deterministic).await;
let db = server.app_state.db.clone();
- let mut available_users = Vec::new();
+ let mut users = Vec::new();
for ix in 0..max_peers {
let username = format!("user-{}", ix + 1);
let user_id = db
@@ -62,195 +74,735 @@ async fn test_random_collaboration(
.await
.unwrap()
.user_id;
- available_users.push((user_id, username));
+ users.push(UserTestPlan {
+ user_id,
+ username,
+ online: false,
+ next_root_id: 0,
+ operation_ix: 0,
+ });
}
- for (ix, (user_id_a, _)) in available_users.iter().enumerate() {
- for (user_id_b, _) in &available_users[ix + 1..] {
+ for (ix, user_a) in users.iter().enumerate() {
+ for user_b in &users[ix + 1..] {
server
.app_state
.db
- .send_contact_request(*user_id_a, *user_id_b)
+ .send_contact_request(user_a.user_id, user_b.user_id)
.await
.unwrap();
server
.app_state
.db
- .respond_to_contact_request(*user_id_b, *user_id_a, true)
+ .respond_to_contact_request(user_b.user_id, user_a.user_id, true)
.await
.unwrap();
}
}
+ let plan = Arc::new(Mutex::new(TestPlan::new(rng, users, max_operations)));
+
+ if let Some(path) = &*PLAN_LOAD_PATH {
+ let json = LOADED_PLAN_JSON
+ .lock()
+ .get_or_insert_with(|| {
+ eprintln!("loaded test plan from path {:?}", path);
+ std::fs::read(path).unwrap()
+ })
+ .clone();
+ plan.lock().deserialize(json);
+ }
+
+ PLAN.lock().replace(plan.clone());
+
let mut clients = Vec::new();
- let mut user_ids = Vec::new();
- let mut op_start_signals = Vec::new();
- let mut next_entity_id = 100000;
- let allow_server_restarts = rng.lock().gen_bool(0.7);
- let allow_client_reconnection = rng.lock().gen_bool(0.7);
- let allow_client_disconnection = rng.lock().gen_bool(0.1);
-
- let mut operations = 0;
- while operations < max_operations {
- let distribution = rng.lock().gen_range(0..100);
- match distribution {
- 0..=19 if !available_users.is_empty() => {
- let client_ix = rng.lock().gen_range(0..available_users.len());
- let (_, username) = available_users.remove(client_ix);
- log::info!("Adding new connection for {}", username);
- next_entity_id += 100000;
- let mut client_cx = TestAppContext::new(
- cx.foreground_platform(),
- cx.platform(),
- deterministic.build_foreground(next_entity_id),
- deterministic.build_background(),
- cx.font_cache(),
- cx.leak_detector(),
- next_entity_id,
- cx.function_name.clone(),
- );
+ let mut client_tasks = Vec::new();
+ let mut operation_channels = Vec::new();
+
+ loop {
+ let Some((next_operation, applied)) = plan.lock().next_server_operation(&clients) else { break };
+ applied.store(true, SeqCst);
+ let did_apply = apply_server_operation(
+ deterministic.clone(),
+ &mut server,
+ &mut clients,
+ &mut client_tasks,
+ &mut operation_channels,
+ plan.clone(),
+ next_operation,
+ cx,
+ )
+ .await;
+ if !did_apply {
+ applied.store(false, SeqCst);
+ }
+ }
- client_cx.update(|cx| cx.set_global(Settings::test(cx)));
-
- let op_start_signal = futures::channel::mpsc::unbounded();
- let client = server.create_client(&mut client_cx, &username).await;
- user_ids.push(client.current_user_id(&client_cx));
- op_start_signals.push(op_start_signal.0);
- clients.push(client_cx.foreground().spawn(simulate_client(
- client,
- op_start_signal.1,
- allow_client_disconnection,
- rng.clone(),
- client_cx,
- )));
-
- log::info!("Added connection for {}", username);
- operations += 1;
- }
+ drop(operation_channels);
+ deterministic.start_waiting();
+ futures::future::join_all(client_tasks).await;
+ deterministic.finish_waiting();
+ deterministic.run_until_parked();
- 20..=24 if clients.len() > 1 && allow_client_disconnection => {
- let client_ix = rng.lock().gen_range(1..clients.len());
- log::info!(
- "Simulating full disconnection of user {}",
- user_ids[client_ix]
- );
- let removed_user_id = user_ids.remove(client_ix);
- let user_connection_ids = server
- .connection_pool
- .lock()
- .user_connection_ids(removed_user_id)
- .collect::<Vec<_>>();
- assert_eq!(user_connection_ids.len(), 1);
- let removed_peer_id = user_connection_ids[0].into();
- let client = clients.remove(client_ix);
- op_start_signals.remove(client_ix);
- server.forbid_connections();
- server.disconnect_client(removed_peer_id);
- deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
- deterministic.start_waiting();
- log::info!("Waiting for user {} to exit...", removed_user_id);
- let (client, mut client_cx) = client.await;
- deterministic.finish_waiting();
- server.allow_connections();
-
- for project in &client.remote_projects {
- project.read_with(&client_cx, |project, _| {
- assert!(
- project.is_read_only(),
- "project {:?} should be read only",
- project.remote_id()
- )
- });
+ check_consistency_between_clients(&clients);
+
+ for (client, mut cx) in clients {
+ cx.update(|cx| {
+ cx.clear_globals();
+ cx.set_global(Settings::test(cx));
+ drop(client);
+ });
+ }
+
+ deterministic.run_until_parked();
+}
+
+fn on_failure() {
+ if let Some(plan) = PLAN.lock().clone() {
+ if let Some(path) = &*PLAN_SAVE_PATH {
+ eprintln!("saved test plan to path {:?}", path);
+ std::fs::write(path, plan.lock().serialize()).unwrap();
+ }
+ }
+}
+
+async fn apply_server_operation(
+ deterministic: Arc<Deterministic>,
+ server: &mut TestServer,
+ clients: &mut Vec<(Rc<TestClient>, TestAppContext)>,
+ client_tasks: &mut Vec<Task<()>>,
+ operation_channels: &mut Vec<futures::channel::mpsc::UnboundedSender<usize>>,
+ plan: Arc<Mutex<TestPlan>>,
+ operation: Operation,
+ cx: &mut TestAppContext,
+) -> bool {
+ match operation {
+ Operation::AddConnection { user_id } => {
+ let username;
+ {
+ let mut plan = plan.lock();
+ let mut user = plan.user(user_id);
+ if user.online {
+ return false;
}
- for user_id in &user_ids {
- let contacts = server.app_state.db.get_contacts(*user_id).await.unwrap();
- let pool = server.connection_pool.lock();
- for contact in contacts {
- if let db::Contact::Accepted { user_id, busy, .. } = contact {
- if user_id == removed_user_id {
- assert!(!pool.is_user_online(user_id));
- assert!(!busy);
- }
+ user.online = true;
+ username = user.username.clone();
+ };
+ log::info!("Adding new connection for {}", username);
+ let next_entity_id = (user_id.0 * 10_000) as usize;
+ let mut client_cx = TestAppContext::new(
+ cx.foreground_platform(),
+ cx.platform(),
+ deterministic.build_foreground(user_id.0 as usize),
+ deterministic.build_background(),
+ cx.font_cache(),
+ cx.leak_detector(),
+ next_entity_id,
+ cx.function_name.clone(),
+ );
+
+ let (operation_tx, operation_rx) = futures::channel::mpsc::unbounded();
+ let client = Rc::new(server.create_client(&mut client_cx, &username).await);
+ operation_channels.push(operation_tx);
+ clients.push((client.clone(), client_cx.clone()));
+ client_tasks.push(client_cx.foreground().spawn(simulate_client(
+ client,
+ operation_rx,
+ plan.clone(),
+ client_cx,
+ )));
+
+ log::info!("Added connection for {}", username);
+ }
+
+ Operation::RemoveConnection {
+ user_id: removed_user_id,
+ } => {
+ log::info!("Simulating full disconnection of user {}", removed_user_id);
+ let client_ix = clients
+ .iter()
+ .position(|(client, cx)| client.current_user_id(cx) == removed_user_id);
+ let Some(client_ix) = client_ix else { return false };
+ let user_connection_ids = server
+ .connection_pool
+ .lock()
+ .user_connection_ids(removed_user_id)
+ .collect::<Vec<_>>();
+ assert_eq!(user_connection_ids.len(), 1);
+ let removed_peer_id = user_connection_ids[0].into();
+ let (client, mut client_cx) = clients.remove(client_ix);
+ let client_task = client_tasks.remove(client_ix);
+ operation_channels.remove(client_ix);
+ server.forbid_connections();
+ server.disconnect_client(removed_peer_id);
+ deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
+ deterministic.start_waiting();
+ log::info!("Waiting for user {} to exit...", removed_user_id);
+ client_task.await;
+ deterministic.finish_waiting();
+ server.allow_connections();
+
+ for project in client.remote_projects().iter() {
+ project.read_with(&client_cx, |project, _| {
+ assert!(
+ project.is_read_only(),
+ "project {:?} should be read only",
+ project.remote_id()
+ )
+ });
+ }
+
+ for (client, cx) in clients {
+ let contacts = server
+ .app_state
+ .db
+ .get_contacts(client.current_user_id(cx))
+ .await
+ .unwrap();
+ let pool = server.connection_pool.lock();
+ for contact in contacts {
+ if let db::Contact::Accepted { user_id, busy, .. } = contact {
+ if user_id == removed_user_id {
+ assert!(!pool.is_user_online(user_id));
+ assert!(!busy);
}
}
}
+ }
- log::info!("{} removed", client.username);
- available_users.push((removed_user_id, client.username.clone()));
- client_cx.update(|cx| {
- cx.clear_globals();
- cx.set_global(Settings::test(cx));
- drop(client);
- });
+ log::info!("{} removed", client.username);
+ plan.lock().user(removed_user_id).online = false;
+ client_cx.update(|cx| {
+ cx.clear_globals();
+ drop(client);
+ });
+ }
- operations += 1;
+ Operation::BounceConnection { user_id } => {
+ log::info!("Simulating temporary disconnection of user {}", user_id);
+ let user_connection_ids = server
+ .connection_pool
+ .lock()
+ .user_connection_ids(user_id)
+ .collect::<Vec<_>>();
+ if user_connection_ids.is_empty() {
+ return false;
}
+ assert_eq!(user_connection_ids.len(), 1);
+ let peer_id = user_connection_ids[0].into();
+ server.disconnect_client(peer_id);
+ deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
+ }
- 25..=29 if clients.len() > 1 && allow_client_reconnection => {
- let client_ix = rng.lock().gen_range(1..clients.len());
- let user_id = user_ids[client_ix];
- log::info!("Simulating temporary disconnection of user {}", user_id);
- let user_connection_ids = server
- .connection_pool
- .lock()
- .user_connection_ids(user_id)
- .collect::<Vec<_>>();
- assert_eq!(user_connection_ids.len(), 1);
- let peer_id = user_connection_ids[0].into();
- server.disconnect_client(peer_id);
- deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
- operations += 1;
+ Operation::RestartServer => {
+ log::info!("Simulating server restart");
+ server.reset().await;
+ deterministic.advance_clock(RECEIVE_TIMEOUT);
+ server.start().await.unwrap();
+ deterministic.advance_clock(CLEANUP_TIMEOUT);
+ let environment = &server.app_state.config.zed_environment;
+ let stale_room_ids = server
+ .app_state
+ .db
+ .stale_room_ids(environment, server.id())
+ .await
+ .unwrap();
+ assert_eq!(stale_room_ids, vec![]);
+ }
+
+ Operation::MutateClients {
+ user_ids,
+ batch_id,
+ quiesce,
+ } => {
+ let mut applied = false;
+ for user_id in user_ids {
+ let client_ix = clients
+ .iter()
+ .position(|(client, cx)| client.current_user_id(cx) == user_id);
+ let Some(client_ix) = client_ix else { continue };
+ applied = true;
+ if let Err(err) = operation_channels[client_ix].unbounded_send(batch_id) {
+ log::error!("error signaling user {user_id}: {err}");
+ }
}
- 30..=34 if allow_server_restarts => {
- log::info!("Simulating server restart");
- server.reset().await;
- deterministic.advance_clock(RECEIVE_TIMEOUT);
- server.start().await.unwrap();
- deterministic.advance_clock(CLEANUP_TIMEOUT);
- let environment = &server.app_state.config.zed_environment;
- let stale_room_ids = server
- .app_state
- .db
- .stale_room_ids(environment, server.id())
+ if quiesce && applied {
+ deterministic.run_until_parked();
+ check_consistency_between_clients(&clients);
+ }
+
+ return applied;
+ }
+ }
+ true
+}
+
+async fn apply_client_operation(
+ client: &TestClient,
+ operation: ClientOperation,
+ cx: &mut TestAppContext,
+) -> Result<(), TestError> {
+ match operation {
+ ClientOperation::AcceptIncomingCall => {
+ let active_call = cx.read(ActiveCall::global);
+ if active_call.read_with(cx, |call, _| call.incoming().borrow().is_none()) {
+ Err(TestError::Inapplicable)?;
+ }
+
+ log::info!("{}: accepting incoming call", client.username);
+ active_call
+ .update(cx, |call, cx| call.accept_incoming(cx))
+ .await?;
+ }
+
+ ClientOperation::RejectIncomingCall => {
+ let active_call = cx.read(ActiveCall::global);
+ if active_call.read_with(cx, |call, _| call.incoming().borrow().is_none()) {
+ Err(TestError::Inapplicable)?;
+ }
+
+ log::info!("{}: declining incoming call", client.username);
+ active_call.update(cx, |call, _| call.decline_incoming())?;
+ }
+
+ ClientOperation::LeaveCall => {
+ let active_call = cx.read(ActiveCall::global);
+ if active_call.read_with(cx, |call, _| call.room().is_none()) {
+ Err(TestError::Inapplicable)?;
+ }
+
+ log::info!("{}: hanging up", client.username);
+ active_call.update(cx, |call, cx| call.hang_up(cx)).await?;
+ }
+
+ ClientOperation::InviteContactToCall { user_id } => {
+ let active_call = cx.read(ActiveCall::global);
+
+ log::info!("{}: inviting {}", client.username, user_id,);
+ active_call
+ .update(cx, |call, cx| call.invite(user_id.to_proto(), None, cx))
+ .await
+ .log_err();
+ }
+
+ ClientOperation::OpenLocalProject { first_root_name } => {
+ log::info!(
+ "{}: opening local project at {:?}",
+ client.username,
+ first_root_name
+ );
+
+ let root_path = Path::new("/").join(&first_root_name);
+ client.fs.create_dir(&root_path).await.unwrap();
+ client
+ .fs
+ .create_file(&root_path.join("main.rs"), Default::default())
+ .await
+ .unwrap();
+ let project = client.build_local_project(root_path, cx).await.0;
+ ensure_project_shared(&project, client, cx).await;
+ client.local_projects_mut().push(project.clone());
+ }
+
+ ClientOperation::AddWorktreeToProject {
+ project_root_name,
+ new_root_path,
+ } => {
+ let project = project_for_root_name(client, &project_root_name, cx)
+ .ok_or(TestError::Inapplicable)?;
+
+ log::info!(
+ "{}: finding/creating local worktree at {:?} to project with root path {}",
+ client.username,
+ new_root_path,
+ project_root_name
+ );
+
+ ensure_project_shared(&project, client, cx).await;
+ if !client.fs.paths().contains(&new_root_path) {
+ client.fs.create_dir(&new_root_path).await.unwrap();
+ }
+ project
+ .update(cx, |project, cx| {
+ project.find_or_create_local_worktree(&new_root_path, true, cx)
+ })
+ .await
+ .unwrap();
+ }
+
+ ClientOperation::CloseRemoteProject { project_root_name } => {
+ let project = project_for_root_name(client, &project_root_name, cx)
+ .ok_or(TestError::Inapplicable)?;
+
+ log::info!(
+ "{}: closing remote project with root path {}",
+ client.username,
+ project_root_name,
+ );
+
+ let ix = client
+ .remote_projects()
+ .iter()
+ .position(|p| p == &project)
+ .unwrap();
+ cx.update(|_| {
+ client.remote_projects_mut().remove(ix);
+ client.buffers().retain(|p, _| *p != project);
+ drop(project);
+ });
+ }
+
+ ClientOperation::OpenRemoteProject {
+ host_id,
+ first_root_name,
+ } => {
+ let active_call = cx.read(ActiveCall::global);
+ let project = active_call
+ .update(cx, |call, cx| {
+ let room = call.room().cloned()?;
+ let participant = room
+ .read(cx)
+ .remote_participants()
+ .get(&host_id.to_proto())?;
+ let project_id = participant
+ .projects
+ .iter()
+ .find(|project| project.worktree_root_names[0] == first_root_name)?
+ .id;
+ Some(room.update(cx, |room, cx| {
+ room.join_project(
+ project_id,
+ client.language_registry.clone(),
+ FakeFs::new(cx.background().clone()),
+ cx,
+ )
+ }))
+ })
+ .ok_or(TestError::Inapplicable)?;
+
+ log::info!(
+ "{}: joining remote project of user {}, root name {}",
+ client.username,
+ host_id,
+ first_root_name,
+ );
+
+ let project = project.await?;
+ client.remote_projects_mut().push(project.clone());
+ }
+
+ ClientOperation::CreateWorktreeEntry {
+ project_root_name,
+ is_local,
+ full_path,
+ is_dir,
+ } => {
+ let project = project_for_root_name(client, &project_root_name, cx)
+ .ok_or(TestError::Inapplicable)?;
+ let project_path = project_path_for_full_path(&project, &full_path, cx)
+ .ok_or(TestError::Inapplicable)?;
+
+ log::info!(
+ "{}: creating {} at path {:?} in {} project {}",
+ client.username,
+ if is_dir { "dir" } else { "file" },
+ full_path,
+ if is_local { "local" } else { "remote" },
+ project_root_name,
+ );
+
+ ensure_project_shared(&project, client, cx).await;
+ project
+ .update(cx, |p, cx| p.create_entry(project_path, is_dir, cx))
+ .unwrap()
+ .await?;
+ }
+
+ ClientOperation::OpenBuffer {
+ project_root_name,
+ is_local,
+ full_path,
+ } => {
+ let project = project_for_root_name(client, &project_root_name, cx)
+ .ok_or(TestError::Inapplicable)?;
+ let project_path = project_path_for_full_path(&project, &full_path, cx)
+ .ok_or(TestError::Inapplicable)?;
+
+ log::info!(
+ "{}: opening buffer {:?} in {} project {}",
+ client.username,
+ full_path,
+ if is_local { "local" } else { "remote" },
+ project_root_name,
+ );
+
+ ensure_project_shared(&project, client, cx).await;
+ let buffer = project
+ .update(cx, |project, cx| project.open_buffer(project_path, cx))
+ .await?;
+ client.buffers_for_project(&project).insert(buffer);
+ }
+
+ ClientOperation::EditBuffer {
+ project_root_name,
+ is_local,
+ full_path,
+ edits,
+ } => {
+ let project = project_for_root_name(client, &project_root_name, cx)
+ .ok_or(TestError::Inapplicable)?;
+ let buffer = buffer_for_full_path(client, &project, &full_path, cx)
+ .ok_or(TestError::Inapplicable)?;
+
+ log::info!(
+ "{}: editing buffer {:?} in {} project {} with {:?}",
+ client.username,
+ full_path,
+ if is_local { "local" } else { "remote" },
+ project_root_name,
+ edits
+ );
+
+ ensure_project_shared(&project, client, cx).await;
+ buffer.update(cx, |buffer, cx| {
+ let snapshot = buffer.snapshot();
+ buffer.edit(
+ edits.into_iter().map(|(range, text)| {
+ let start = snapshot.clip_offset(range.start, Bias::Left);
+ let end = snapshot.clip_offset(range.end, Bias::Right);
+ (start..end, text)
+ }),
+ None,
+ cx,
+ );
+ });
+ }
+
+ ClientOperation::CloseBuffer {
+ project_root_name,
+ is_local,
+ full_path,
+ } => {
+ let project = project_for_root_name(client, &project_root_name, cx)
+ .ok_or(TestError::Inapplicable)?;
+ let buffer = buffer_for_full_path(client, &project, &full_path, cx)
+ .ok_or(TestError::Inapplicable)?;
+
+ log::info!(
+ "{}: closing buffer {:?} in {} project {}",
+ client.username,
+ full_path,
+ if is_local { "local" } else { "remote" },
+ project_root_name
+ );
+
+ ensure_project_shared(&project, client, cx).await;
+ cx.update(|_| {
+ client.buffers_for_project(&project).remove(&buffer);
+ drop(buffer);
+ });
+ }
+
+ ClientOperation::SaveBuffer {
+ project_root_name,
+ is_local,
+ full_path,
+ detach,
+ } => {
+ let project = project_for_root_name(client, &project_root_name, cx)
+ .ok_or(TestError::Inapplicable)?;
+ let buffer = buffer_for_full_path(client, &project, &full_path, cx)
+ .ok_or(TestError::Inapplicable)?;
+
+ log::info!(
+ "{}: saving buffer {:?} in {} project {}, {}",
+ client.username,
+ full_path,
+ if is_local { "local" } else { "remote" },
+ project_root_name,
+ if detach { "detaching" } else { "awaiting" }
+ );
+
+ ensure_project_shared(&project, client, cx).await;
+ let requested_version = buffer.read_with(cx, |buffer, _| buffer.version());
+ let save = project.update(cx, |project, cx| project.save_buffer(buffer, cx));
+ let save = cx.background().spawn(async move {
+ let (saved_version, _, _) = save
.await
- .unwrap();
- assert_eq!(stale_room_ids, vec![]);
+ .map_err(|err| anyhow!("save request failed: {:?}", err))?;
+ assert!(saved_version.observed_all(&requested_version));
+ anyhow::Ok(())
+ });
+ if detach {
+ cx.update(|cx| save.detach_and_log_err(cx));
+ } else {
+ save.await?;
}
+ }
- _ if !op_start_signals.is_empty() => {
- while operations < max_operations && rng.lock().gen_bool(0.7) {
- op_start_signals
- .choose(&mut *rng.lock())
- .unwrap()
- .unbounded_send(())
- .unwrap();
- operations += 1;
- }
+ ClientOperation::RequestLspDataInBuffer {
+ project_root_name,
+ is_local,
+ full_path,
+ offset,
+ kind,
+ detach,
+ } => {
+ let project = project_for_root_name(client, &project_root_name, cx)
+ .ok_or(TestError::Inapplicable)?;
+ let buffer = buffer_for_full_path(client, &project, &full_path, cx)
+ .ok_or(TestError::Inapplicable)?;
- if rng.lock().gen_bool(0.8) {
- deterministic.run_until_parked();
+ log::info!(
+ "{}: request LSP {:?} for buffer {:?} in {} project {}, {}",
+ client.username,
+ kind,
+ full_path,
+ if is_local { "local" } else { "remote" },
+ project_root_name,
+ if detach { "detaching" } else { "awaiting" }
+ );
+
+ use futures::{FutureExt as _, TryFutureExt as _};
+ let offset = buffer.read_with(cx, |b, _| b.clip_offset(offset, Bias::Left));
+ let request = cx.foreground().spawn(project.update(cx, |project, cx| {
+ match kind {
+ LspRequestKind::Rename => project
+ .prepare_rename(buffer, offset, cx)
+ .map_ok(|_| ())
+ .boxed(),
+ LspRequestKind::Completion => project
+ .completions(&buffer, offset, cx)
+ .map_ok(|_| ())
+ .boxed(),
+ LspRequestKind::CodeAction => project
+ .code_actions(&buffer, offset..offset, cx)
+ .map_ok(|_| ())
+ .boxed(),
+ LspRequestKind::Definition => project
+ .definition(&buffer, offset, cx)
+ .map_ok(|_| ())
+ .boxed(),
+ LspRequestKind::Highlights => project
+ .document_highlights(&buffer, offset, cx)
+ .map_ok(|_| ())
+ .boxed(),
}
+ }));
+ if detach {
+ request.detach();
+ } else {
+ request.await?;
}
- _ => {}
}
- }
- drop(op_start_signals);
- deterministic.start_waiting();
- let clients = futures::future::join_all(clients).await;
- deterministic.finish_waiting();
- deterministic.run_until_parked();
+ ClientOperation::SearchProject {
+ project_root_name,
+ is_local,
+ query,
+ detach,
+ } => {
+ let project = project_for_root_name(client, &project_root_name, cx)
+ .ok_or(TestError::Inapplicable)?;
+
+ log::info!(
+ "{}: search {} project {} for {:?}, {}",
+ client.username,
+ if is_local { "local" } else { "remote" },
+ project_root_name,
+ query,
+ if detach { "detaching" } else { "awaiting" }
+ );
- for (client, client_cx) in &clients {
- for guest_project in &client.remote_projects {
+ let search = project.update(cx, |project, cx| {
+ project.search(SearchQuery::text(query, false, false), cx)
+ });
+ drop(project);
+ let search = cx.background().spawn(async move {
+ search
+ .await
+ .map_err(|err| anyhow!("search request failed: {:?}", err))
+ });
+ if detach {
+ cx.update(|cx| search.detach_and_log_err(cx));
+ } else {
+ search.await?;
+ }
+ }
+
+ ClientOperation::WriteFsEntry {
+ path,
+ is_dir,
+ content,
+ } => {
+ if !client
+ .fs
+ .directories()
+ .contains(&path.parent().unwrap().to_owned())
+ {
+ return Err(TestError::Inapplicable);
+ }
+
+ if is_dir {
+ log::info!("{}: creating dir at {:?}", client.username, path);
+ client.fs.create_dir(&path).await.unwrap();
+ } else {
+ let exists = client.fs.metadata(&path).await?.is_some();
+ let verb = if exists { "updating" } else { "creating" };
+ log::info!("{}: {} file at {:?}", verb, client.username, path);
+
+ client
+ .fs
+ .save(&path, &content.as_str().into(), fs::LineEnding::Unix)
+ .await
+ .unwrap();
+ }
+ }
+
+ ClientOperation::WriteGitIndex {
+ repo_path,
+ contents,
+ } => {
+ if !client.fs.directories().contains(&repo_path) {
+ return Err(TestError::Inapplicable);
+ }
+
+ log::info!(
+ "{}: writing git index for repo {:?}: {:?}",
+ client.username,
+ repo_path,
+ contents
+ );
+
+ let dot_git_dir = repo_path.join(".git");
+ let contents = contents
+ .iter()
+ .map(|(path, contents)| (path.as_path(), contents.clone()))
+ .collect::<Vec<_>>();
+ if client.fs.metadata(&dot_git_dir).await?.is_none() {
+ client.fs.create_dir(&dot_git_dir).await?;
+ }
+ client.fs.set_index_for_repo(&dot_git_dir, &contents).await;
+ }
+ }
+ Ok(())
+}
+
+fn check_consistency_between_clients(clients: &[(Rc<TestClient>, TestAppContext)]) {
+ for (client, client_cx) in clients {
+ for guest_project in client.remote_projects().iter() {
guest_project.read_with(client_cx, |guest_project, cx| {
let host_project = clients.iter().find_map(|(client, cx)| {
- let project = client.local_projects.iter().find(|host_project| {
- host_project.read_with(cx, |host_project, _| {
- host_project.remote_id() == guest_project.remote_id()
- })
- })?;
+ let project = client
+ .local_projects()
+ .iter()
+ .find(|host_project| {
+ host_project.read_with(cx, |host_project, _| {
+ host_project.remote_id() == guest_project.remote_id()
+ })
+ })?
+ .clone();
Some((project, cx))
});
@@ -275,10 +827,10 @@ async fn test_random_collaboration(
.collect::<BTreeMap<_, _>>();
assert_eq!(
- guest_worktree_snapshots.keys().collect::<Vec<_>>(),
- host_worktree_snapshots.keys().collect::<Vec<_>>(),
- "{} has different worktrees than the host",
- client.username
+ guest_worktree_snapshots.values().map(|w| w.abs_path()).collect::<Vec<_>>(),
+ host_worktree_snapshots.values().map(|w| w.abs_path()).collect::<Vec<_>>(),
+ "{} has different worktrees than the host for project {:?}",
+ client.username, guest_project.remote_id(),
);
for (id, host_snapshot) in &host_worktree_snapshots {
@@ -286,36 +838,53 @@ async fn test_random_collaboration(
assert_eq!(
guest_snapshot.root_name(),
host_snapshot.root_name(),
- "{} has different root name than the host for worktree {}",
+ "{} has different root name than the host for worktree {}, project {:?}",
client.username,
- id
+ id,
+ guest_project.remote_id(),
);
assert_eq!(
guest_snapshot.abs_path(),
host_snapshot.abs_path(),
- "{} has different abs path than the host for worktree {}",
+ "{} has different abs path than the host for worktree {}, project: {:?}",
client.username,
- id
+ id,
+ guest_project.remote_id(),
);
assert_eq!(
guest_snapshot.entries(false).collect::<Vec<_>>(),
host_snapshot.entries(false).collect::<Vec<_>>(),
- "{} has different snapshot than the host for worktree {} ({:?}) and project {:?}",
+ "{} has different snapshot than the host for worktree {:?} and project {:?}",
+ client.username,
+ host_snapshot.abs_path(),
+ guest_project.remote_id(),
+ );
+ assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id(),
+ "{} has different scan id than the host for worktree {:?} and project {:?}",
client.username,
- id,
host_snapshot.abs_path(),
- host_project.read_with(host_cx, |project, _| project.remote_id())
+ guest_project.remote_id(),
);
- assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
}
}
}
- guest_project.check_invariants(cx);
+ for buffer in guest_project.opened_buffers(cx) {
+ let buffer = buffer.read(cx);
+ assert_eq!(
+ buffer.deferred_ops_len(),
+ 0,
+ "{} has deferred operations for buffer {:?} in project {:?}",
+ client.username,
+ buffer.file().unwrap().full_path(cx),
+ guest_project.remote_id(),
+ );
+ }
});
}
- for (guest_project, guest_buffers) in &client.buffers {
+ let buffers = client.buffers().clone();
+ for (guest_project, guest_buffers) in &buffers {
let project_id = if guest_project.read_with(client_cx, |project, _| {
project.is_local() || project.is_read_only()
}) {
@@ -15,12 +15,9 @@ pub struct BlinkManager {
impl BlinkManager {
pub fn new(blink_interval: Duration, cx: &mut ModelContext<Self>) -> Self {
- let weak_handle = cx.weak_handle();
- cx.observe_global::<Settings, _>(move |_, cx| {
- if let Some(this) = weak_handle.upgrade(cx) {
- // Make sure we blink the cursors if the setting is re-enabled
- this.update(cx, |this, cx| this.blink_cursors(this.blink_epoch, cx));
- }
+ cx.observe_global::<Settings, _>(move |this, cx| {
+ // Make sure we blink the cursors if the setting is re-enabled
+ this.blink_cursors(this.blink_epoch, cx)
})
.detach();
@@ -5,7 +5,7 @@ use fsevent::EventStream;
use futures::{future::BoxFuture, Stream, StreamExt};
use git2::Repository as LibGitRepository;
use lazy_static::lazy_static;
-use parking_lot::Mutex as SyncMutex;
+use parking_lot::Mutex;
use regex::Regex;
use repository::GitRepository;
use rope::Rope;
@@ -27,8 +27,6 @@ use util::ResultExt;
#[cfg(any(test, feature = "test-support"))]
use collections::{btree_map, BTreeMap};
#[cfg(any(test, feature = "test-support"))]
-use futures::lock::Mutex;
-#[cfg(any(test, feature = "test-support"))]
use repository::FakeGitRepositoryState;
#[cfg(any(test, feature = "test-support"))]
use std::sync::Weak;
@@ -117,7 +115,7 @@ pub trait Fs: Send + Sync {
path: &Path,
latency: Duration,
) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>>;
- fn open_repo(&self, abs_dot_git: &Path) -> Option<Arc<SyncMutex<dyn GitRepository>>>;
+ fn open_repo(&self, abs_dot_git: &Path) -> Option<Arc<Mutex<dyn GitRepository>>>;
fn is_fake(&self) -> bool;
#[cfg(any(test, feature = "test-support"))]
fn as_fake(&self) -> &FakeFs;
@@ -350,11 +348,11 @@ impl Fs for RealFs {
})))
}
- fn open_repo(&self, dotgit_path: &Path) -> Option<Arc<SyncMutex<dyn GitRepository>>> {
+ fn open_repo(&self, dotgit_path: &Path) -> Option<Arc<Mutex<dyn GitRepository>>> {
LibGitRepository::open(&dotgit_path)
.log_err()
- .and_then::<Arc<SyncMutex<dyn GitRepository>>, _>(|libgit_repository| {
- Some(Arc::new(SyncMutex::new(libgit_repository)))
+ .and_then::<Arc<Mutex<dyn GitRepository>>, _>(|libgit_repository| {
+ Some(Arc::new(Mutex::new(libgit_repository)))
})
}
@@ -396,7 +394,7 @@ enum FakeFsEntry {
inode: u64,
mtime: SystemTime,
entries: BTreeMap<String, Arc<Mutex<FakeFsEntry>>>,
- git_repo_state: Option<Arc<SyncMutex<repository::FakeGitRepositoryState>>>,
+ git_repo_state: Option<Arc<Mutex<repository::FakeGitRepositoryState>>>,
},
Symlink {
target: PathBuf,
@@ -405,18 +403,14 @@ enum FakeFsEntry {
#[cfg(any(test, feature = "test-support"))]
impl FakeFsState {
- async fn read_path<'a>(&'a self, target: &Path) -> Result<Arc<Mutex<FakeFsEntry>>> {
+ fn read_path<'a>(&'a self, target: &Path) -> Result<Arc<Mutex<FakeFsEntry>>> {
Ok(self
.try_read_path(target)
- .await
.ok_or_else(|| anyhow!("path does not exist: {}", target.display()))?
.0)
}
- async fn try_read_path<'a>(
- &'a self,
- target: &Path,
- ) -> Option<(Arc<Mutex<FakeFsEntry>>, PathBuf)> {
+ fn try_read_path<'a>(&'a self, target: &Path) -> Option<(Arc<Mutex<FakeFsEntry>>, PathBuf)> {
let mut path = target.to_path_buf();
let mut real_path = PathBuf::new();
let mut entry_stack = Vec::new();
@@ -438,10 +432,10 @@ impl FakeFsState {
}
Component::Normal(name) => {
let current_entry = entry_stack.last().cloned()?;
- let current_entry = current_entry.lock().await;
+ let current_entry = current_entry.lock();
if let FakeFsEntry::Dir { entries, .. } = &*current_entry {
let entry = entries.get(name.to_str().unwrap()).cloned()?;
- let _entry = entry.lock().await;
+ let _entry = entry.lock();
if let FakeFsEntry::Symlink { target, .. } = &*_entry {
let mut target = target.clone();
target.extend(path_components);
@@ -462,7 +456,7 @@ impl FakeFsState {
entry_stack.pop().map(|entry| (entry, real_path))
}
- async fn write_path<Fn, T>(&self, path: &Path, callback: Fn) -> Result<T>
+ fn write_path<Fn, T>(&self, path: &Path, callback: Fn) -> Result<T>
where
Fn: FnOnce(btree_map::Entry<String, Arc<Mutex<FakeFsEntry>>>) -> Result<T>,
{
@@ -472,8 +466,8 @@ impl FakeFsState {
.ok_or_else(|| anyhow!("cannot overwrite the root"))?;
let parent_path = path.parent().unwrap();
- let parent = self.read_path(parent_path).await?;
- let mut parent = parent.lock().await;
+ let parent = self.read_path(parent_path)?;
+ let mut parent = parent.lock();
let new_entry = parent
.dir_entries(parent_path)?
.entry(filename.to_str().unwrap().into());
@@ -529,7 +523,7 @@ impl FakeFs {
}
pub async fn insert_file(&self, path: impl AsRef<Path>, content: String) {
- let mut state = self.state.lock().await;
+ let mut state = self.state.lock();
let path = path.as_ref();
let inode = state.next_inode;
let mtime = state.next_mtime;
@@ -552,13 +546,12 @@ impl FakeFs {
}
Ok(())
})
- .await
.unwrap();
state.emit_event(&[path]);
}
pub async fn insert_symlink(&self, path: impl AsRef<Path>, target: PathBuf) {
- let mut state = self.state.lock().await;
+ let mut state = self.state.lock();
let path = path.as_ref();
let file = Arc::new(Mutex::new(FakeFsEntry::Symlink { target }));
state
@@ -572,21 +565,20 @@ impl FakeFs {
Ok(())
}
})
- .await
.unwrap();
state.emit_event(&[path]);
}
pub async fn pause_events(&self) {
- self.state.lock().await.events_paused = true;
+ self.state.lock().events_paused = true;
}
pub async fn buffered_event_count(&self) -> usize {
- self.state.lock().await.buffered_events.len()
+ self.state.lock().buffered_events.len()
}
pub async fn flush_events(&self, count: usize) {
- self.state.lock().await.flush_events(count);
+ self.state.lock().flush_events(count);
}
#[must_use]
@@ -625,9 +617,9 @@ impl FakeFs {
}
pub async fn set_index_for_repo(&self, dot_git: &Path, head_state: &[(&Path, String)]) {
- let mut state = self.state.lock().await;
- let entry = state.read_path(dot_git).await.unwrap();
- let mut entry = entry.lock().await;
+ let mut state = self.state.lock();
+ let entry = state.read_path(dot_git).unwrap();
+ let mut entry = entry.lock();
if let FakeFsEntry::Dir { git_repo_state, .. } = &mut *entry {
let repo_state = git_repo_state.get_or_insert_with(Default::default);
@@ -646,12 +638,12 @@ impl FakeFs {
}
}
- pub async fn paths(&self) -> Vec<PathBuf> {
+ pub fn paths(&self) -> Vec<PathBuf> {
let mut result = Vec::new();
let mut queue = collections::VecDeque::new();
- queue.push_back((PathBuf::from("/"), self.state.lock().await.root.clone()));
+ queue.push_back((PathBuf::from("/"), self.state.lock().root.clone()));
while let Some((path, entry)) = queue.pop_front() {
- if let FakeFsEntry::Dir { entries, .. } = &*entry.lock().await {
+ if let FakeFsEntry::Dir { entries, .. } = &*entry.lock() {
for (name, entry) in entries {
queue.push_back((path.join(name), entry.clone()));
}
@@ -661,12 +653,12 @@ impl FakeFs {
result
}
- pub async fn directories(&self) -> Vec<PathBuf> {
+ pub fn directories(&self) -> Vec<PathBuf> {
let mut result = Vec::new();
let mut queue = collections::VecDeque::new();
- queue.push_back((PathBuf::from("/"), self.state.lock().await.root.clone()));
+ queue.push_back((PathBuf::from("/"), self.state.lock().root.clone()));
while let Some((path, entry)) = queue.pop_front() {
- if let FakeFsEntry::Dir { entries, .. } = &*entry.lock().await {
+ if let FakeFsEntry::Dir { entries, .. } = &*entry.lock() {
for (name, entry) in entries {
queue.push_back((path.join(name), entry.clone()));
}
@@ -676,12 +668,12 @@ impl FakeFs {
result
}
- pub async fn files(&self) -> Vec<PathBuf> {
+ pub fn files(&self) -> Vec<PathBuf> {
let mut result = Vec::new();
let mut queue = collections::VecDeque::new();
- queue.push_back((PathBuf::from("/"), self.state.lock().await.root.clone()));
+ queue.push_back((PathBuf::from("/"), self.state.lock().root.clone()));
while let Some((path, entry)) = queue.pop_front() {
- let e = entry.lock().await;
+ let e = entry.lock();
match &*e {
FakeFsEntry::File { .. } => result.push(path),
FakeFsEntry::Dir { entries, .. } => {
@@ -745,11 +737,11 @@ impl FakeFsEntry {
impl Fs for FakeFs {
async fn create_dir(&self, path: &Path) -> Result<()> {
self.simulate_random_delay().await;
- let mut state = self.state.lock().await;
let mut created_dirs = Vec::new();
let mut cur_path = PathBuf::new();
for component in path.components() {
+ let mut state = self.state.lock();
cur_path.push(component);
if cur_path == Path::new("/") {
continue;
@@ -759,29 +751,27 @@ impl Fs for FakeFs {
let mtime = state.next_mtime;
state.next_mtime += Duration::from_nanos(1);
state.next_inode += 1;
- state
- .write_path(&cur_path, |entry| {
- entry.or_insert_with(|| {
- created_dirs.push(cur_path.clone());
- Arc::new(Mutex::new(FakeFsEntry::Dir {
- inode,
- mtime,
- entries: Default::default(),
- git_repo_state: None,
- }))
- });
- Ok(())
- })
- .await?;
+ state.write_path(&cur_path, |entry| {
+ entry.or_insert_with(|| {
+ created_dirs.push(cur_path.clone());
+ Arc::new(Mutex::new(FakeFsEntry::Dir {
+ inode,
+ mtime,
+ entries: Default::default(),
+ git_repo_state: None,
+ }))
+ });
+ Ok(())
+ })?
}
- state.emit_event(&created_dirs);
+ self.state.lock().emit_event(&created_dirs);
Ok(())
}
async fn create_file(&self, path: &Path, options: CreateOptions) -> Result<()> {
self.simulate_random_delay().await;
- let mut state = self.state.lock().await;
+ let mut state = self.state.lock();
let inode = state.next_inode;
let mtime = state.next_mtime;
state.next_mtime += Duration::from_nanos(1);
@@ -791,108 +781,106 @@ impl Fs for FakeFs {
mtime,
content: String::new(),
}));
- state
- .write_path(path, |entry| {
- match entry {
- btree_map::Entry::Occupied(mut e) => {
- if options.overwrite {
- *e.get_mut() = file;
- } else if !options.ignore_if_exists {
- return Err(anyhow!("path already exists: {}", path.display()));
- }
- }
- btree_map::Entry::Vacant(e) => {
- e.insert(file);
+ state.write_path(path, |entry| {
+ match entry {
+ btree_map::Entry::Occupied(mut e) => {
+ if options.overwrite {
+ *e.get_mut() = file;
+ } else if !options.ignore_if_exists {
+ return Err(anyhow!("path already exists: {}", path.display()));
}
}
- Ok(())
- })
- .await?;
+ btree_map::Entry::Vacant(e) => {
+ e.insert(file);
+ }
+ }
+ Ok(())
+ })?;
state.emit_event(&[path]);
Ok(())
}
async fn rename(&self, old_path: &Path, new_path: &Path, options: RenameOptions) -> Result<()> {
+ self.simulate_random_delay().await;
+
let old_path = normalize_path(old_path);
let new_path = normalize_path(new_path);
- let mut state = self.state.lock().await;
- let moved_entry = state
- .write_path(&old_path, |e| {
- if let btree_map::Entry::Occupied(e) = e {
- Ok(e.remove())
- } else {
- Err(anyhow!("path does not exist: {}", &old_path.display()))
- }
- })
- .await?;
- state
- .write_path(&new_path, |e| {
- match e {
- btree_map::Entry::Occupied(mut e) => {
- if options.overwrite {
- *e.get_mut() = moved_entry;
- } else if !options.ignore_if_exists {
- return Err(anyhow!("path already exists: {}", new_path.display()));
- }
- }
- btree_map::Entry::Vacant(e) => {
- e.insert(moved_entry);
+ let mut state = self.state.lock();
+ let moved_entry = state.write_path(&old_path, |e| {
+ if let btree_map::Entry::Occupied(e) = e {
+ Ok(e.remove())
+ } else {
+ Err(anyhow!("path does not exist: {}", &old_path.display()))
+ }
+ })?;
+ state.write_path(&new_path, |e| {
+ match e {
+ btree_map::Entry::Occupied(mut e) => {
+ if options.overwrite {
+ *e.get_mut() = moved_entry;
+ } else if !options.ignore_if_exists {
+ return Err(anyhow!("path already exists: {}", new_path.display()));
}
}
- Ok(())
- })
- .await?;
+ btree_map::Entry::Vacant(e) => {
+ e.insert(moved_entry);
+ }
+ }
+ Ok(())
+ })?;
state.emit_event(&[old_path, new_path]);
Ok(())
}
async fn copy_file(&self, source: &Path, target: &Path, options: CopyOptions) -> Result<()> {
+ self.simulate_random_delay().await;
+
let source = normalize_path(source);
let target = normalize_path(target);
- let mut state = self.state.lock().await;
+ let mut state = self.state.lock();
let mtime = state.next_mtime;
let inode = util::post_inc(&mut state.next_inode);
state.next_mtime += Duration::from_nanos(1);
- let source_entry = state.read_path(&source).await?;
- let content = source_entry.lock().await.file_content(&source)?.clone();
- let entry = state
- .write_path(&target, |e| match e {
- btree_map::Entry::Occupied(e) => {
- if options.overwrite {
- Ok(Some(e.get().clone()))
- } else if !options.ignore_if_exists {
- return Err(anyhow!("{target:?} already exists"));
- } else {
- Ok(None)
- }
+ let source_entry = state.read_path(&source)?;
+ let content = source_entry.lock().file_content(&source)?.clone();
+ let entry = state.write_path(&target, |e| match e {
+ btree_map::Entry::Occupied(e) => {
+ if options.overwrite {
+ Ok(Some(e.get().clone()))
+ } else if !options.ignore_if_exists {
+ return Err(anyhow!("{target:?} already exists"));
+ } else {
+ Ok(None)
}
- btree_map::Entry::Vacant(e) => Ok(Some(
- e.insert(Arc::new(Mutex::new(FakeFsEntry::File {
- inode,
- mtime,
- content: String::new(),
- })))
- .clone(),
- )),
- })
- .await?;
+ }
+ btree_map::Entry::Vacant(e) => Ok(Some(
+ e.insert(Arc::new(Mutex::new(FakeFsEntry::File {
+ inode,
+ mtime,
+ content: String::new(),
+ })))
+ .clone(),
+ )),
+ })?;
if let Some(entry) = entry {
- entry.lock().await.set_file_content(&target, content)?;
+ entry.lock().set_file_content(&target, content)?;
}
state.emit_event(&[target]);
Ok(())
}
async fn remove_dir(&self, path: &Path, options: RemoveOptions) -> Result<()> {
+ self.simulate_random_delay().await;
+
let path = normalize_path(path);
let parent_path = path
.parent()
.ok_or_else(|| anyhow!("cannot remove the root"))?;
let base_name = path.file_name().unwrap();
- let mut state = self.state.lock().await;
- let parent_entry = state.read_path(parent_path).await?;
- let mut parent_entry = parent_entry.lock().await;
+ let mut state = self.state.lock();
+ let parent_entry = state.read_path(parent_path)?;
+ let mut parent_entry = parent_entry.lock();
let entry = parent_entry
.dir_entries(parent_path)?
.entry(base_name.to_str().unwrap().into());
@@ -905,7 +893,7 @@ impl Fs for FakeFs {
}
btree_map::Entry::Occupied(e) => {
{
- let mut entry = e.get().lock().await;
+ let mut entry = e.get().lock();
let children = entry.dir_entries(&path)?;
if !options.recursive && !children.is_empty() {
return Err(anyhow!("{path:?} is not empty"));
@@ -919,14 +907,16 @@ impl Fs for FakeFs {
}
async fn remove_file(&self, path: &Path, options: RemoveOptions) -> Result<()> {
+ self.simulate_random_delay().await;
+
let path = normalize_path(path);
let parent_path = path
.parent()
.ok_or_else(|| anyhow!("cannot remove the root"))?;
let base_name = path.file_name().unwrap();
- let mut state = self.state.lock().await;
- let parent_entry = state.read_path(parent_path).await?;
- let mut parent_entry = parent_entry.lock().await;
+ let mut state = self.state.lock();
+ let parent_entry = state.read_path(parent_path)?;
+ let mut parent_entry = parent_entry.lock();
let entry = parent_entry
.dir_entries(parent_path)?
.entry(base_name.to_str().unwrap().into());
@@ -937,7 +927,7 @@ impl Fs for FakeFs {
}
}
btree_map::Entry::Occupied(e) => {
- e.get().lock().await.file_content(&path)?;
+ e.get().lock().file_content(&path)?;
e.remove();
}
}
@@ -953,9 +943,9 @@ impl Fs for FakeFs {
async fn load(&self, path: &Path) -> Result<String> {
let path = normalize_path(path);
self.simulate_random_delay().await;
- let state = self.state.lock().await;
- let entry = state.read_path(&path).await?;
- let entry = entry.lock().await;
+ let state = self.state.lock();
+ let entry = state.read_path(&path)?;
+ let entry = entry.lock();
entry.file_content(&path).cloned()
}
@@ -978,8 +968,8 @@ impl Fs for FakeFs {
async fn canonicalize(&self, path: &Path) -> Result<PathBuf> {
let path = normalize_path(path);
self.simulate_random_delay().await;
- let state = self.state.lock().await;
- if let Some((_, real_path)) = state.try_read_path(&path).await {
+ let state = self.state.lock();
+ if let Some((_, real_path)) = state.try_read_path(&path) {
Ok(real_path)
} else {
Err(anyhow!("path does not exist: {}", path.display()))
@@ -989,9 +979,9 @@ impl Fs for FakeFs {
async fn is_file(&self, path: &Path) -> bool {
let path = normalize_path(path);
self.simulate_random_delay().await;
- let state = self.state.lock().await;
- if let Some((entry, _)) = state.try_read_path(&path).await {
- entry.lock().await.is_file()
+ let state = self.state.lock();
+ if let Some((entry, _)) = state.try_read_path(&path) {
+ entry.lock().is_file()
} else {
false
}
@@ -1000,9 +990,9 @@ impl Fs for FakeFs {
async fn metadata(&self, path: &Path) -> Result<Option<Metadata>> {
self.simulate_random_delay().await;
let path = normalize_path(path);
- let state = self.state.lock().await;
- if let Some((entry, real_path)) = state.try_read_path(&path).await {
- let entry = entry.lock().await;
+ let state = self.state.lock();
+ if let Some((entry, real_path)) = state.try_read_path(&path) {
+ let entry = entry.lock();
let is_symlink = real_path != path;
Ok(Some(match &*entry {
@@ -1031,9 +1021,9 @@ impl Fs for FakeFs {
) -> Result<Pin<Box<dyn Send + Stream<Item = Result<PathBuf>>>>> {
self.simulate_random_delay().await;
let path = normalize_path(path);
- let state = self.state.lock().await;
- let entry = state.read_path(&path).await?;
- let mut entry = entry.lock().await;
+ let state = self.state.lock();
+ let entry = state.read_path(&path)?;
+ let mut entry = entry.lock();
let children = entry.dir_entries(&path)?;
let paths = children
.keys()
@@ -1047,10 +1037,9 @@ impl Fs for FakeFs {
path: &Path,
_: Duration,
) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>> {
- let mut state = self.state.lock().await;
self.simulate_random_delay().await;
let (tx, rx) = smol::channel::unbounded();
- state.event_txs.push(tx);
+ self.state.lock().event_txs.push(tx);
let path = path.to_path_buf();
let executor = self.executor.clone();
Box::pin(futures::StreamExt::filter(rx, move |events| {
@@ -1065,22 +1054,18 @@ impl Fs for FakeFs {
}))
}
- fn open_repo(&self, abs_dot_git: &Path) -> Option<Arc<SyncMutex<dyn GitRepository>>> {
- smol::block_on(async move {
- let state = self.state.lock().await;
- let entry = state.read_path(abs_dot_git).await.unwrap();
- let mut entry = entry.lock().await;
- if let FakeFsEntry::Dir { git_repo_state, .. } = &mut *entry {
- let state = git_repo_state
- .get_or_insert_with(|| {
- Arc::new(SyncMutex::new(FakeGitRepositoryState::default()))
- })
- .clone();
- Some(repository::FakeGitRepository::open(state))
- } else {
- None
- }
- })
+ fn open_repo(&self, abs_dot_git: &Path) -> Option<Arc<Mutex<dyn GitRepository>>> {
+ let state = self.state.lock();
+ let entry = state.read_path(abs_dot_git).unwrap();
+ let mut entry = entry.lock();
+ if let FakeFsEntry::Dir { git_repo_state, .. } = &mut *entry {
+ let state = git_repo_state
+ .get_or_insert_with(|| Arc::new(Mutex::new(FakeGitRepositoryState::default())))
+ .clone();
+ Some(repository::FakeGitRepository::open(state))
+ } else {
+ None
+ }
}
fn is_fake(&self) -> bool {
@@ -1213,7 +1198,7 @@ mod tests {
.await;
assert_eq!(
- fs.files().await,
+ fs.files(),
vec![
PathBuf::from("/root/dir1/a"),
PathBuf::from("/root/dir1/b"),
@@ -2618,7 +2618,7 @@ impl UpgradeModelHandle for AppContext {
&self,
handle: &WeakModelHandle<T>,
) -> Option<ModelHandle<T>> {
- if self.models.contains_key(&handle.model_id) {
+ if self.ref_counts.lock().is_entity_alive(handle.model_id) {
Some(ModelHandle::new(handle.model_id, &self.ref_counts))
} else {
None
@@ -2626,11 +2626,11 @@ impl UpgradeModelHandle for AppContext {
}
fn model_handle_is_upgradable<T: Entity>(&self, handle: &WeakModelHandle<T>) -> bool {
- self.models.contains_key(&handle.model_id)
+ self.ref_counts.lock().is_entity_alive(handle.model_id)
}
fn upgrade_any_model_handle(&self, handle: &AnyWeakModelHandle) -> Option<AnyModelHandle> {
- if self.models.contains_key(&handle.model_id) {
+ if self.ref_counts.lock().is_entity_alive(handle.model_id) {
Some(AnyModelHandle::new(
handle.model_id,
handle.model_type,
@@ -32,6 +32,7 @@ use super::{
ref_counts::LeakDetector, window_input_handler::WindowInputHandler, AsyncAppContext, RefCounts,
};
+#[derive(Clone)]
pub struct TestAppContext {
cx: Rc<RefCell<AppContext>>,
foreground_platform: Rc<platform::test::ForegroundPlatform>,
@@ -829,6 +829,16 @@ impl Background {
}
}
+ #[cfg(any(test, feature = "test-support"))]
+ pub fn rng<'a>(&'a self) -> impl 'a + std::ops::DerefMut<Target = rand::prelude::StdRng> {
+ match self {
+ Self::Deterministic { executor, .. } => {
+ parking_lot::lock_api::MutexGuard::map(executor.state.lock(), |s| &mut s.rng)
+ }
+ _ => panic!("this method can only be called on a deterministic executor"),
+ }
+ }
+
#[cfg(any(test, feature = "test-support"))]
pub async fn simulate_random_delay(&self) {
match self {
@@ -46,6 +46,7 @@ pub fn run_test(
Arc<executor::Deterministic>,
u64,
)),
+ on_fail_fn: Option<fn()>,
fn_name: String,
) {
// let _profiler = dhat::Profiler::new_heap();
@@ -178,6 +179,7 @@ pub fn run_test(
if is_randomized {
eprintln!("failing seed: {}", atomic_seed.load(SeqCst));
}
+ on_fail_fn.map(|f| f());
panic::resume_unwind(error);
}
}
@@ -1,4 +1,5 @@
use proc_macro::TokenStream;
+use proc_macro2::Ident;
use quote::{format_ident, quote};
use std::mem;
use syn::{
@@ -15,6 +16,7 @@ pub fn test(args: TokenStream, function: TokenStream) -> TokenStream {
let mut num_iterations = 1;
let mut starting_seed = 0;
let mut detect_nondeterminism = false;
+ let mut on_failure_fn_name = quote!(None);
for arg in args {
match arg {
@@ -33,6 +35,20 @@ pub fn test(args: TokenStream, function: TokenStream) -> TokenStream {
Some("retries") => max_retries = parse_int(&meta.lit)?,
Some("iterations") => num_iterations = parse_int(&meta.lit)?,
Some("seed") => starting_seed = parse_int(&meta.lit)?,
+ Some("on_failure") => {
+ if let Lit::Str(name) = meta.lit {
+ let ident = Ident::new(&name.value(), name.span());
+ on_failure_fn_name = quote!(Some(#ident));
+ } else {
+ return Err(TokenStream::from(
+ syn::Error::new(
+ meta.lit.span(),
+ "on_failure argument must be a string",
+ )
+ .into_compile_error(),
+ ));
+ }
+ }
_ => {
return Err(TokenStream::from(
syn::Error::new(meta.path.span(), "invalid argument")
@@ -152,6 +168,7 @@ pub fn test(args: TokenStream, function: TokenStream) -> TokenStream {
cx.foreground().run(#inner_fn_name(#inner_fn_args));
#cx_teardowns
},
+ #on_failure_fn_name,
stringify!(#outer_fn_name).to_string(),
);
}
@@ -187,6 +204,7 @@ pub fn test(args: TokenStream, function: TokenStream) -> TokenStream {
#max_retries,
#detect_nondeterminism,
&mut |cx, _, _, seed| #inner_fn_name(#inner_fn_args),
+ #on_failure_fn_name,
stringify!(#outer_fn_name).to_string(),
);
}
@@ -377,7 +377,7 @@ impl Buffer {
rpc::proto::LineEnding::from_i32(message.line_ending)
.ok_or_else(|| anyhow!("missing line_ending"))?,
));
- this.saved_version = proto::deserialize_version(message.saved_version);
+ this.saved_version = proto::deserialize_version(&message.saved_version);
this.saved_version_fingerprint =
proto::deserialize_fingerprint(&message.saved_version_fingerprint)?;
this.saved_mtime = message
@@ -1309,21 +1309,25 @@ impl Buffer {
pub fn wait_for_edits(
&mut self,
edit_ids: impl IntoIterator<Item = clock::Local>,
- ) -> impl Future<Output = ()> {
+ ) -> impl Future<Output = Result<()>> {
self.text.wait_for_edits(edit_ids)
}
pub fn wait_for_anchors<'a>(
&mut self,
anchors: impl IntoIterator<Item = &'a Anchor>,
- ) -> impl Future<Output = ()> {
+ ) -> impl Future<Output = Result<()>> {
self.text.wait_for_anchors(anchors)
}
- pub fn wait_for_version(&mut self, version: clock::Global) -> impl Future<Output = ()> {
+ pub fn wait_for_version(&mut self, version: clock::Global) -> impl Future<Output = Result<()>> {
self.text.wait_for_version(version)
}
+ pub fn give_up_waiting(&mut self) {
+ self.text.give_up_waiting();
+ }
+
pub fn set_active_selections(
&mut self,
selections: Arc<[Selection<Anchor>]>,
@@ -220,7 +220,7 @@ pub fn deserialize_operation(message: proto::Operation) -> Result<crate::Operati
replica_id: undo.replica_id as ReplicaId,
value: undo.local_timestamp,
},
- version: deserialize_version(undo.version),
+ version: deserialize_version(&undo.version),
counts: undo
.counts
.into_iter()
@@ -294,7 +294,7 @@ pub fn deserialize_edit_operation(edit: proto::operation::Edit) -> EditOperation
local: edit.local_timestamp,
lamport: edit.lamport_timestamp,
},
- version: deserialize_version(edit.version),
+ version: deserialize_version(&edit.version),
ranges: edit.ranges.into_iter().map(deserialize_range).collect(),
new_text: edit.new_text.into_iter().map(Arc::from).collect(),
}
@@ -509,7 +509,7 @@ pub fn deserialize_transaction(transaction: proto::Transaction) -> Result<Transa
.into_iter()
.map(deserialize_local_timestamp)
.collect(),
- start: deserialize_version(transaction.start),
+ start: deserialize_version(&transaction.start),
})
}
@@ -538,7 +538,7 @@ pub fn deserialize_range(range: proto::Range) -> Range<FullOffset> {
FullOffset(range.start as usize)..FullOffset(range.end as usize)
}
-pub fn deserialize_version(message: Vec<proto::VectorClockEntry>) -> clock::Global {
+pub fn deserialize_version(message: &[proto::VectorClockEntry]) -> clock::Global {
let mut version = clock::Global::new();
for entry in message {
version.observe(clock::Local {
@@ -4,11 +4,13 @@ use crate::{
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use client::proto::{self, PeerId};
+use fs::LineEnding;
use gpui::{AppContext, AsyncAppContext, ModelHandle};
use language::{
point_from_lsp, point_to_lsp,
proto::{deserialize_anchor, deserialize_version, serialize_anchor, serialize_version},
- range_from_lsp, Anchor, Bias, Buffer, CachedLspAdapter, PointUtf16, ToPointUtf16,
+ range_from_lsp, range_to_lsp, Anchor, Bias, Buffer, CachedLspAdapter, CharKind, CodeAction,
+ Completion, OffsetRangeExt, PointUtf16, ToOffset, ToPointUtf16, Unclipped,
};
use lsp::{DocumentHighlightKind, LanguageServer, ServerCapabilities};
use pulldown_cmark::{CodeBlockKind, Event, Options, Parser, Tag};
@@ -27,6 +29,8 @@ pub(crate) trait LspCommand: 'static + Sized {
fn to_lsp(
&self,
path: &Path,
+ buffer: &Buffer,
+ language_server: &Arc<LanguageServer>,
cx: &AppContext,
) -> <Self::LspRequest as lsp::request::Request>::Params;
async fn response_from_lsp(
@@ -49,7 +53,7 @@ pub(crate) trait LspCommand: 'static + Sized {
project: &mut Project,
peer_id: PeerId,
buffer_version: &clock::Global,
- cx: &AppContext,
+ cx: &mut AppContext,
) -> <Self::ProtoRequest as proto::RequestMessage>::Response;
async fn response_from_proto(
self,
@@ -91,6 +95,14 @@ pub(crate) struct GetHover {
pub position: PointUtf16,
}
+pub(crate) struct GetCompletions {
+ pub position: PointUtf16,
+}
+
+pub(crate) struct GetCodeActions {
+ pub range: Range<Anchor>,
+}
+
#[async_trait(?Send)]
impl LspCommand for PrepareRename {
type Response = Option<Range<Anchor>>;
@@ -105,7 +117,13 @@ impl LspCommand for PrepareRename {
}
}
- fn to_lsp(&self, path: &Path, _: &AppContext) -> lsp::TextDocumentPositionParams {
+ fn to_lsp(
+ &self,
+ path: &Path,
+ _: &Buffer,
+ _: &Arc<LanguageServer>,
+ _: &AppContext,
+ ) -> lsp::TextDocumentPositionParams {
lsp::TextDocumentPositionParams {
text_document: lsp::TextDocumentIdentifier {
uri: lsp::Url::from_file_path(path).unwrap(),
@@ -161,9 +179,9 @@ impl LspCommand for PrepareRename {
.ok_or_else(|| anyhow!("invalid position"))?;
buffer
.update(&mut cx, |buffer, _| {
- buffer.wait_for_version(deserialize_version(message.version))
+ buffer.wait_for_version(deserialize_version(&message.version))
})
- .await;
+ .await?;
Ok(Self {
position: buffer.read_with(&cx, |buffer, _| position.to_point_utf16(buffer)),
@@ -175,7 +193,7 @@ impl LspCommand for PrepareRename {
_: &mut Project,
_: PeerId,
buffer_version: &clock::Global,
- _: &AppContext,
+ _: &mut AppContext,
) -> proto::PrepareRenameResponse {
proto::PrepareRenameResponse {
can_rename: range.is_some(),
@@ -199,9 +217,9 @@ impl LspCommand for PrepareRename {
if message.can_rename {
buffer
.update(&mut cx, |buffer, _| {
- buffer.wait_for_version(deserialize_version(message.version))
+ buffer.wait_for_version(deserialize_version(&message.version))
})
- .await;
+ .await?;
let start = message.start.and_then(deserialize_anchor);
let end = message.end.and_then(deserialize_anchor);
Ok(start.zip(end).map(|(start, end)| start..end))
@@ -221,7 +239,13 @@ impl LspCommand for PerformRename {
type LspRequest = lsp::request::Rename;
type ProtoRequest = proto::PerformRename;
- fn to_lsp(&self, path: &Path, _: &AppContext) -> lsp::RenameParams {
+ fn to_lsp(
+ &self,
+ path: &Path,
+ _: &Buffer,
+ _: &Arc<LanguageServer>,
+ _: &AppContext,
+ ) -> lsp::RenameParams {
lsp::RenameParams {
text_document_position: lsp::TextDocumentPositionParams {
text_document: lsp::TextDocumentIdentifier {
@@ -281,9 +305,9 @@ impl LspCommand for PerformRename {
.ok_or_else(|| anyhow!("invalid position"))?;
buffer
.update(&mut cx, |buffer, _| {
- buffer.wait_for_version(deserialize_version(message.version))
+ buffer.wait_for_version(deserialize_version(&message.version))
})
- .await;
+ .await?;
Ok(Self {
position: buffer.read_with(&cx, |buffer, _| position.to_point_utf16(buffer)),
new_name: message.new_name,
@@ -296,7 +320,7 @@ impl LspCommand for PerformRename {
project: &mut Project,
peer_id: PeerId,
_: &clock::Global,
- cx: &AppContext,
+ cx: &mut AppContext,
) -> proto::PerformRenameResponse {
let transaction = project.serialize_project_transaction_for_peer(response, peer_id, cx);
proto::PerformRenameResponse {
@@ -332,7 +356,13 @@ impl LspCommand for GetDefinition {
type LspRequest = lsp::request::GotoDefinition;
type ProtoRequest = proto::GetDefinition;
- fn to_lsp(&self, path: &Path, _: &AppContext) -> lsp::GotoDefinitionParams {
+ fn to_lsp(
+ &self,
+ path: &Path,
+ _: &Buffer,
+ _: &Arc<LanguageServer>,
+ _: &AppContext,
+ ) -> lsp::GotoDefinitionParams {
lsp::GotoDefinitionParams {
text_document_position_params: lsp::TextDocumentPositionParams {
text_document: lsp::TextDocumentIdentifier {
@@ -378,9 +408,9 @@ impl LspCommand for GetDefinition {
.ok_or_else(|| anyhow!("invalid position"))?;
buffer
.update(&mut cx, |buffer, _| {
- buffer.wait_for_version(deserialize_version(message.version))
+ buffer.wait_for_version(deserialize_version(&message.version))
})
- .await;
+ .await?;
Ok(Self {
position: buffer.read_with(&cx, |buffer, _| position.to_point_utf16(buffer)),
})
@@ -391,7 +421,7 @@ impl LspCommand for GetDefinition {
project: &mut Project,
peer_id: PeerId,
_: &clock::Global,
- cx: &AppContext,
+ cx: &mut AppContext,
) -> proto::GetDefinitionResponse {
let links = location_links_to_proto(response, project, peer_id, cx);
proto::GetDefinitionResponse { links }
@@ -418,7 +448,13 @@ impl LspCommand for GetTypeDefinition {
type LspRequest = lsp::request::GotoTypeDefinition;
type ProtoRequest = proto::GetTypeDefinition;
- fn to_lsp(&self, path: &Path, _: &AppContext) -> lsp::GotoTypeDefinitionParams {
+ fn to_lsp(
+ &self,
+ path: &Path,
+ _: &Buffer,
+ _: &Arc<LanguageServer>,
+ _: &AppContext,
+ ) -> lsp::GotoTypeDefinitionParams {
lsp::GotoTypeDefinitionParams {
text_document_position_params: lsp::TextDocumentPositionParams {
text_document: lsp::TextDocumentIdentifier {
@@ -464,9 +500,9 @@ impl LspCommand for GetTypeDefinition {
.ok_or_else(|| anyhow!("invalid position"))?;
buffer
.update(&mut cx, |buffer, _| {
- buffer.wait_for_version(deserialize_version(message.version))
+ buffer.wait_for_version(deserialize_version(&message.version))
})
- .await;
+ .await?;
Ok(Self {
position: buffer.read_with(&cx, |buffer, _| position.to_point_utf16(buffer)),
})
@@ -477,7 +513,7 @@ impl LspCommand for GetTypeDefinition {
project: &mut Project,
peer_id: PeerId,
_: &clock::Global,
- cx: &AppContext,
+ cx: &mut AppContext,
) -> proto::GetTypeDefinitionResponse {
let links = location_links_to_proto(response, project, peer_id, cx);
proto::GetTypeDefinitionResponse { links }
@@ -537,7 +573,7 @@ async fn location_links_from_proto(
.ok_or_else(|| anyhow!("missing origin end"))?;
buffer
.update(&mut cx, |buffer, _| buffer.wait_for_anchors([&start, &end]))
- .await;
+ .await?;
Some(Location {
buffer,
range: start..end,
@@ -562,7 +598,7 @@ async fn location_links_from_proto(
.ok_or_else(|| anyhow!("missing target end"))?;
buffer
.update(&mut cx, |buffer, _| buffer.wait_for_anchors([&start, &end]))
- .await;
+ .await?;
let target = Location {
buffer,
range: start..end,
@@ -658,7 +694,7 @@ fn location_links_to_proto(
links: Vec<LocationLink>,
project: &mut Project,
peer_id: PeerId,
- cx: &AppContext,
+ cx: &mut AppContext,
) -> Vec<proto::LocationLink> {
links
.into_iter()
@@ -693,7 +729,13 @@ impl LspCommand for GetReferences {
type LspRequest = lsp::request::References;
type ProtoRequest = proto::GetReferences;
- fn to_lsp(&self, path: &Path, _: &AppContext) -> lsp::ReferenceParams {
+ fn to_lsp(
+ &self,
+ path: &Path,
+ _: &Buffer,
+ _: &Arc<LanguageServer>,
+ _: &AppContext,
+ ) -> lsp::ReferenceParams {
lsp::ReferenceParams {
text_document_position: lsp::TextDocumentPositionParams {
text_document: lsp::TextDocumentIdentifier {
@@ -774,9 +816,9 @@ impl LspCommand for GetReferences {
.ok_or_else(|| anyhow!("invalid position"))?;
buffer
.update(&mut cx, |buffer, _| {
- buffer.wait_for_version(deserialize_version(message.version))
+ buffer.wait_for_version(deserialize_version(&message.version))
})
- .await;
+ .await?;
Ok(Self {
position: buffer.read_with(&cx, |buffer, _| position.to_point_utf16(buffer)),
})
@@ -787,7 +829,7 @@ impl LspCommand for GetReferences {
project: &mut Project,
peer_id: PeerId,
_: &clock::Global,
- cx: &AppContext,
+ cx: &mut AppContext,
) -> proto::GetReferencesResponse {
let locations = response
.into_iter()
@@ -827,7 +869,7 @@ impl LspCommand for GetReferences {
.ok_or_else(|| anyhow!("missing target end"))?;
target_buffer
.update(&mut cx, |buffer, _| buffer.wait_for_anchors([&start, &end]))
- .await;
+ .await?;
locations.push(Location {
buffer: target_buffer,
range: start..end,
@@ -851,7 +893,13 @@ impl LspCommand for GetDocumentHighlights {
capabilities.document_highlight_provider.is_some()
}
- fn to_lsp(&self, path: &Path, _: &AppContext) -> lsp::DocumentHighlightParams {
+ fn to_lsp(
+ &self,
+ path: &Path,
+ _: &Buffer,
+ _: &Arc<LanguageServer>,
+ _: &AppContext,
+ ) -> lsp::DocumentHighlightParams {
lsp::DocumentHighlightParams {
text_document_position_params: lsp::TextDocumentPositionParams {
text_document: lsp::TextDocumentIdentifier {
@@ -915,9 +963,9 @@ impl LspCommand for GetDocumentHighlights {
.ok_or_else(|| anyhow!("invalid position"))?;
buffer
.update(&mut cx, |buffer, _| {
- buffer.wait_for_version(deserialize_version(message.version))
+ buffer.wait_for_version(deserialize_version(&message.version))
})
- .await;
+ .await?;
Ok(Self {
position: buffer.read_with(&cx, |buffer, _| position.to_point_utf16(buffer)),
})
@@ -928,7 +976,7 @@ impl LspCommand for GetDocumentHighlights {
_: &mut Project,
_: PeerId,
_: &clock::Global,
- _: &AppContext,
+ _: &mut AppContext,
) -> proto::GetDocumentHighlightsResponse {
let highlights = response
.into_iter()
@@ -965,7 +1013,7 @@ impl LspCommand for GetDocumentHighlights {
.ok_or_else(|| anyhow!("missing target end"))?;
buffer
.update(&mut cx, |buffer, _| buffer.wait_for_anchors([&start, &end]))
- .await;
+ .await?;
let kind = match proto::document_highlight::Kind::from_i32(highlight.kind) {
Some(proto::document_highlight::Kind::Text) => DocumentHighlightKind::TEXT,
Some(proto::document_highlight::Kind::Read) => DocumentHighlightKind::READ,
@@ -991,7 +1039,13 @@ impl LspCommand for GetHover {
type LspRequest = lsp::request::HoverRequest;
type ProtoRequest = proto::GetHover;
- fn to_lsp(&self, path: &Path, _: &AppContext) -> lsp::HoverParams {
+ fn to_lsp(
+ &self,
+ path: &Path,
+ _: &Buffer,
+ _: &Arc<LanguageServer>,
+ _: &AppContext,
+ ) -> lsp::HoverParams {
lsp::HoverParams {
text_document_position_params: lsp::TextDocumentPositionParams {
text_document: lsp::TextDocumentIdentifier {
@@ -1117,9 +1171,9 @@ impl LspCommand for GetHover {
.ok_or_else(|| anyhow!("invalid position"))?;
buffer
.update(&mut cx, |buffer, _| {
- buffer.wait_for_version(deserialize_version(message.version))
+ buffer.wait_for_version(deserialize_version(&message.version))
})
- .await;
+ .await?;
Ok(Self {
position: buffer.read_with(&cx, |buffer, _| position.to_point_utf16(buffer)),
})
@@ -1130,7 +1184,7 @@ impl LspCommand for GetHover {
_: &mut Project,
_: PeerId,
_: &clock::Global,
- _: &AppContext,
+ _: &mut AppContext,
) -> proto::GetHoverResponse {
if let Some(response) = response {
let (start, end) = if let Some(range) = response.range {
@@ -1199,3 +1253,342 @@ impl LspCommand for GetHover {
message.buffer_id
}
}
+
+#[async_trait(?Send)]
+impl LspCommand for GetCompletions {
+ type Response = Vec<Completion>;
+ type LspRequest = lsp::request::Completion;
+ type ProtoRequest = proto::GetCompletions;
+
+ fn to_lsp(
+ &self,
+ path: &Path,
+ _: &Buffer,
+ _: &Arc<LanguageServer>,
+ _: &AppContext,
+ ) -> lsp::CompletionParams {
+ lsp::CompletionParams {
+ text_document_position: lsp::TextDocumentPositionParams::new(
+ lsp::TextDocumentIdentifier::new(lsp::Url::from_file_path(path).unwrap()),
+ point_to_lsp(self.position),
+ ),
+ context: Default::default(),
+ work_done_progress_params: Default::default(),
+ partial_result_params: Default::default(),
+ }
+ }
+
+ async fn response_from_lsp(
+ self,
+ completions: Option<lsp::CompletionResponse>,
+ _: ModelHandle<Project>,
+ buffer: ModelHandle<Buffer>,
+ cx: AsyncAppContext,
+ ) -> Result<Vec<Completion>> {
+ let completions = if let Some(completions) = completions {
+ match completions {
+ lsp::CompletionResponse::Array(completions) => completions,
+ lsp::CompletionResponse::List(list) => list.items,
+ }
+ } else {
+ Default::default()
+ };
+
+ let completions = buffer.read_with(&cx, |buffer, _| {
+ let language = buffer.language().cloned();
+ let snapshot = buffer.snapshot();
+ let clipped_position = buffer.clip_point_utf16(Unclipped(self.position), Bias::Left);
+ let mut range_for_token = None;
+ completions
+ .into_iter()
+ .filter_map(move |mut lsp_completion| {
+ // For now, we can only handle additional edits if they are returned
+ // when resolving the completion, not if they are present initially.
+ if lsp_completion
+ .additional_text_edits
+ .as_ref()
+ .map_or(false, |edits| !edits.is_empty())
+ {
+ return None;
+ }
+
+ let (old_range, mut new_text) = match lsp_completion.text_edit.as_ref() {
+ // If the language server provides a range to overwrite, then
+ // check that the range is valid.
+ Some(lsp::CompletionTextEdit::Edit(edit)) => {
+ let range = range_from_lsp(edit.range);
+ let start = snapshot.clip_point_utf16(range.start, Bias::Left);
+ let end = snapshot.clip_point_utf16(range.end, Bias::Left);
+ if start != range.start.0 || end != range.end.0 {
+ log::info!("completion out of expected range");
+ return None;
+ }
+ (
+ snapshot.anchor_before(start)..snapshot.anchor_after(end),
+ edit.new_text.clone(),
+ )
+ }
+ // If the language server does not provide a range, then infer
+ // the range based on the syntax tree.
+ None => {
+ if self.position != clipped_position {
+ log::info!("completion out of expected range");
+ return None;
+ }
+ let Range { start, end } = range_for_token
+ .get_or_insert_with(|| {
+ let offset = self.position.to_offset(&snapshot);
+ let (range, kind) = snapshot.surrounding_word(offset);
+ if kind == Some(CharKind::Word) {
+ range
+ } else {
+ offset..offset
+ }
+ })
+ .clone();
+ let text = lsp_completion
+ .insert_text
+ .as_ref()
+ .unwrap_or(&lsp_completion.label)
+ .clone();
+ (
+ snapshot.anchor_before(start)..snapshot.anchor_after(end),
+ text,
+ )
+ }
+ Some(lsp::CompletionTextEdit::InsertAndReplace(_)) => {
+ log::info!("unsupported insert/replace completion");
+ return None;
+ }
+ };
+
+ let language = language.clone();
+ LineEnding::normalize(&mut new_text);
+ Some(async move {
+ let mut label = None;
+ if let Some(language) = language {
+ language.process_completion(&mut lsp_completion).await;
+ label = language.label_for_completion(&lsp_completion).await;
+ }
+ Completion {
+ old_range,
+ new_text,
+ label: label.unwrap_or_else(|| {
+ language::CodeLabel::plain(
+ lsp_completion.label.clone(),
+ lsp_completion.filter_text.as_deref(),
+ )
+ }),
+ lsp_completion,
+ }
+ })
+ })
+ });
+
+ Ok(futures::future::join_all(completions).await)
+ }
+
+ fn to_proto(&self, project_id: u64, buffer: &Buffer) -> proto::GetCompletions {
+ let anchor = buffer.anchor_after(self.position);
+ proto::GetCompletions {
+ project_id,
+ buffer_id: buffer.remote_id(),
+ position: Some(language::proto::serialize_anchor(&anchor)),
+ version: serialize_version(&buffer.version()),
+ }
+ }
+
+ async fn from_proto(
+ message: proto::GetCompletions,
+ _: ModelHandle<Project>,
+ buffer: ModelHandle<Buffer>,
+ mut cx: AsyncAppContext,
+ ) -> Result<Self> {
+ let version = deserialize_version(&message.version);
+ buffer
+ .update(&mut cx, |buffer, _| buffer.wait_for_version(version))
+ .await?;
+ let position = message
+ .position
+ .and_then(language::proto::deserialize_anchor)
+ .map(|p| {
+ buffer.read_with(&cx, |buffer, _| {
+ buffer.clip_point_utf16(Unclipped(p.to_point_utf16(buffer)), Bias::Left)
+ })
+ })
+ .ok_or_else(|| anyhow!("invalid position"))?;
+ Ok(Self { position })
+ }
+
+ fn response_to_proto(
+ completions: Vec<Completion>,
+ _: &mut Project,
+ _: PeerId,
+ buffer_version: &clock::Global,
+ _: &mut AppContext,
+ ) -> proto::GetCompletionsResponse {
+ proto::GetCompletionsResponse {
+ completions: completions
+ .iter()
+ .map(language::proto::serialize_completion)
+ .collect(),
+ version: serialize_version(&buffer_version),
+ }
+ }
+
+ async fn response_from_proto(
+ self,
+ message: proto::GetCompletionsResponse,
+ _: ModelHandle<Project>,
+ buffer: ModelHandle<Buffer>,
+ mut cx: AsyncAppContext,
+ ) -> Result<Vec<Completion>> {
+ buffer
+ .update(&mut cx, |buffer, _| {
+ buffer.wait_for_version(deserialize_version(&message.version))
+ })
+ .await?;
+
+ let language = buffer.read_with(&cx, |buffer, _| buffer.language().cloned());
+ let completions = message.completions.into_iter().map(|completion| {
+ language::proto::deserialize_completion(completion, language.clone())
+ });
+ futures::future::try_join_all(completions).await
+ }
+
+ fn buffer_id_from_proto(message: &proto::GetCompletions) -> u64 {
+ message.buffer_id
+ }
+}
+
+#[async_trait(?Send)]
+impl LspCommand for GetCodeActions {
+ type Response = Vec<CodeAction>;
+ type LspRequest = lsp::request::CodeActionRequest;
+ type ProtoRequest = proto::GetCodeActions;
+
+ fn check_capabilities(&self, capabilities: &ServerCapabilities) -> bool {
+ capabilities.code_action_provider.is_some()
+ }
+
+ fn to_lsp(
+ &self,
+ path: &Path,
+ buffer: &Buffer,
+ language_server: &Arc<LanguageServer>,
+ _: &AppContext,
+ ) -> lsp::CodeActionParams {
+ let relevant_diagnostics = buffer
+ .snapshot()
+ .diagnostics_in_range::<_, usize>(self.range.clone(), false)
+ .map(|entry| entry.to_lsp_diagnostic_stub())
+ .collect();
+ lsp::CodeActionParams {
+ text_document: lsp::TextDocumentIdentifier::new(
+ lsp::Url::from_file_path(path).unwrap(),
+ ),
+ range: range_to_lsp(self.range.to_point_utf16(buffer)),
+ work_done_progress_params: Default::default(),
+ partial_result_params: Default::default(),
+ context: lsp::CodeActionContext {
+ diagnostics: relevant_diagnostics,
+ only: language_server.code_action_kinds(),
+ },
+ }
+ }
+
+ async fn response_from_lsp(
+ self,
+ actions: Option<lsp::CodeActionResponse>,
+ _: ModelHandle<Project>,
+ _: ModelHandle<Buffer>,
+ _: AsyncAppContext,
+ ) -> Result<Vec<CodeAction>> {
+ Ok(actions
+ .unwrap_or_default()
+ .into_iter()
+ .filter_map(|entry| {
+ if let lsp::CodeActionOrCommand::CodeAction(lsp_action) = entry {
+ Some(CodeAction {
+ range: self.range.clone(),
+ lsp_action,
+ })
+ } else {
+ None
+ }
+ })
+ .collect())
+ }
+
+ fn to_proto(&self, project_id: u64, buffer: &Buffer) -> proto::GetCodeActions {
+ proto::GetCodeActions {
+ project_id,
+ buffer_id: buffer.remote_id(),
+ start: Some(language::proto::serialize_anchor(&self.range.start)),
+ end: Some(language::proto::serialize_anchor(&self.range.end)),
+ version: serialize_version(&buffer.version()),
+ }
+ }
+
+ async fn from_proto(
+ message: proto::GetCodeActions,
+ _: ModelHandle<Project>,
+ buffer: ModelHandle<Buffer>,
+ mut cx: AsyncAppContext,
+ ) -> Result<Self> {
+ let start = message
+ .start
+ .and_then(language::proto::deserialize_anchor)
+ .ok_or_else(|| anyhow!("invalid start"))?;
+ let end = message
+ .end
+ .and_then(language::proto::deserialize_anchor)
+ .ok_or_else(|| anyhow!("invalid end"))?;
+ buffer
+ .update(&mut cx, |buffer, _| {
+ buffer.wait_for_version(deserialize_version(&message.version))
+ })
+ .await?;
+
+ Ok(Self { range: start..end })
+ }
+
+ fn response_to_proto(
+ code_actions: Vec<CodeAction>,
+ _: &mut Project,
+ _: PeerId,
+ buffer_version: &clock::Global,
+ _: &mut AppContext,
+ ) -> proto::GetCodeActionsResponse {
+ proto::GetCodeActionsResponse {
+ actions: code_actions
+ .iter()
+ .map(language::proto::serialize_code_action)
+ .collect(),
+ version: serialize_version(&buffer_version),
+ }
+ }
+
+ async fn response_from_proto(
+ self,
+ message: proto::GetCodeActionsResponse,
+ _: ModelHandle<Project>,
+ buffer: ModelHandle<Buffer>,
+ mut cx: AsyncAppContext,
+ ) -> Result<Vec<CodeAction>> {
+ buffer
+ .update(&mut cx, |buffer, _| {
+ buffer.wait_for_version(deserialize_version(&message.version))
+ })
+ .await?;
+ message
+ .actions
+ .into_iter()
+ .map(language::proto::deserialize_code_action)
+ .collect()
+ }
+
+ fn buffer_id_from_proto(message: &proto::GetCodeActions) -> u64 {
+ message.buffer_id
+ }
+}
@@ -13,7 +13,7 @@ use client::{proto, Client, TypedEnvelope, UserStore};
use clock::ReplicaId;
use collections::{hash_map, BTreeMap, HashMap, HashSet};
use futures::{
- channel::{mpsc, oneshot},
+ channel::mpsc::{self, UnboundedReceiver},
future::{try_join_all, Shared},
AsyncWriteExt, Future, FutureExt, StreamExt, TryFutureExt,
};
@@ -27,11 +27,11 @@ use language::{
deserialize_anchor, deserialize_fingerprint, deserialize_line_ending, deserialize_version,
serialize_anchor, serialize_version,
},
- range_from_lsp, range_to_lsp, Anchor, Bias, Buffer, CachedLspAdapter, CharKind, CodeAction,
- CodeLabel, Completion, Diagnostic, DiagnosticEntry, DiagnosticSet, Diff, Event as BufferEvent,
- File as _, Language, LanguageRegistry, LanguageServerName, LocalFile, OffsetRangeExt,
- Operation, Patch, PointUtf16, RopeFingerprint, TextBufferSnapshot, ToOffset, ToPointUtf16,
- Transaction, Unclipped,
+ range_from_lsp, range_to_lsp, Anchor, Bias, Buffer, CachedLspAdapter, CodeAction, CodeLabel,
+ Completion, Diagnostic, DiagnosticEntry, DiagnosticSet, Diff, Event as BufferEvent, File as _,
+ Language, LanguageRegistry, LanguageServerName, LocalFile, OffsetRangeExt, Operation, Patch,
+ PointUtf16, RopeFingerprint, TextBufferSnapshot, ToOffset, ToPointUtf16, Transaction,
+ Unclipped,
};
use lsp::{
DiagnosticSeverity, DiagnosticTag, DidChangeWatchedFilesRegistrationOptions,
@@ -92,6 +92,7 @@ pub trait Item {
pub struct Project {
worktrees: Vec<WorktreeHandle>,
active_entry: Option<ProjectEntryId>,
+ buffer_changes_tx: mpsc::UnboundedSender<BufferMessage>,
languages: Arc<LanguageRegistry>,
language_servers: HashMap<usize, LanguageServerState>,
language_server_ids: HashMap<(WorktreeId, LanguageServerName), usize>,
@@ -100,6 +101,7 @@ pub struct Project {
next_language_server_id: usize,
client: Arc<client::Client>,
next_entry_id: Arc<AtomicUsize>,
+ join_project_response_message_id: u32,
next_diagnostic_group_id: usize,
user_store: ModelHandle<UserStore>,
fs: Arc<dyn Fs>,
@@ -129,6 +131,22 @@ pub struct Project {
terminals: Terminals,
}
+enum BufferMessage {
+ Operation {
+ buffer_id: u64,
+ operation: proto::Operation,
+ },
+ Resync,
+}
+
+enum LocalProjectUpdate {
+ WorktreesChanged,
+ CreateBufferForPeer {
+ peer_id: proto::PeerId,
+ buffer_id: u64,
+ },
+}
+
enum OpenBuffer {
Strong(ModelHandle<Buffer>),
Weak(WeakModelHandle<Buffer>),
@@ -143,8 +161,8 @@ enum WorktreeHandle {
enum ProjectClientState {
Local {
remote_id: u64,
- metadata_changed: mpsc::UnboundedSender<oneshot::Sender<()>>,
- _maintain_metadata: Task<()>,
+ updates_tx: mpsc::UnboundedSender<LocalProjectUpdate>,
+ _send_updates: Task<()>,
},
Remote {
sharing_has_stopped: bool,
@@ -379,7 +397,7 @@ impl Project {
client.add_model_message_handler(Self::handle_unshare_project);
client.add_model_message_handler(Self::handle_create_buffer_for_peer);
client.add_model_message_handler(Self::handle_update_buffer_file);
- client.add_model_message_handler(Self::handle_update_buffer);
+ client.add_model_request_handler(Self::handle_update_buffer);
client.add_model_message_handler(Self::handle_update_diagnostic_summary);
client.add_model_message_handler(Self::handle_update_worktree);
client.add_model_request_handler(Self::handle_create_project_entry);
@@ -391,8 +409,8 @@ impl Project {
client.add_model_request_handler(Self::handle_reload_buffers);
client.add_model_request_handler(Self::handle_synchronize_buffers);
client.add_model_request_handler(Self::handle_format_buffers);
- client.add_model_request_handler(Self::handle_get_code_actions);
- client.add_model_request_handler(Self::handle_get_completions);
+ client.add_model_request_handler(Self::handle_lsp_command::<GetCodeActions>);
+ client.add_model_request_handler(Self::handle_lsp_command::<GetCompletions>);
client.add_model_request_handler(Self::handle_lsp_command::<GetHover>);
client.add_model_request_handler(Self::handle_lsp_command::<GetDefinition>);
client.add_model_request_handler(Self::handle_lsp_command::<GetTypeDefinition>);
@@ -416,38 +434,45 @@ impl Project {
fs: Arc<dyn Fs>,
cx: &mut AppContext,
) -> ModelHandle<Self> {
- cx.add_model(|cx: &mut ModelContext<Self>| Self {
- worktrees: Default::default(),
- collaborators: Default::default(),
- opened_buffers: Default::default(),
- shared_buffers: Default::default(),
- incomplete_remote_buffers: Default::default(),
- loading_buffers_by_path: Default::default(),
- loading_local_worktrees: Default::default(),
- buffer_snapshots: Default::default(),
- client_state: None,
- opened_buffer: watch::channel(),
- client_subscriptions: Vec::new(),
- _subscriptions: vec![cx.observe_global::<Settings, _>(Self::on_settings_changed)],
- _maintain_buffer_languages: Self::maintain_buffer_languages(&languages, cx),
- _maintain_workspace_config: Self::maintain_workspace_config(languages.clone(), cx),
- active_entry: None,
- languages,
- client,
- user_store,
- fs,
- next_entry_id: Default::default(),
- next_diagnostic_group_id: Default::default(),
- language_servers: Default::default(),
- language_server_ids: Default::default(),
- language_server_statuses: Default::default(),
- last_workspace_edits_by_language_server: Default::default(),
- buffers_being_formatted: Default::default(),
- next_language_server_id: 0,
- nonce: StdRng::from_entropy().gen(),
- terminals: Terminals {
- local_handles: Vec::new(),
- },
+ cx.add_model(|cx: &mut ModelContext<Self>| {
+ let (tx, rx) = mpsc::unbounded();
+ cx.spawn_weak(|this, cx| Self::send_buffer_messages(this, rx, cx))
+ .detach();
+ Self {
+ worktrees: Default::default(),
+ buffer_changes_tx: tx,
+ collaborators: Default::default(),
+ opened_buffers: Default::default(),
+ shared_buffers: Default::default(),
+ incomplete_remote_buffers: Default::default(),
+ loading_buffers_by_path: Default::default(),
+ loading_local_worktrees: Default::default(),
+ buffer_snapshots: Default::default(),
+ join_project_response_message_id: 0,
+ client_state: None,
+ opened_buffer: watch::channel(),
+ client_subscriptions: Vec::new(),
+ _subscriptions: vec![cx.observe_global::<Settings, _>(Self::on_settings_changed)],
+ _maintain_buffer_languages: Self::maintain_buffer_languages(&languages, cx),
+ _maintain_workspace_config: Self::maintain_workspace_config(languages.clone(), cx),
+ active_entry: None,
+ languages,
+ client,
+ user_store,
+ fs,
+ next_entry_id: Default::default(),
+ next_diagnostic_group_id: Default::default(),
+ language_servers: Default::default(),
+ language_server_ids: Default::default(),
+ language_server_statuses: Default::default(),
+ last_workspace_edits_by_language_server: Default::default(),
+ buffers_being_formatted: Default::default(),
+ next_language_server_id: 0,
+ nonce: StdRng::from_entropy().gen(),
+ terminals: Terminals {
+ local_handles: Vec::new(),
+ },
+ }
})
}
@@ -461,25 +486,29 @@ impl Project {
) -> Result<ModelHandle<Self>> {
client.authenticate_and_connect(true, &cx).await?;
- let subscription = client.subscribe_to_entity(remote_id);
+ let subscription = client.subscribe_to_entity(remote_id)?;
let response = client
- .request(proto::JoinProject {
+ .request_envelope(proto::JoinProject {
project_id: remote_id,
})
.await?;
let this = cx.add_model(|cx| {
- let replica_id = response.replica_id as ReplicaId;
+ let replica_id = response.payload.replica_id as ReplicaId;
let mut worktrees = Vec::new();
- for worktree in response.worktrees {
+ for worktree in response.payload.worktrees {
let worktree = cx.update(|cx| {
Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx)
});
worktrees.push(worktree);
}
+ let (tx, rx) = mpsc::unbounded();
+ cx.spawn_weak(|this, cx| Self::send_buffer_messages(this, rx, cx))
+ .detach();
let mut this = Self {
worktrees: Vec::new(),
+ buffer_changes_tx: tx,
loading_buffers_by_path: Default::default(),
opened_buffer: watch::channel(),
shared_buffers: Default::default(),
@@ -487,6 +516,7 @@ impl Project {
loading_local_worktrees: Default::default(),
active_entry: None,
collaborators: Default::default(),
+ join_project_response_message_id: response.message_id,
_maintain_buffer_languages: Self::maintain_buffer_languages(&languages, cx),
_maintain_workspace_config: Self::maintain_workspace_config(languages.clone(), cx),
languages,
@@ -505,6 +535,7 @@ impl Project {
language_servers: Default::default(),
language_server_ids: Default::default(),
language_server_statuses: response
+ .payload
.language_servers
.into_iter()
.map(|server| {
@@ -537,6 +568,7 @@ impl Project {
let subscription = subscription.set_model(&this, &mut cx);
let user_ids = response
+ .payload
.collaborators
.iter()
.map(|peer| peer.user_id)
@@ -546,7 +578,7 @@ impl Project {
.await?;
this.update(&mut cx, |this, cx| {
- this.set_collaborators_from_proto(response.collaborators, cx)?;
+ this.set_collaborators_from_proto(response.payload.collaborators, cx)?;
this.client_subscriptions.push(subscription);
anyhow::Ok(())
})?;
@@ -654,37 +686,11 @@ impl Project {
}
#[cfg(any(test, feature = "test-support"))]
- pub fn check_invariants(&self, cx: &AppContext) {
- if self.is_local() {
- let mut worktree_root_paths = HashMap::default();
- for worktree in self.worktrees(cx) {
- let worktree = worktree.read(cx);
- let abs_path = worktree.as_local().unwrap().abs_path().clone();
- let prev_worktree_id = worktree_root_paths.insert(abs_path.clone(), worktree.id());
- assert_eq!(
- prev_worktree_id,
- None,
- "abs path {:?} for worktree {:?} is not unique ({:?} was already registered with the same path)",
- abs_path,
- worktree.id(),
- prev_worktree_id
- )
- }
- } else {
- let replica_id = self.replica_id();
- for buffer in self.opened_buffers.values() {
- if let Some(buffer) = buffer.upgrade(cx) {
- let buffer = buffer.read(cx);
- assert_eq!(
- buffer.deferred_ops_len(),
- 0,
- "replica {}, buffer {} has deferred operations",
- replica_id,
- buffer.remote_id()
- );
- }
- }
- }
+ pub fn opened_buffers(&self, cx: &AppContext) -> Vec<ModelHandle<Buffer>> {
+ self.opened_buffers
+ .values()
+ .filter_map(|b| b.upgrade(cx))
+ .collect()
}
#[cfg(any(test, feature = "test-support"))]
@@ -724,22 +730,13 @@ impl Project {
}
}
- fn metadata_changed(&mut self, cx: &mut ModelContext<Self>) -> impl Future<Output = ()> {
- let (tx, rx) = oneshot::channel();
- if let Some(ProjectClientState::Local {
- metadata_changed, ..
- }) = &mut self.client_state
- {
- let _ = metadata_changed.unbounded_send(tx);
+ fn metadata_changed(&mut self, cx: &mut ModelContext<Self>) {
+ if let Some(ProjectClientState::Local { updates_tx, .. }) = &mut self.client_state {
+ updates_tx
+ .unbounded_send(LocalProjectUpdate::WorktreesChanged)
+ .ok();
}
cx.notify();
-
- async move {
- // If the project is shared, this will resolve when the `_maintain_metadata` task has
- // a chance to update the metadata. Otherwise, it will resolve right away because `tx`
- // will get dropped.
- let _ = rx.await;
- }
}
pub fn collaborators(&self) -> &HashMap<proto::PeerId, Collaborator> {
@@ -984,6 +981,11 @@ impl Project {
if self.client_state.is_some() {
return Err(anyhow!("project was already shared"));
}
+ self.client_subscriptions.push(
+ self.client
+ .subscribe_to_entity(project_id)?
+ .set_model(&cx.handle(), &mut cx.to_async()),
+ );
for open_buffer in self.opened_buffers.values_mut() {
match open_buffer {
@@ -1020,52 +1022,96 @@ impl Project {
.log_err();
}
- self.client_subscriptions.push(
- self.client
- .subscribe_to_entity(project_id)
- .set_model(&cx.handle(), &mut cx.to_async()),
- );
-
- let (metadata_changed_tx, mut metadata_changed_rx) = mpsc::unbounded();
+ let (updates_tx, mut updates_rx) = mpsc::unbounded();
+ let client = self.client.clone();
self.client_state = Some(ProjectClientState::Local {
remote_id: project_id,
- metadata_changed: metadata_changed_tx,
- _maintain_metadata: cx.spawn_weak(move |this, mut cx| async move {
- let mut txs = Vec::new();
- while let Some(tx) = metadata_changed_rx.next().await {
- txs.push(tx);
- while let Ok(Some(next_tx)) = metadata_changed_rx.try_next() {
- txs.push(next_tx);
- }
-
+ updates_tx,
+ _send_updates: cx.spawn_weak(move |this, mut cx| async move {
+ while let Some(update) = updates_rx.next().await {
let Some(this) = this.upgrade(&cx) else { break };
- let worktrees =
- this.read_with(&cx, |this, cx| this.worktrees(cx).collect::<Vec<_>>());
- let update_project = this
- .read_with(&cx, |this, cx| {
- this.client.request(proto::UpdateProject {
- project_id,
- worktrees: this.worktree_metadata_protos(cx),
- })
- })
- .await;
- if update_project.is_ok() {
- for worktree in worktrees {
- worktree.update(&mut cx, |worktree, cx| {
- let worktree = worktree.as_local_mut().unwrap();
- worktree.share(project_id, cx).detach_and_log_err(cx)
- });
+
+ match update {
+ LocalProjectUpdate::WorktreesChanged => {
+ let worktrees = this
+ .read_with(&cx, |this, cx| this.worktrees(cx).collect::<Vec<_>>());
+ let update_project = this
+ .read_with(&cx, |this, cx| {
+ this.client.request(proto::UpdateProject {
+ project_id,
+ worktrees: this.worktree_metadata_protos(cx),
+ })
+ })
+ .await;
+ if update_project.is_ok() {
+ for worktree in worktrees {
+ worktree.update(&mut cx, |worktree, cx| {
+ let worktree = worktree.as_local_mut().unwrap();
+ worktree.share(project_id, cx).detach_and_log_err(cx)
+ });
+ }
+ }
}
- }
+ LocalProjectUpdate::CreateBufferForPeer { peer_id, buffer_id } => {
+ let buffer = this.update(&mut cx, |this, _| {
+ let buffer = this.opened_buffers.get(&buffer_id).unwrap();
+ let shared_buffers =
+ this.shared_buffers.entry(peer_id).or_default();
+ if shared_buffers.insert(buffer_id) {
+ if let OpenBuffer::Strong(buffer) = buffer {
+ Some(buffer.clone())
+ } else {
+ None
+ }
+ } else {
+ None
+ }
+ });
+
+ let Some(buffer) = buffer else { continue };
+ let operations =
+ buffer.read_with(&cx, |b, cx| b.serialize_ops(None, cx));
+ let operations = operations.await;
+ let state = buffer.read_with(&cx, |buffer, _| buffer.to_proto());
- for tx in txs.drain(..) {
- let _ = tx.send(());
+ let initial_state = proto::CreateBufferForPeer {
+ project_id,
+ peer_id: Some(peer_id),
+ variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
+ };
+ if client.send(initial_state).log_err().is_some() {
+ let client = client.clone();
+ cx.background()
+ .spawn(async move {
+ let mut chunks = split_operations(operations).peekable();
+ while let Some(chunk) = chunks.next() {
+ let is_last = chunks.peek().is_none();
+ client.send(proto::CreateBufferForPeer {
+ project_id,
+ peer_id: Some(peer_id),
+ variant: Some(
+ proto::create_buffer_for_peer::Variant::Chunk(
+ proto::BufferChunk {
+ buffer_id,
+ operations: chunk,
+ is_last,
+ },
+ ),
+ ),
+ })?;
+ }
+ anyhow::Ok(())
+ })
+ .await
+ .log_err();
+ }
+ }
}
}
}),
});
- let _ = self.metadata_changed(cx);
+ self.metadata_changed(cx);
cx.emit(Event::RemoteIdChanged(Some(project_id)));
cx.notify();
Ok(())
@@ -1076,16 +1122,19 @@ impl Project {
message: proto::ResharedProject,
cx: &mut ModelContext<Self>,
) -> Result<()> {
+ self.shared_buffers.clear();
self.set_collaborators_from_proto(message.collaborators, cx)?;
- let _ = self.metadata_changed(cx);
+ self.metadata_changed(cx);
Ok(())
}
pub fn rejoined(
&mut self,
message: proto::RejoinedProject,
+ message_id: u32,
cx: &mut ModelContext<Self>,
) -> Result<()> {
+ self.join_project_response_message_id = message_id;
self.set_worktrees_from_proto(message.worktrees, cx)?;
self.set_collaborators_from_proto(message.collaborators, cx)?;
self.language_server_statuses = message
@@ -1103,13 +1152,21 @@ impl Project {
)
})
.collect();
- self.synchronize_remote_buffers(cx).detach_and_log_err(cx);
-
+ self.buffer_changes_tx
+ .unbounded_send(BufferMessage::Resync)
+ .unwrap();
cx.notify();
Ok(())
}
pub fn unshare(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
+ self.unshare_internal(cx)?;
+ self.metadata_changed(cx);
+ cx.notify();
+ Ok(())
+ }
+
+ fn unshare_internal(&mut self, cx: &mut AppContext) -> Result<()> {
if self.is_remote() {
return Err(anyhow!("attempted to unshare a remote project"));
}
@@ -1132,13 +1189,16 @@ impl Project {
}
for open_buffer in self.opened_buffers.values_mut() {
+ // Wake up any tasks waiting for peers' edits to this buffer.
+ if let Some(buffer) = open_buffer.upgrade(cx) {
+ buffer.update(cx, |buffer, _| buffer.give_up_waiting());
+ }
+
if let OpenBuffer::Strong(buffer) = open_buffer {
*open_buffer = OpenBuffer::Weak(buffer.downgrade());
}
}
- let _ = self.metadata_changed(cx);
- cx.notify();
self.client.send(proto::UnshareProject {
project_id: remote_id,
})?;
@@ -1150,13 +1210,21 @@ impl Project {
}
pub fn disconnected_from_host(&mut self, cx: &mut ModelContext<Self>) {
+ self.disconnected_from_host_internal(cx);
+ cx.emit(Event::DisconnectedFromHost);
+ cx.notify();
+ }
+
+ fn disconnected_from_host_internal(&mut self, cx: &mut AppContext) {
if let Some(ProjectClientState::Remote {
sharing_has_stopped,
..
}) = &mut self.client_state
{
*sharing_has_stopped = true;
+
self.collaborators.clear();
+
for worktree in &self.worktrees {
if let Some(worktree) = worktree.upgrade(cx) {
worktree.update(cx, |worktree, _| {
@@ -1166,8 +1234,17 @@ impl Project {
});
}
}
- cx.emit(Event::DisconnectedFromHost);
- cx.notify();
+
+ for open_buffer in self.opened_buffers.values_mut() {
+ // Wake up any tasks waiting for peers' edits to this buffer.
+ if let Some(buffer) = open_buffer.upgrade(cx) {
+ buffer.update(cx, |buffer, _| buffer.give_up_waiting());
+ }
+
+ if let OpenBuffer::Strong(buffer) = open_buffer {
+ *open_buffer = OpenBuffer::Weak(buffer.downgrade());
+ }
+ }
// Wake up all futures currently waiting on a buffer to get opened,
// to give them a chance to fail now that we've disconnected.
@@ -1507,32 +1584,29 @@ impl Project {
});
let remote_id = buffer.read(cx).remote_id();
- let open_buffer = if self.is_remote() || self.is_shared() {
+ let is_remote = self.is_remote();
+ let open_buffer = if is_remote || self.is_shared() {
OpenBuffer::Strong(buffer.clone())
} else {
OpenBuffer::Weak(buffer.downgrade())
};
- match self.opened_buffers.insert(remote_id, open_buffer) {
- None => {}
- Some(OpenBuffer::Operations(operations)) => {
- buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))?
+ match self.opened_buffers.entry(remote_id) {
+ hash_map::Entry::Vacant(entry) => {
+ entry.insert(open_buffer);
}
- Some(OpenBuffer::Weak(existing_handle)) => {
- if existing_handle.upgrade(cx).is_some() {
- debug_panic!("already registered buffer with remote id {}", remote_id);
- Err(anyhow!(
- "already registered buffer with remote id {}",
- remote_id
- ))?
+ hash_map::Entry::Occupied(mut entry) => {
+ if let OpenBuffer::Operations(operations) = entry.get_mut() {
+ buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx))?;
+ } else if entry.get().upgrade(cx).is_some() {
+ if is_remote {
+ return Ok(());
+ } else {
+ debug_panic!("buffer {} was already registered", remote_id);
+ Err(anyhow!("buffer {} was already registered", remote_id))?;
+ }
}
- }
- Some(OpenBuffer::Strong(_)) => {
- debug_panic!("already registered buffer with remote id {}", remote_id);
- Err(anyhow!(
- "already registered buffer with remote id {}",
- remote_id
- ))?
+ entry.insert(open_buffer);
}
}
cx.subscribe(buffer, |this, buffer, event, cx| {
@@ -1657,6 +1731,53 @@ impl Project {
});
}
+ async fn send_buffer_messages(
+ this: WeakModelHandle<Self>,
+ mut rx: UnboundedReceiver<BufferMessage>,
+ mut cx: AsyncAppContext,
+ ) {
+ let mut needs_resync_with_host = false;
+ while let Some(change) = rx.next().await {
+ if let Some(this) = this.upgrade(&mut cx) {
+ let is_local = this.read_with(&cx, |this, _| this.is_local());
+ match change {
+ BufferMessage::Operation {
+ buffer_id,
+ operation,
+ } => {
+ if needs_resync_with_host {
+ continue;
+ }
+ let request = this.read_with(&cx, |this, _| {
+ let project_id = this.remote_id()?;
+ Some(this.client.request(proto::UpdateBuffer {
+ buffer_id,
+ project_id,
+ operations: vec![operation],
+ }))
+ });
+ if let Some(request) = request {
+ if request.await.is_err() && !is_local {
+ needs_resync_with_host = true;
+ }
+ }
+ }
+ BufferMessage::Resync => {
+ if this
+ .update(&mut cx, |this, cx| this.synchronize_remote_buffers(cx))
+ .await
+ .is_ok()
+ {
+ needs_resync_with_host = false;
+ }
+ }
+ }
+ } else {
+ break;
+ }
+ }
+ }
+
fn on_buffer_event(
&mut self,
buffer: ModelHandle<Buffer>,
@@ -1665,14 +1786,12 @@ impl Project {
) -> Option<()> {
match event {
BufferEvent::Operation(operation) => {
- if let Some(project_id) = self.remote_id() {
- let request = self.client.request(proto::UpdateBuffer {
- project_id,
+ self.buffer_changes_tx
+ .unbounded_send(BufferMessage::Operation {
buffer_id: buffer.read(cx).remote_id(),
- operations: vec![language::proto::serialize_operation(operation)],
- });
- cx.background().spawn(request).detach_and_log_err(cx);
- }
+ operation: language::proto::serialize_operation(operation),
+ })
+ .ok();
}
BufferEvent::Edited { .. } => {
let language_server = self
@@ -3477,188 +3596,12 @@ impl Project {
pub fn completions<T: ToPointUtf16>(
&self,
- source_buffer_handle: &ModelHandle<Buffer>,
+ buffer: &ModelHandle<Buffer>,
position: T,
cx: &mut ModelContext<Self>,
) -> Task<Result<Vec<Completion>>> {
- let source_buffer_handle = source_buffer_handle.clone();
- let source_buffer = source_buffer_handle.read(cx);
- let buffer_id = source_buffer.remote_id();
- let language = source_buffer.language().cloned();
- let worktree;
- let buffer_abs_path;
- if let Some(file) = File::from_dyn(source_buffer.file()) {
- worktree = file.worktree.clone();
- buffer_abs_path = file.as_local().map(|f| f.abs_path(cx));
- } else {
- return Task::ready(Ok(Default::default()));
- };
-
- let position = Unclipped(position.to_point_utf16(source_buffer));
- let anchor = source_buffer.anchor_after(position);
-
- if worktree.read(cx).as_local().is_some() {
- let buffer_abs_path = buffer_abs_path.unwrap();
- let lang_server =
- if let Some((_, server)) = self.language_server_for_buffer(source_buffer, cx) {
- server.clone()
- } else {
- return Task::ready(Ok(Default::default()));
- };
-
- cx.spawn(|_, cx| async move {
- let completions = lang_server
- .request::<lsp::request::Completion>(lsp::CompletionParams {
- text_document_position: lsp::TextDocumentPositionParams::new(
- lsp::TextDocumentIdentifier::new(
- lsp::Url::from_file_path(buffer_abs_path).unwrap(),
- ),
- point_to_lsp(position.0),
- ),
- context: Default::default(),
- work_done_progress_params: Default::default(),
- partial_result_params: Default::default(),
- })
- .await
- .context("lsp completion request failed")?;
-
- let completions = if let Some(completions) = completions {
- match completions {
- lsp::CompletionResponse::Array(completions) => completions,
- lsp::CompletionResponse::List(list) => list.items,
- }
- } else {
- Default::default()
- };
-
- let completions = source_buffer_handle.read_with(&cx, |this, _| {
- let snapshot = this.snapshot();
- let clipped_position = this.clip_point_utf16(position, Bias::Left);
- let mut range_for_token = None;
- completions
- .into_iter()
- .filter_map(move |mut lsp_completion| {
- // For now, we can only handle additional edits if they are returned
- // when resolving the completion, not if they are present initially.
- if lsp_completion
- .additional_text_edits
- .as_ref()
- .map_or(false, |edits| !edits.is_empty())
- {
- return None;
- }
-
- let (old_range, mut new_text) = match lsp_completion.text_edit.as_ref()
- {
- // If the language server provides a range to overwrite, then
- // check that the range is valid.
- Some(lsp::CompletionTextEdit::Edit(edit)) => {
- let range = range_from_lsp(edit.range);
- let start = snapshot.clip_point_utf16(range.start, Bias::Left);
- let end = snapshot.clip_point_utf16(range.end, Bias::Left);
- if start != range.start.0 || end != range.end.0 {
- log::info!("completion out of expected range");
- return None;
- }
- (
- snapshot.anchor_before(start)..snapshot.anchor_after(end),
- edit.new_text.clone(),
- )
- }
- // If the language server does not provide a range, then infer
- // the range based on the syntax tree.
- None => {
- if position.0 != clipped_position {
- log::info!("completion out of expected range");
- return None;
- }
- let Range { start, end } = range_for_token
- .get_or_insert_with(|| {
- let offset = position.to_offset(&snapshot);
- let (range, kind) = snapshot.surrounding_word(offset);
- if kind == Some(CharKind::Word) {
- range
- } else {
- offset..offset
- }
- })
- .clone();
- let text = lsp_completion
- .insert_text
- .as_ref()
- .unwrap_or(&lsp_completion.label)
- .clone();
- (
- snapshot.anchor_before(start)..snapshot.anchor_after(end),
- text,
- )
- }
- Some(lsp::CompletionTextEdit::InsertAndReplace(_)) => {
- log::info!("unsupported insert/replace completion");
- return None;
- }
- };
-
- LineEnding::normalize(&mut new_text);
- let language = language.clone();
- Some(async move {
- let mut label = None;
- if let Some(language) = language {
- language.process_completion(&mut lsp_completion).await;
- label = language.label_for_completion(&lsp_completion).await;
- }
- Completion {
- old_range,
- new_text,
- label: label.unwrap_or_else(|| {
- CodeLabel::plain(
- lsp_completion.label.clone(),
- lsp_completion.filter_text.as_deref(),
- )
- }),
- lsp_completion,
- }
- })
- })
- });
-
- Ok(futures::future::join_all(completions).await)
- })
- } else if let Some(project_id) = self.remote_id() {
- let rpc = self.client.clone();
- let message = proto::GetCompletions {
- project_id,
- buffer_id,
- position: Some(language::proto::serialize_anchor(&anchor)),
- version: serialize_version(&source_buffer.version()),
- };
- cx.spawn_weak(|this, mut cx| async move {
- let response = rpc.request(message).await?;
-
- if this
- .upgrade(&cx)
- .ok_or_else(|| anyhow!("project was dropped"))?
- .read_with(&cx, |this, _| this.is_read_only())
- {
- return Err(anyhow!(
- "failed to get completions: project was disconnected"
- ));
- } else {
- source_buffer_handle
- .update(&mut cx, |buffer, _| {
- buffer.wait_for_version(deserialize_version(response.version))
- })
- .await;
-
- let completions = response.completions.into_iter().map(|completion| {
- language::proto::deserialize_completion(completion, language.clone())
- });
- futures::future::try_join_all(completions).await
- }
- })
- } else {
- Task::ready(Ok(Default::default()))
- }
+ let position = position.to_point_utf16(buffer.read(cx));
+ self.request_lsp(buffer.clone(), GetCompletions { position }, cx)
}
pub fn apply_additional_edits_for_completion(
@@ -3739,7 +3682,7 @@ impl Project {
.update(&mut cx, |buffer, _| {
buffer.wait_for_edits(transaction.edit_ids.iter().copied())
})
- .await;
+ .await?;
if push_to_history {
buffer_handle.update(&mut cx, |buffer, _| {
buffer.push_transaction(transaction.clone(), Instant::now());
@@ -3761,106 +3704,9 @@ impl Project {
range: Range<T>,
cx: &mut ModelContext<Self>,
) -> Task<Result<Vec<CodeAction>>> {
- let buffer_handle = buffer_handle.clone();
let buffer = buffer_handle.read(cx);
- let snapshot = buffer.snapshot();
- let relevant_diagnostics = snapshot
- .diagnostics_in_range::<usize, usize>(range.to_offset(&snapshot), false)
- .map(|entry| entry.to_lsp_diagnostic_stub())
- .collect();
- let buffer_id = buffer.remote_id();
- let worktree;
- let buffer_abs_path;
- if let Some(file) = File::from_dyn(buffer.file()) {
- worktree = file.worktree.clone();
- buffer_abs_path = file.as_local().map(|f| f.abs_path(cx));
- } else {
- return Task::ready(Ok(Vec::new()));
- };
let range = buffer.anchor_before(range.start)..buffer.anchor_before(range.end);
-
- if worktree.read(cx).as_local().is_some() {
- let buffer_abs_path = buffer_abs_path.unwrap();
- let lang_server = if let Some((_, server)) = self.language_server_for_buffer(buffer, cx)
- {
- server.clone()
- } else {
- return Task::ready(Ok(Vec::new()));
- };
-
- let lsp_range = range_to_lsp(range.to_point_utf16(buffer));
- cx.foreground().spawn(async move {
- if lang_server.capabilities().code_action_provider.is_none() {
- return Ok(Vec::new());
- }
-
- Ok(lang_server
- .request::<lsp::request::CodeActionRequest>(lsp::CodeActionParams {
- text_document: lsp::TextDocumentIdentifier::new(
- lsp::Url::from_file_path(buffer_abs_path).unwrap(),
- ),
- range: lsp_range,
- work_done_progress_params: Default::default(),
- partial_result_params: Default::default(),
- context: lsp::CodeActionContext {
- diagnostics: relevant_diagnostics,
- only: lang_server.code_action_kinds(),
- },
- })
- .await?
- .unwrap_or_default()
- .into_iter()
- .filter_map(|entry| {
- if let lsp::CodeActionOrCommand::CodeAction(lsp_action) = entry {
- Some(CodeAction {
- range: range.clone(),
- lsp_action,
- })
- } else {
- None
- }
- })
- .collect())
- })
- } else if let Some(project_id) = self.remote_id() {
- let rpc = self.client.clone();
- let version = buffer.version();
- cx.spawn_weak(|this, mut cx| async move {
- let response = rpc
- .request(proto::GetCodeActions {
- project_id,
- buffer_id,
- start: Some(language::proto::serialize_anchor(&range.start)),
- end: Some(language::proto::serialize_anchor(&range.end)),
- version: serialize_version(&version),
- })
- .await?;
-
- if this
- .upgrade(&cx)
- .ok_or_else(|| anyhow!("project was dropped"))?
- .read_with(&cx, |this, _| this.is_read_only())
- {
- return Err(anyhow!(
- "failed to get code actions: project was disconnected"
- ));
- } else {
- buffer_handle
- .update(&mut cx, |buffer, _| {
- buffer.wait_for_version(deserialize_version(response.version))
- })
- .await;
-
- response
- .actions
- .into_iter()
- .map(language::proto::deserialize_code_action)
- .collect()
- }
- })
- } else {
- Task::ready(Ok(Default::default()))
- }
+ self.request_lsp(buffer_handle.clone(), GetCodeActions { range }, cx)
}
pub fn apply_code_action(
@@ -4345,7 +4191,7 @@ impl Project {
self.language_server_for_buffer(buffer, cx)
.map(|(_, server)| server.clone()),
) {
- let lsp_params = request.to_lsp(&file.abs_path(cx), cx);
+ let lsp_params = request.to_lsp(&file.abs_path(cx), buffer, &language_server, cx);
return cx.spawn(|this, cx| async move {
if !request.check_capabilities(language_server.capabilities()) {
return Ok(Default::default());
@@ -221,7 +221,7 @@ impl Worktree {
root_char_bag: root_name.chars().map(|c| c.to_ascii_lowercase()).collect(),
entries_by_path: Default::default(),
entries_by_id: Default::default(),
- scan_id: 0,
+ scan_id: 1,
completed_scan_id: 0,
},
};
@@ -298,7 +298,7 @@ impl Worktree {
.collect(),
entries_by_path: Default::default(),
entries_by_id: Default::default(),
- scan_id: 0,
+ scan_id: 1,
completed_scan_id: 0,
};
@@ -1063,7 +1063,7 @@ impl RemoteWorktree {
version: serialize_version(&version),
})
.await?;
- let version = deserialize_version(response.version);
+ let version = deserialize_version(&response.version);
let fingerprint = deserialize_fingerprint(&response.fingerprint)?;
let mtime = response
.mtime
@@ -1223,11 +1223,10 @@ impl Snapshot {
let mut entries_by_path_edits = Vec::new();
let mut entries_by_id_edits = Vec::new();
for entry_id in update.removed_entries {
- let entry = self
- .entry_for_id(ProjectEntryId::from_proto(entry_id))
- .ok_or_else(|| anyhow!("unknown entry {}", entry_id))?;
- entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone())));
- entries_by_id_edits.push(Edit::Remove(entry.id));
+ if let Some(entry) = self.entry_for_id(ProjectEntryId::from_proto(entry_id)) {
+ entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone())));
+ entries_by_id_edits.push(Edit::Remove(entry.id));
+ }
}
for entry in update.updated_entries {
@@ -3725,7 +3724,7 @@ mod tests {
) {
let mut files = Vec::new();
let mut dirs = Vec::new();
- for path in fs.as_fake().paths().await {
+ for path in fs.as_fake().paths() {
if path.starts_with(root_path) {
if fs.is_file(&path).await {
files.push(path);
@@ -7,7 +7,7 @@ use collections::HashMap;
use futures::{
channel::{mpsc, oneshot},
stream::BoxStream,
- FutureExt, SinkExt, StreamExt,
+ FutureExt, SinkExt, StreamExt, TryFutureExt,
};
use parking_lot::{Mutex, RwLock};
use serde::{ser::SerializeStruct, Serialize};
@@ -71,6 +71,7 @@ impl<T> Clone for Receipt<T> {
impl<T> Copy for Receipt<T> {}
+#[derive(Clone, Debug)]
pub struct TypedEnvelope<T> {
pub sender_id: ConnectionId,
pub original_sender_id: Option<PeerId>,
@@ -370,6 +371,15 @@ impl Peer {
receiver_id: ConnectionId,
request: T,
) -> impl Future<Output = Result<T::Response>> {
+ self.request_internal(None, receiver_id, request)
+ .map_ok(|envelope| envelope.payload)
+ }
+
+ pub fn request_envelope<T: RequestMessage>(
+ &self,
+ receiver_id: ConnectionId,
+ request: T,
+ ) -> impl Future<Output = Result<TypedEnvelope<T::Response>>> {
self.request_internal(None, receiver_id, request)
}
@@ -380,6 +390,7 @@ impl Peer {
request: T,
) -> impl Future<Output = Result<T::Response>> {
self.request_internal(Some(sender_id), receiver_id, request)
+ .map_ok(|envelope| envelope.payload)
}
pub fn request_internal<T: RequestMessage>(
@@ -387,7 +398,7 @@ impl Peer {
original_sender_id: Option<ConnectionId>,
receiver_id: ConnectionId,
request: T,
- ) -> impl Future<Output = Result<T::Response>> {
+ ) -> impl Future<Output = Result<TypedEnvelope<T::Response>>> {
let (tx, rx) = oneshot::channel();
let send = self.connection_state(receiver_id).and_then(|connection| {
let message_id = connection.next_message_id.fetch_add(1, SeqCst);
@@ -410,6 +421,7 @@ impl Peer {
async move {
send?;
let (response, _barrier) = rx.await.map_err(|_| anyhow!("connection was closed"))?;
+
if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
Err(anyhow!(
"RPC request {} failed - {}",
@@ -417,8 +429,13 @@ impl Peer {
error.message
))
} else {
- T::Response::from_envelope(response)
- .ok_or_else(|| anyhow!("received response of the wrong type"))
+ Ok(TypedEnvelope {
+ message_id: response.id,
+ sender_id: receiver_id,
+ original_sender_id: response.original_sender_id,
+ payload: T::Response::from_envelope(response)
+ .ok_or_else(|| anyhow!("received response of the wrong type"))?,
+ })
}
}
}
@@ -233,7 +233,7 @@ messages!(
(UpdateProject, Foreground),
(UpdateProjectCollaborator, Foreground),
(UpdateWorktree, Foreground),
- (UpdateDiffBase, Background),
+ (UpdateDiffBase, Foreground),
(GetPrivateUserInfo, Foreground),
(GetPrivateUserInfoResponse, Foreground),
);
@@ -6,4 +6,4 @@ pub use conn::Connection;
pub use peer::*;
mod macros;
-pub const PROTOCOL_VERSION: u32 = 50;
+pub const PROTOCOL_VERSION: u32 = 51;
@@ -11,14 +11,14 @@ mod tests;
mod undo_map;
pub use anchor::*;
-use anyhow::Result;
+use anyhow::{anyhow, Result};
use clock::ReplicaId;
use collections::{HashMap, HashSet};
use fs::LineEnding;
use locator::Locator;
use operation_queue::OperationQueue;
pub use patch::Patch;
-use postage::{barrier, oneshot, prelude::*};
+use postage::{oneshot, prelude::*};
pub use rope::*;
pub use selection::*;
@@ -52,7 +52,7 @@ pub struct Buffer {
pub lamport_clock: clock::Lamport,
subscriptions: Topic,
edit_id_resolvers: HashMap<clock::Local, Vec<oneshot::Sender<()>>>,
- version_barriers: Vec<(clock::Global, barrier::Sender)>,
+ wait_for_version_txs: Vec<(clock::Global, oneshot::Sender<()>)>,
}
#[derive(Clone)]
@@ -522,7 +522,7 @@ impl Buffer {
lamport_clock,
subscriptions: Default::default(),
edit_id_resolvers: Default::default(),
- version_barriers: Default::default(),
+ wait_for_version_txs: Default::default(),
}
}
@@ -793,8 +793,14 @@ impl Buffer {
}
}
}
- self.version_barriers
- .retain(|(version, _)| !self.snapshot.version().observed_all(version));
+ self.wait_for_version_txs.retain_mut(|(version, tx)| {
+ if self.snapshot.version().observed_all(version) {
+ tx.try_send(()).ok();
+ false
+ } else {
+ true
+ }
+ });
Ok(())
}
@@ -1305,7 +1311,7 @@ impl Buffer {
pub fn wait_for_edits(
&mut self,
edit_ids: impl IntoIterator<Item = clock::Local>,
- ) -> impl 'static + Future<Output = ()> {
+ ) -> impl 'static + Future<Output = Result<()>> {
let mut futures = Vec::new();
for edit_id in edit_ids {
if !self.version.observed(edit_id) {
@@ -1317,15 +1323,18 @@ impl Buffer {
async move {
for mut future in futures {
- future.recv().await;
+ if future.recv().await.is_none() {
+ Err(anyhow!("gave up waiting for edits"))?;
+ }
}
+ Ok(())
}
}
pub fn wait_for_anchors<'a>(
&mut self,
anchors: impl IntoIterator<Item = &'a Anchor>,
- ) -> impl 'static + Future<Output = ()> {
+ ) -> impl 'static + Future<Output = Result<()>> {
let mut futures = Vec::new();
for anchor in anchors {
if !self.version.observed(anchor.timestamp)
@@ -1343,21 +1352,36 @@ impl Buffer {
async move {
for mut future in futures {
- future.recv().await;
+ if future.recv().await.is_none() {
+ Err(anyhow!("gave up waiting for anchors"))?;
+ }
}
+ Ok(())
}
}
- pub fn wait_for_version(&mut self, version: clock::Global) -> impl Future<Output = ()> {
- let (tx, mut rx) = barrier::channel();
+ pub fn wait_for_version(&mut self, version: clock::Global) -> impl Future<Output = Result<()>> {
+ let mut rx = None;
if !self.snapshot.version.observed_all(&version) {
- self.version_barriers.push((version, tx));
+ let channel = oneshot::channel();
+ self.wait_for_version_txs.push((version, channel.0));
+ rx = Some(channel.1);
}
async move {
- rx.recv().await;
+ if let Some(mut rx) = rx {
+ if rx.recv().await.is_none() {
+ Err(anyhow!("gave up waiting for version"))?;
+ }
+ }
+ Ok(())
}
}
+ pub fn give_up_waiting(&mut self) {
+ self.edit_id_resolvers.clear();
+ self.wait_for_version_txs.clear();
+ }
+
fn resolve_edit(&mut self, edit_id: clock::Local) {
for mut tx in self
.edit_id_resolvers
@@ -1365,7 +1389,7 @@ impl Buffer {
.into_iter()
.flatten()
{
- let _ = tx.try_send(());
+ tx.try_send(()).ok();
}
}
}
@@ -1480,12 +1504,11 @@ impl Buffer {
start..end
}
- #[allow(clippy::type_complexity)]
- pub fn randomly_edit<T>(
- &mut self,
+ pub fn get_random_edits<T>(
+ &self,
rng: &mut T,
edit_count: usize,
- ) -> (Vec<(Range<usize>, Arc<str>)>, Operation)
+ ) -> Vec<(Range<usize>, Arc<str>)>
where
T: rand::Rng,
{
@@ -1504,8 +1527,21 @@ impl Buffer {
edits.push((range, new_text.into()));
}
+ edits
+ }
+ #[allow(clippy::type_complexity)]
+ pub fn randomly_edit<T>(
+ &mut self,
+ rng: &mut T,
+ edit_count: usize,
+ ) -> (Vec<(Range<usize>, Arc<str>)>, Operation)
+ where
+ T: rand::Rng,
+ {
+ let mut edits = self.get_random_edits(rng, edit_count);
log::info!("mutating buffer {} with {:?}", self.replica_id, edits);
+
let op = self.edit(edits.iter().cloned());
if let Operation::Edit(edit) = &op {
assert_eq!(edits.len(), edit.new_text.len());
@@ -1305,10 +1305,8 @@ impl Workspace {
RemoveWorktreeFromProject(worktree_id): &RemoveWorktreeFromProject,
cx: &mut ViewContext<Self>,
) {
- let future = self
- .project
+ self.project
.update(cx, |project, cx| project.remove_worktree(*worktree_id, cx));
- cx.foreground().spawn(future).detach();
}
fn project_path_for_path(
@@ -3266,9 +3264,7 @@ mod tests {
);
// Remove a project folder
- project
- .update(cx, |project, cx| project.remove_worktree(worktree_id, cx))
- .await;
+ project.update(cx, |project, cx| project.remove_worktree(worktree_id, cx));
assert_eq!(
cx.current_window_title(window_id).as_deref(),
Some("one.txt — root2")