Merge pull request #2252 from zed-industries/limit-messages-processed-in-parallel

Antonio Scandurra created

Prevent collab server from being overwhelmed with messages

Change summary

crates/collab/src/rpc.rs            | 17 +++++++++--
crates/editor/src/items.rs          | 31 ++++++++++++++++++++--
crates/language/src/buffer.rs       |  8 +++++
crates/language/src/buffer_tests.rs | 42 +++++++++++++++++-------------
4 files changed, 72 insertions(+), 26 deletions(-)

Detailed changes

crates/collab/src/rpc.rs 🔗

@@ -53,7 +53,7 @@ use std::{
     },
     time::Duration,
 };
-use tokio::sync::watch;
+use tokio::sync::{watch, Semaphore};
 use tower::ServiceBuilder;
 use tracing::{info_span, instrument, Instrument};
 
@@ -542,8 +542,13 @@ impl Server {
             // This arrangement ensures we will attempt to process earlier messages first, but fall
             // back to processing messages arrived later in the spirit of making progress.
             let mut foreground_message_handlers = FuturesUnordered::new();
+            let concurrent_handlers = Arc::new(Semaphore::new(256));
             loop {
-                let next_message = incoming_rx.next().fuse();
+                let next_message = async {
+                    let permit = concurrent_handlers.clone().acquire_owned().await.unwrap();
+                    let message = incoming_rx.next().await;
+                    (permit, message)
+                }.fuse();
                 futures::pin_mut!(next_message);
                 futures::select_biased! {
                     _ = teardown.changed().fuse() => return Ok(()),
@@ -554,7 +559,8 @@ impl Server {
                         break;
                     }
                     _ = foreground_message_handlers.next() => {}
-                    message = next_message => {
+                    next_message = next_message => {
+                        let (permit, message) = next_message;
                         if let Some(message) = message {
                             let type_name = message.payload_type_name();
                             let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
@@ -564,7 +570,10 @@ impl Server {
                                 let handle_message = (handler)(message, session.clone());
                                 drop(span_enter);
 
-                                let handle_message = handle_message.instrument(span);
+                                let handle_message = async move {
+                                    handle_message.await;
+                                    drop(permit);
+                                }.instrument(span);
                                 if is_background {
                                     executor.spawn_detached(handle_message);
                                 } else {

crates/editor/src/items.rs 🔗

@@ -612,9 +612,34 @@ impl Item for Editor {
         let buffers = self.buffer().clone().read(cx).all_buffers();
         cx.as_mut().spawn(|mut cx| async move {
             format.await?;
-            project
-                .update(&mut cx, |project, cx| project.save_buffers(buffers, cx))
-                .await?;
+
+            if buffers.len() == 1 {
+                project
+                    .update(&mut cx, |project, cx| project.save_buffers(buffers, cx))
+                    .await?;
+            } else {
+                // For multi-buffers, only save those ones that contain changes. For clean buffers
+                // we simulate saving by calling `Buffer::did_save`, so that language servers or
+                // other downstream listeners of save events get notified.
+                let (dirty_buffers, clean_buffers) = buffers.into_iter().partition(|buffer| {
+                    buffer.read_with(&cx, |buffer, _| buffer.is_dirty() || buffer.has_conflict())
+                });
+
+                project
+                    .update(&mut cx, |project, cx| {
+                        project.save_buffers(dirty_buffers, cx)
+                    })
+                    .await?;
+                for buffer in clean_buffers {
+                    buffer.update(&mut cx, |buffer, cx| {
+                        let version = buffer.saved_version().clone();
+                        let fingerprint = buffer.saved_version_fingerprint();
+                        let mtime = buffer.saved_mtime();
+                        buffer.did_save(version, fingerprint, mtime, cx);
+                    });
+                }
+            }
+
             Ok(())
         })
     }

crates/language/src/buffer.rs 🔗

@@ -1353,7 +1353,13 @@ impl Buffer {
     }
 
     pub fn remove_active_selections(&mut self, cx: &mut ModelContext<Self>) {
-        self.set_active_selections(Arc::from([]), false, Default::default(), cx);
+        if self
+            .remote_selections
+            .get(&self.text.replica_id())
+            .map_or(true, |set| !set.selections.is_empty())
+        {
+            self.set_active_selections(Arc::from([]), false, Default::default(), cx);
+        }
     }
 
     pub fn set_text<T>(&mut self, text: T, cx: &mut ModelContext<Self>) -> Option<clock::Local>

crates/language/src/buffer_tests.rs 🔗

@@ -1804,25 +1804,31 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
             }
             30..=39 if mutation_count != 0 => {
                 buffer.update(cx, |buffer, cx| {
-                    let mut selections = Vec::new();
-                    for id in 0..rng.gen_range(1..=5) {
-                        let range = buffer.random_byte_range(0, &mut rng);
-                        selections.push(Selection {
-                            id,
-                            start: buffer.anchor_before(range.start),
-                            end: buffer.anchor_before(range.end),
-                            reversed: false,
-                            goal: SelectionGoal::None,
-                        });
+                    if rng.gen_bool(0.2) {
+                        log::info!("peer {} clearing active selections", replica_id);
+                        active_selections.remove(&replica_id);
+                        buffer.remove_active_selections(cx);
+                    } else {
+                        let mut selections = Vec::new();
+                        for id in 0..rng.gen_range(1..=5) {
+                            let range = buffer.random_byte_range(0, &mut rng);
+                            selections.push(Selection {
+                                id,
+                                start: buffer.anchor_before(range.start),
+                                end: buffer.anchor_before(range.end),
+                                reversed: false,
+                                goal: SelectionGoal::None,
+                            });
+                        }
+                        let selections: Arc<[Selection<Anchor>]> = selections.into();
+                        log::info!(
+                            "peer {} setting active selections: {:?}",
+                            replica_id,
+                            selections
+                        );
+                        active_selections.insert(replica_id, selections.clone());
+                        buffer.set_active_selections(selections, false, Default::default(), cx);
                     }
-                    let selections: Arc<[Selection<Anchor>]> = selections.into();
-                    log::info!(
-                        "peer {} setting active selections: {:?}",
-                        replica_id,
-                        selections
-                    );
-                    active_selections.insert(replica_id, selections.clone());
-                    buffer.set_active_selections(selections, false, Default::default(), cx);
                 });
                 mutation_count -= 1;
             }