@@ -29,7 +29,10 @@ use std::{
env, future, mem,
path::{Path, PathBuf},
rc::Rc,
- sync::Arc,
+ sync::{
+ atomic::{AtomicBool, Ordering::SeqCst},
+ Arc,
+ },
};
use unindent::Unindent as _;
use workspace::{
@@ -3535,6 +3538,141 @@ async fn test_collaborating_with_diagnostics(
});
}
+#[gpui::test(iterations = 10)]
+async fn test_collaborating_with_lsp_progress_updates_and_diagnostics_ordering(
+ deterministic: Arc<Deterministic>,
+ cx_a: &mut TestAppContext,
+ cx_b: &mut TestAppContext,
+) {
+ deterministic.forbid_parking();
+ let mut server = TestServer::start(&deterministic).await;
+ let client_a = server.create_client(cx_a, "user_a").await;
+ let client_b = server.create_client(cx_b, "user_b").await;
+ server
+ .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)])
+ .await;
+
+ // Set up a fake language server.
+ let mut language = Language::new(
+ LanguageConfig {
+ name: "Rust".into(),
+ path_suffixes: vec!["rs".to_string()],
+ ..Default::default()
+ },
+ Some(tree_sitter_rust::language()),
+ );
+ let mut fake_language_servers = language
+ .set_fake_lsp_adapter(Arc::new(FakeLspAdapter {
+ disk_based_diagnostics_progress_token: Some("the-disk-based-token".into()),
+ disk_based_diagnostics_sources: vec!["the-disk-based-diagnostics-source".into()],
+ ..Default::default()
+ }))
+ .await;
+ client_a.language_registry.add(Arc::new(language));
+
+ let file_names = &["one.rs", "two.rs", "three.rs", "four.rs", "five.rs"];
+ client_a
+ .fs
+ .insert_tree(
+ "/test",
+ json!({
+ "one.rs": "const ONE: usize = 1;",
+ "two.rs": "const TWO: usize = 2;",
+ "three.rs": "const THREE: usize = 3;",
+ "four.rs": "const FOUR: usize = 3;",
+ "five.rs": "const FIVE: usize = 3;",
+ }),
+ )
+ .await;
+
+ let (project_a, worktree_id) = client_a.build_local_project("/test", cx_a).await;
+
+ // Share a project as client A
+ let active_call_a = cx_a.read(ActiveCall::global);
+ let project_id = active_call_a
+ .update(cx_a, |call, cx| call.share_project(project_a.clone(), cx))
+ .await
+ .unwrap();
+
+ // Join the project as client B and open all three files.
+ let project_b = client_b.build_remote_project(project_id, cx_b).await;
+ let guest_buffers = futures::future::try_join_all(file_names.iter().map(|file_name| {
+ project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, file_name), cx))
+ }))
+ .await
+ .unwrap();
+
+ // Simulate a language server reporting errors for a file.
+ let fake_language_server = fake_language_servers.next().await.unwrap();
+ fake_language_server
+ .request::<lsp::request::WorkDoneProgressCreate>(lsp::WorkDoneProgressCreateParams {
+ token: lsp::NumberOrString::String("the-disk-based-token".to_string()),
+ })
+ .await
+ .unwrap();
+ fake_language_server.notify::<lsp::notification::Progress>(lsp::ProgressParams {
+ token: lsp::NumberOrString::String("the-disk-based-token".to_string()),
+ value: lsp::ProgressParamsValue::WorkDone(lsp::WorkDoneProgress::Begin(
+ lsp::WorkDoneProgressBegin {
+ title: "Progress Began".into(),
+ ..Default::default()
+ },
+ )),
+ });
+ for file_name in file_names {
+ fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
+ lsp::PublishDiagnosticsParams {
+ uri: lsp::Url::from_file_path(Path::new("/test").join(file_name)).unwrap(),
+ version: None,
+ diagnostics: vec![lsp::Diagnostic {
+ severity: Some(lsp::DiagnosticSeverity::WARNING),
+ source: Some("the-disk-based-diagnostics-source".into()),
+ range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
+ message: "message one".to_string(),
+ ..Default::default()
+ }],
+ },
+ );
+ }
+ fake_language_server.notify::<lsp::notification::Progress>(lsp::ProgressParams {
+ token: lsp::NumberOrString::String("the-disk-based-token".to_string()),
+ value: lsp::ProgressParamsValue::WorkDone(lsp::WorkDoneProgress::End(
+ lsp::WorkDoneProgressEnd { message: None },
+ )),
+ });
+
+ // When the "disk base diagnostics finished" message is received, the buffers'
+ // diagnostics are expected to be present.
+ let disk_based_diagnostics_finished = Arc::new(AtomicBool::new(false));
+ project_b.update(cx_b, {
+ let project_b = project_b.clone();
+ let disk_based_diagnostics_finished = disk_based_diagnostics_finished.clone();
+ move |_, cx| {
+ cx.subscribe(&project_b, move |_, _, event, cx| {
+ if let project::Event::DiskBasedDiagnosticsFinished { .. } = event {
+ disk_based_diagnostics_finished.store(true, SeqCst);
+ for buffer in &guest_buffers {
+ assert_eq!(
+ buffer
+ .read(cx)
+ .snapshot()
+ .diagnostics_in_range::<_, usize>(0..5, false)
+ .count(),
+ 1,
+ "expected a diagnostic for buffer {:?}",
+ buffer.read(cx).file().unwrap().path(),
+ );
+ }
+ }
+ })
+ .detach();
+ }
+ });
+
+ deterministic.run_until_parked();
+ assert!(disk_based_diagnostics_finished.load(SeqCst));
+}
+
#[gpui::test(iterations = 10)]
async fn test_collaborating_with_completion(
deterministic: Arc<Deterministic>,
@@ -92,7 +92,7 @@ pub trait Item {
pub struct Project {
worktrees: Vec<WorktreeHandle>,
active_entry: Option<ProjectEntryId>,
- buffer_changes_tx: mpsc::UnboundedSender<BufferMessage>,
+ buffer_ordered_messages_tx: mpsc::UnboundedSender<BufferOrderedMessage>,
languages: Arc<LanguageRegistry>,
language_servers: HashMap<usize, LanguageServerState>,
language_server_ids: HashMap<(WorktreeId, LanguageServerName), usize>,
@@ -131,11 +131,16 @@ pub struct Project {
terminals: Terminals,
}
-enum BufferMessage {
+/// Message ordered with respect to buffer operations
+enum BufferOrderedMessage {
Operation {
buffer_id: u64,
operation: proto::Operation,
},
+ LanguageServerUpdate {
+ language_server_id: usize,
+ message: proto::update_language_server::Variant,
+ },
Resync,
}
@@ -436,11 +441,11 @@ impl Project {
) -> ModelHandle<Self> {
cx.add_model(|cx: &mut ModelContext<Self>| {
let (tx, rx) = mpsc::unbounded();
- cx.spawn_weak(|this, cx| Self::send_buffer_messages(this, rx, cx))
+ cx.spawn_weak(|this, cx| Self::send_buffer_ordered_messages(this, rx, cx))
.detach();
Self {
worktrees: Default::default(),
- buffer_changes_tx: tx,
+ buffer_ordered_messages_tx: tx,
collaborators: Default::default(),
opened_buffers: Default::default(),
shared_buffers: Default::default(),
@@ -504,11 +509,11 @@ impl Project {
}
let (tx, rx) = mpsc::unbounded();
- cx.spawn_weak(|this, cx| Self::send_buffer_messages(this, rx, cx))
+ cx.spawn_weak(|this, cx| Self::send_buffer_ordered_messages(this, rx, cx))
.detach();
let mut this = Self {
worktrees: Vec::new(),
- buffer_changes_tx: tx,
+ buffer_ordered_messages_tx: tx,
loading_buffers_by_path: Default::default(),
opened_buffer: watch::channel(),
shared_buffers: Default::default(),
@@ -1152,8 +1157,8 @@ impl Project {
)
})
.collect();
- self.buffer_changes_tx
- .unbounded_send(BufferMessage::Resync)
+ self.buffer_ordered_messages_tx
+ .unbounded_send(BufferOrderedMessage::Resync)
.unwrap();
cx.notify();
Ok(())
@@ -1731,23 +1736,49 @@ impl Project {
});
}
- async fn send_buffer_messages(
+ async fn send_buffer_ordered_messages(
this: WeakModelHandle<Self>,
- rx: UnboundedReceiver<BufferMessage>,
+ rx: UnboundedReceiver<BufferOrderedMessage>,
mut cx: AsyncAppContext,
) -> Option<()> {
const MAX_BATCH_SIZE: usize = 128;
- let mut needs_resync_with_host = false;
let mut operations_by_buffer_id = HashMap::default();
+ async fn flush_operations(
+ this: &ModelHandle<Project>,
+ operations_by_buffer_id: &mut HashMap<u64, Vec<proto::Operation>>,
+ needs_resync_with_host: &mut bool,
+ is_local: bool,
+ cx: &AsyncAppContext,
+ ) {
+ for (buffer_id, operations) in operations_by_buffer_id.drain() {
+ let request = this.read_with(cx, |this, _| {
+ let project_id = this.remote_id()?;
+ Some(this.client.request(proto::UpdateBuffer {
+ buffer_id,
+ project_id,
+ operations,
+ }))
+ });
+ if let Some(request) = request {
+ if request.await.is_err() && !is_local {
+ *needs_resync_with_host = true;
+ break;
+ }
+ }
+ }
+ }
+
+ let mut needs_resync_with_host = false;
let mut changes = rx.ready_chunks(MAX_BATCH_SIZE);
+
while let Some(changes) = changes.next().await {
let this = this.upgrade(&mut cx)?;
let is_local = this.read_with(&cx, |this, _| this.is_local());
for change in changes {
match change {
- BufferMessage::Operation {
+ BufferOrderedMessage::Operation {
buffer_id,
operation,
} => {
@@ -1760,7 +1791,8 @@ impl Project {
.or_insert(Vec::new())
.push(operation);
}
- BufferMessage::Resync => {
+
+ BufferOrderedMessage::Resync => {
operations_by_buffer_id.clear();
if this
.update(&mut cx, |this, cx| this.synchronize_remote_buffers(cx))
@@ -1770,25 +1802,43 @@ impl Project {
needs_resync_with_host = false;
}
}
- }
- }
- for (buffer_id, operations) in operations_by_buffer_id.drain() {
- let request = this.read_with(&cx, |this, _| {
- let project_id = this.remote_id()?;
- Some(this.client.request(proto::UpdateBuffer {
- buffer_id,
- project_id,
- operations,
- }))
- });
- if let Some(request) = request {
- if request.await.is_err() && !is_local {
- needs_resync_with_host = true;
- break;
+ BufferOrderedMessage::LanguageServerUpdate {
+ language_server_id,
+ message,
+ } => {
+ flush_operations(
+ &this,
+ &mut operations_by_buffer_id,
+ &mut needs_resync_with_host,
+ is_local,
+ &cx,
+ )
+ .await;
+
+ this.read_with(&cx, |this, _| {
+ if let Some(project_id) = this.remote_id() {
+ this.client
+ .send(proto::UpdateLanguageServer {
+ project_id,
+ language_server_id: language_server_id as u64,
+ variant: Some(message),
+ })
+ .log_err();
+ }
+ });
}
}
}
+
+ flush_operations(
+ &this,
+ &mut operations_by_buffer_id,
+ &mut needs_resync_with_host,
+ is_local,
+ &cx,
+ )
+ .await;
}
None
@@ -1802,8 +1852,8 @@ impl Project {
) -> Option<()> {
match event {
BufferEvent::Operation(operation) => {
- self.buffer_changes_tx
- .unbounded_send(BufferMessage::Operation {
+ self.buffer_ordered_messages_tx
+ .unbounded_send(BufferOrderedMessage::Operation {
buffer_id: buffer.read(cx).remote_id(),
operation: language::proto::serialize_operation(operation),
})
@@ -1894,14 +1944,19 @@ impl Project {
let task = cx.spawn_weak(|this, mut cx| async move {
cx.background().timer(DISK_BASED_DIAGNOSTICS_DEBOUNCE).await;
if let Some(this) = this.upgrade(&cx) {
- this.update(&mut cx, |this, cx | {
- this.disk_based_diagnostics_finished(language_server_id, cx);
- this.broadcast_language_server_update(
+ this.update(&mut cx, |this, cx| {
+ this.disk_based_diagnostics_finished(
language_server_id,
- proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
- proto::LspDiskBasedDiagnosticsUpdated {},
- ),
+ cx,
);
+ this.buffer_ordered_messages_tx
+ .unbounded_send(
+ BufferOrderedMessage::LanguageServerUpdate {
+ language_server_id,
+ message:proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(Default::default())
+ },
+ )
+ .ok();
});
}
});
@@ -2540,12 +2595,12 @@ impl Project {
if is_disk_based_diagnostics_progress {
language_server_status.has_pending_diagnostic_updates = true;
self.disk_based_diagnostics_started(server_id, cx);
- self.broadcast_language_server_update(
- server_id,
- proto::update_language_server::Variant::DiskBasedDiagnosticsUpdating(
- proto::LspDiskBasedDiagnosticsUpdating {},
- ),
- );
+ self.buffer_ordered_messages_tx
+ .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
+ language_server_id: server_id,
+ message: proto::update_language_server::Variant::DiskBasedDiagnosticsUpdating(Default::default())
+ })
+ .ok();
} else {
self.on_lsp_work_start(
server_id,
@@ -2557,14 +2612,18 @@ impl Project {
},
cx,
);
- self.broadcast_language_server_update(
- server_id,
- proto::update_language_server::Variant::WorkStart(proto::LspWorkStart {
- token,
- message: report.message,
- percentage: report.percentage.map(|p| p as u32),
- }),
- );
+ self.buffer_ordered_messages_tx
+ .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
+ language_server_id: server_id,
+ message: proto::update_language_server::Variant::WorkStart(
+ proto::LspWorkStart {
+ token,
+ message: report.message,
+ percentage: report.percentage.map(|p| p as u32),
+ },
+ ),
+ })
+ .ok();
}
}
lsp::WorkDoneProgress::Report(report) => {
@@ -2579,16 +2638,18 @@ impl Project {
},
cx,
);
- self.broadcast_language_server_update(
- server_id,
- proto::update_language_server::Variant::WorkProgress(
- proto::LspWorkProgress {
- token,
- message: report.message,
- percentage: report.percentage.map(|p| p as u32),
- },
- ),
- );
+ self.buffer_ordered_messages_tx
+ .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
+ language_server_id: server_id,
+ message: proto::update_language_server::Variant::WorkProgress(
+ proto::LspWorkProgress {
+ token,
+ message: report.message,
+ percentage: report.percentage.map(|p| p as u32),
+ },
+ ),
+ })
+ .ok();
}
}
lsp::WorkDoneProgress::End(_) => {
@@ -2597,20 +2658,25 @@ impl Project {
if is_disk_based_diagnostics_progress {
language_server_status.has_pending_diagnostic_updates = false;
self.disk_based_diagnostics_finished(server_id, cx);
- self.broadcast_language_server_update(
- server_id,
- proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
- proto::LspDiskBasedDiagnosticsUpdated {},
- ),
- );
+ self.buffer_ordered_messages_tx
+ .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
+ language_server_id: server_id,
+ message:
+ proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
+ Default::default(),
+ ),
+ })
+ .ok();
} else {
self.on_lsp_work_end(server_id, token.clone(), cx);
- self.broadcast_language_server_update(
- server_id,
- proto::update_language_server::Variant::WorkEnd(proto::LspWorkEnd {
- token,
- }),
- );
+ self.buffer_ordered_messages_tx
+ .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
+ language_server_id: server_id,
+ message: proto::update_language_server::Variant::WorkEnd(
+ proto::LspWorkEnd { token },
+ ),
+ })
+ .ok();
}
}
}
@@ -2719,22 +2785,6 @@ impl Project {
})
}
- fn broadcast_language_server_update(
- &self,
- language_server_id: usize,
- event: proto::update_language_server::Variant,
- ) {
- if let Some(project_id) = self.remote_id() {
- self.client
- .send(proto::UpdateLanguageServer {
- project_id,
- language_server_id: language_server_id as u64,
- variant: Some(event),
- })
- .log_err();
- }
- }
-
pub fn language_server_statuses(
&self,
) -> impl DoubleEndedIterator<Item = &LanguageServerStatus> {
@@ -4743,8 +4793,8 @@ impl Project {
if is_host {
this.opened_buffers
.retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
- this.buffer_changes_tx
- .unbounded_send(BufferMessage::Resync)
+ this.buffer_ordered_messages_tx
+ .unbounded_send(BufferOrderedMessage::Resync)
.unwrap();
}