diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index d13487440fe42c56808012345e008109589ea7b2..1deaefec1a446857748e311a34ffdd741385c944 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -53,7 +53,7 @@ use std::{ }, time::Duration, }; -use tokio::sync::watch; +use tokio::sync::{watch, Semaphore}; use tower::ServiceBuilder; use tracing::{info_span, instrument, Instrument}; @@ -542,8 +542,13 @@ impl Server { // This arrangement ensures we will attempt to process earlier messages first, but fall // back to processing messages arrived later in the spirit of making progress. let mut foreground_message_handlers = FuturesUnordered::new(); + let concurrent_handlers = Arc::new(Semaphore::new(256)); loop { - let next_message = incoming_rx.next().fuse(); + let next_message = async { + let permit = concurrent_handlers.clone().acquire_owned().await.unwrap(); + let message = incoming_rx.next().await; + (permit, message) + }.fuse(); futures::pin_mut!(next_message); futures::select_biased! { _ = teardown.changed().fuse() => return Ok(()), @@ -554,7 +559,8 @@ impl Server { break; } _ = foreground_message_handlers.next() => {} - message = next_message => { + next_message = next_message => { + let (permit, message) = next_message; if let Some(message) = message { let type_name = message.payload_type_name(); let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name); @@ -564,7 +570,10 @@ impl Server { let handle_message = (handler)(message, session.clone()); drop(span_enter); - let handle_message = handle_message.instrument(span); + let handle_message = async move { + handle_message.await; + drop(permit); + }.instrument(span); if is_background { executor.spawn_detached(handle_message); } else { diff --git a/crates/editor/src/items.rs b/crates/editor/src/items.rs index 0d594a66ef354ab49152a2e0e3d318592f6c3233..cda702de00f6411e10bda7ad890d8136688ebf42 100644 --- a/crates/editor/src/items.rs +++ b/crates/editor/src/items.rs @@ -612,9 +612,34 @@ impl Item for Editor { let buffers = self.buffer().clone().read(cx).all_buffers(); cx.as_mut().spawn(|mut cx| async move { format.await?; - project - .update(&mut cx, |project, cx| project.save_buffers(buffers, cx)) - .await?; + + if buffers.len() == 1 { + project + .update(&mut cx, |project, cx| project.save_buffers(buffers, cx)) + .await?; + } else { + // For multi-buffers, only save those ones that contain changes. For clean buffers + // we simulate saving by calling `Buffer::did_save`, so that language servers or + // other downstream listeners of save events get notified. + let (dirty_buffers, clean_buffers) = buffers.into_iter().partition(|buffer| { + buffer.read_with(&cx, |buffer, _| buffer.is_dirty() || buffer.has_conflict()) + }); + + project + .update(&mut cx, |project, cx| { + project.save_buffers(dirty_buffers, cx) + }) + .await?; + for buffer in clean_buffers { + buffer.update(&mut cx, |buffer, cx| { + let version = buffer.saved_version().clone(); + let fingerprint = buffer.saved_version_fingerprint(); + let mtime = buffer.saved_mtime(); + buffer.did_save(version, fingerprint, mtime, cx); + }); + } + } + Ok(()) }) } diff --git a/crates/language/src/buffer.rs b/crates/language/src/buffer.rs index acccb553d3d14d11e6140fb8e2a81818214e84e1..09a96342271b00f508094b8426ab5ddf81b04131 100644 --- a/crates/language/src/buffer.rs +++ b/crates/language/src/buffer.rs @@ -1353,7 +1353,13 @@ impl Buffer { } pub fn remove_active_selections(&mut self, cx: &mut ModelContext) { - self.set_active_selections(Arc::from([]), false, Default::default(), cx); + if self + .remote_selections + .get(&self.text.replica_id()) + .map_or(true, |set| !set.selections.is_empty()) + { + self.set_active_selections(Arc::from([]), false, Default::default(), cx); + } } pub fn set_text(&mut self, text: T, cx: &mut ModelContext) -> Option diff --git a/crates/language/src/buffer_tests.rs b/crates/language/src/buffer_tests.rs index 0c375f0f4e1e27e59bbc737443307ae1b9c94c66..e98edbb2d32c1c6bf06a1f250e6ecb8b41a52b02 100644 --- a/crates/language/src/buffer_tests.rs +++ b/crates/language/src/buffer_tests.rs @@ -1804,25 +1804,31 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) { } 30..=39 if mutation_count != 0 => { buffer.update(cx, |buffer, cx| { - let mut selections = Vec::new(); - for id in 0..rng.gen_range(1..=5) { - let range = buffer.random_byte_range(0, &mut rng); - selections.push(Selection { - id, - start: buffer.anchor_before(range.start), - end: buffer.anchor_before(range.end), - reversed: false, - goal: SelectionGoal::None, - }); + if rng.gen_bool(0.2) { + log::info!("peer {} clearing active selections", replica_id); + active_selections.remove(&replica_id); + buffer.remove_active_selections(cx); + } else { + let mut selections = Vec::new(); + for id in 0..rng.gen_range(1..=5) { + let range = buffer.random_byte_range(0, &mut rng); + selections.push(Selection { + id, + start: buffer.anchor_before(range.start), + end: buffer.anchor_before(range.end), + reversed: false, + goal: SelectionGoal::None, + }); + } + let selections: Arc<[Selection]> = selections.into(); + log::info!( + "peer {} setting active selections: {:?}", + replica_id, + selections + ); + active_selections.insert(replica_id, selections.clone()); + buffer.set_active_selections(selections, false, Default::default(), cx); } - let selections: Arc<[Selection]> = selections.into(); - log::info!( - "peer {} setting active selections: {:?}", - replica_id, - selections - ); - active_selections.insert(replica_id, selections.clone()); - buffer.set_active_selections(selections, false, Default::default(), cx); }); mutation_count -= 1; }