From b4561b848d05af4908b5f493767f7cad3005610d Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Wed, 8 Mar 2023 10:56:40 +0100 Subject: [PATCH 1/5] Limit the number of parallel messages handled for any given connection --- crates/collab/src/rpc.rs | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index d13487440fe42c56808012345e008109589ea7b2..1deaefec1a446857748e311a34ffdd741385c944 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -53,7 +53,7 @@ use std::{ }, time::Duration, }; -use tokio::sync::watch; +use tokio::sync::{watch, Semaphore}; use tower::ServiceBuilder; use tracing::{info_span, instrument, Instrument}; @@ -542,8 +542,13 @@ impl Server { // This arrangement ensures we will attempt to process earlier messages first, but fall // back to processing messages arrived later in the spirit of making progress. let mut foreground_message_handlers = FuturesUnordered::new(); + let concurrent_handlers = Arc::new(Semaphore::new(256)); loop { - let next_message = incoming_rx.next().fuse(); + let next_message = async { + let permit = concurrent_handlers.clone().acquire_owned().await.unwrap(); + let message = incoming_rx.next().await; + (permit, message) + }.fuse(); futures::pin_mut!(next_message); futures::select_biased! { _ = teardown.changed().fuse() => return Ok(()), @@ -554,7 +559,8 @@ impl Server { break; } _ = foreground_message_handlers.next() => {} - message = next_message => { + next_message = next_message => { + let (permit, message) = next_message; if let Some(message) = message { let type_name = message.payload_type_name(); let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name); @@ -564,7 +570,10 @@ impl Server { let handle_message = (handler)(message, session.clone()); drop(span_enter); - let handle_message = handle_message.instrument(span); + let handle_message = async move { + handle_message.await; + drop(permit); + }.instrument(span); if is_background { executor.spawn_detached(handle_message); } else { From a435dc1339bd10e2b43f5bbd89f74950332abcf5 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Wed, 8 Mar 2023 16:11:51 +0100 Subject: [PATCH 2/5] Clear selections on buffer only if they hadn't been cleared already --- crates/language/src/buffer.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/crates/language/src/buffer.rs b/crates/language/src/buffer.rs index acccb553d3d14d11e6140fb8e2a81818214e84e1..09a96342271b00f508094b8426ab5ddf81b04131 100644 --- a/crates/language/src/buffer.rs +++ b/crates/language/src/buffer.rs @@ -1353,7 +1353,13 @@ impl Buffer { } pub fn remove_active_selections(&mut self, cx: &mut ModelContext) { - self.set_active_selections(Arc::from([]), false, Default::default(), cx); + if self + .remote_selections + .get(&self.text.replica_id()) + .map_or(true, |set| !set.selections.is_empty()) + { + self.set_active_selections(Arc::from([]), false, Default::default(), cx); + } } pub fn set_text(&mut self, text: T, cx: &mut ModelContext) -> Option From b687aec9d9f79ae7428fc2c7cc0a48d53bb9652e Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Wed, 8 Mar 2023 17:02:12 +0100 Subject: [PATCH 3/5] Avoid saving buffer if it's neither dirty nor in conflict However, keep emitting `Saved` events so that the language server is notified and can perform tasks related to saving (e.g., running `cargo check` in the case of rust-analyzer). --- crates/project/src/project.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 2ebea8d07dad811a378a065a7f666caaf594c6d3..f93de8e1d847d263dad2526dbcaaa77b0571b118 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -1434,7 +1434,19 @@ impl Project { let worktree = file.worktree.clone(); let path = file.path.clone(); worktree.update(cx, |worktree, cx| match worktree { - Worktree::Local(worktree) => worktree.save_buffer(buffer, path, false, cx), + Worktree::Local(worktree) => { + if buffer.read(cx).is_dirty() || buffer.read(cx).has_conflict() { + worktree.save_buffer(buffer, path, false, cx) + } else { + buffer.update(cx, |buffer, cx| { + let version = buffer.saved_version().clone(); + let fingerprint = buffer.saved_version_fingerprint(); + let mtime = buffer.saved_mtime(); + buffer.did_save(version.clone(), fingerprint, mtime, cx); + Task::ready(Ok((version, fingerprint, mtime))) + }) + } + } Worktree::Remote(worktree) => worktree.save_buffer(buffer, cx), }) } From 4ce51c813841af211fb9b36284f4df4db6078a53 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Thu, 9 Mar 2023 07:26:22 +0100 Subject: [PATCH 4/5] Limit dirty buffer save optimization to multi-buffers --- crates/editor/src/items.rs | 31 ++++++++++++++++++++++++++++--- crates/project/src/project.rs | 14 +------------- 2 files changed, 29 insertions(+), 16 deletions(-) diff --git a/crates/editor/src/items.rs b/crates/editor/src/items.rs index 0d594a66ef354ab49152a2e0e3d318592f6c3233..cda702de00f6411e10bda7ad890d8136688ebf42 100644 --- a/crates/editor/src/items.rs +++ b/crates/editor/src/items.rs @@ -612,9 +612,34 @@ impl Item for Editor { let buffers = self.buffer().clone().read(cx).all_buffers(); cx.as_mut().spawn(|mut cx| async move { format.await?; - project - .update(&mut cx, |project, cx| project.save_buffers(buffers, cx)) - .await?; + + if buffers.len() == 1 { + project + .update(&mut cx, |project, cx| project.save_buffers(buffers, cx)) + .await?; + } else { + // For multi-buffers, only save those ones that contain changes. For clean buffers + // we simulate saving by calling `Buffer::did_save`, so that language servers or + // other downstream listeners of save events get notified. + let (dirty_buffers, clean_buffers) = buffers.into_iter().partition(|buffer| { + buffer.read_with(&cx, |buffer, _| buffer.is_dirty() || buffer.has_conflict()) + }); + + project + .update(&mut cx, |project, cx| { + project.save_buffers(dirty_buffers, cx) + }) + .await?; + for buffer in clean_buffers { + buffer.update(&mut cx, |buffer, cx| { + let version = buffer.saved_version().clone(); + let fingerprint = buffer.saved_version_fingerprint(); + let mtime = buffer.saved_mtime(); + buffer.did_save(version, fingerprint, mtime, cx); + }); + } + } + Ok(()) }) } diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index f93de8e1d847d263dad2526dbcaaa77b0571b118..2ebea8d07dad811a378a065a7f666caaf594c6d3 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -1434,19 +1434,7 @@ impl Project { let worktree = file.worktree.clone(); let path = file.path.clone(); worktree.update(cx, |worktree, cx| match worktree { - Worktree::Local(worktree) => { - if buffer.read(cx).is_dirty() || buffer.read(cx).has_conflict() { - worktree.save_buffer(buffer, path, false, cx) - } else { - buffer.update(cx, |buffer, cx| { - let version = buffer.saved_version().clone(); - let fingerprint = buffer.saved_version_fingerprint(); - let mtime = buffer.saved_mtime(); - buffer.did_save(version.clone(), fingerprint, mtime, cx); - Task::ready(Ok((version, fingerprint, mtime))) - }) - } - } + Worktree::Local(worktree) => worktree.save_buffer(buffer, path, false, cx), Worktree::Remote(worktree) => worktree.save_buffer(buffer, cx), }) } From a00ce3f286db6ff34938abd4ceab6e9f08f27e28 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Thu, 9 Mar 2023 07:42:37 +0100 Subject: [PATCH 5/5] Add randomized test to remove active selections from buffer --- crates/language/src/buffer_tests.rs | 42 ++++++++++++++++------------- 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/crates/language/src/buffer_tests.rs b/crates/language/src/buffer_tests.rs index 0c375f0f4e1e27e59bbc737443307ae1b9c94c66..e98edbb2d32c1c6bf06a1f250e6ecb8b41a52b02 100644 --- a/crates/language/src/buffer_tests.rs +++ b/crates/language/src/buffer_tests.rs @@ -1804,25 +1804,31 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) { } 30..=39 if mutation_count != 0 => { buffer.update(cx, |buffer, cx| { - let mut selections = Vec::new(); - for id in 0..rng.gen_range(1..=5) { - let range = buffer.random_byte_range(0, &mut rng); - selections.push(Selection { - id, - start: buffer.anchor_before(range.start), - end: buffer.anchor_before(range.end), - reversed: false, - goal: SelectionGoal::None, - }); + if rng.gen_bool(0.2) { + log::info!("peer {} clearing active selections", replica_id); + active_selections.remove(&replica_id); + buffer.remove_active_selections(cx); + } else { + let mut selections = Vec::new(); + for id in 0..rng.gen_range(1..=5) { + let range = buffer.random_byte_range(0, &mut rng); + selections.push(Selection { + id, + start: buffer.anchor_before(range.start), + end: buffer.anchor_before(range.end), + reversed: false, + goal: SelectionGoal::None, + }); + } + let selections: Arc<[Selection]> = selections.into(); + log::info!( + "peer {} setting active selections: {:?}", + replica_id, + selections + ); + active_selections.insert(replica_id, selections.clone()); + buffer.set_active_selections(selections, false, Default::default(), cx); } - let selections: Arc<[Selection]> = selections.into(); - log::info!( - "peer {} setting active selections: {:?}", - replica_id, - selections - ); - active_selections.insert(replica_id, selections.clone()); - buffer.set_active_selections(selections, false, Default::default(), cx); }); mutation_count -= 1; }