@@ -53,7 +53,7 @@ use std::{
},
time::Duration,
};
-use tokio::sync::watch;
+use tokio::sync::{watch, Semaphore};
use tower::ServiceBuilder;
use tracing::{info_span, instrument, Instrument};
@@ -542,8 +542,13 @@ impl Server {
// This arrangement ensures we will attempt to process earlier messages first, but fall
// back to processing messages arrived later in the spirit of making progress.
let mut foreground_message_handlers = FuturesUnordered::new();
+ let concurrent_handlers = Arc::new(Semaphore::new(256));
loop {
- let next_message = incoming_rx.next().fuse();
+ let next_message = async {
+ let permit = concurrent_handlers.clone().acquire_owned().await.unwrap();
+ let message = incoming_rx.next().await;
+ (permit, message)
+ }.fuse();
futures::pin_mut!(next_message);
futures::select_biased! {
_ = teardown.changed().fuse() => return Ok(()),
@@ -554,7 +559,8 @@ impl Server {
break;
}
_ = foreground_message_handlers.next() => {}
- message = next_message => {
+ next_message = next_message => {
+ let (permit, message) = next_message;
if let Some(message) = message {
let type_name = message.payload_type_name();
let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
@@ -564,7 +570,10 @@ impl Server {
let handle_message = (handler)(message, session.clone());
drop(span_enter);
- let handle_message = handle_message.instrument(span);
+ let handle_message = async move {
+ handle_message.await;
+ drop(permit);
+ }.instrument(span);
if is_background {
executor.spawn_detached(handle_message);
} else {