@@ -464,6 +464,7 @@ impl Server {
TypeId::of::<M>(),
Box::new(move |envelope, session| {
let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
+ let received_at = envelope.received_at;
let span = info_span!(
"handle message",
payload_type = envelope.payload_type_name()
@@ -478,12 +479,14 @@ impl Server {
let future = (handler)(*envelope, session);
async move {
let result = future.await;
- let duration_ms = start_time.elapsed().as_micros() as f64 / 1000.0;
+ let total_duration_ms = received_at.elapsed().as_micros() as f64 / 1000.0;
+ let processing_duration_ms = start_time.elapsed().as_micros() as f64 / 1000.0;
+ let queue_duration_ms = processing_duration_ms - total_duration_ms;
match result {
Err(error) => {
- tracing::error!(%error, ?duration_ms, "error handling message")
+ tracing::error!(%error, ?total_duration_ms, ?processing_duration_ms, ?queue_duration_ms, "error handling message")
}
- Ok(()) => tracing::info!(?duration_ms, "finished handling message"),
+ Ok(()) => tracing::info!(?total_duration_ms, ?processing_duration_ms, ?queue_duration_ms, "finished handling message"),
}
}
.instrument(span)
@@ -1,7 +1,7 @@
#[macro_export]
macro_rules! messages {
($(($name:ident, $priority:ident)),* $(,)?) => {
- pub fn build_typed_envelope(sender_id: ConnectionId, envelope: Envelope) -> Option<Box<dyn AnyTypedEnvelope>> {
+ pub fn build_typed_envelope(sender_id: ConnectionId, received_at: Instant, envelope: Envelope) -> Option<Box<dyn AnyTypedEnvelope>> {
match envelope.payload {
$(Some(envelope::Payload::$name(payload)) => {
Some(Box::new(TypedEnvelope {
@@ -12,6 +12,7 @@ macro_rules! messages {
}),
message_id: envelope.id,
payload,
+ received_at,
}))
}, )*
_ => None
@@ -13,7 +13,7 @@ use futures::{
};
use parking_lot::{Mutex, RwLock};
use serde::{ser::SerializeStruct, Serialize};
-use std::{fmt, sync::atomic::Ordering::SeqCst};
+use std::{fmt, sync::atomic::Ordering::SeqCst, time::Instant};
use std::{
future::Future,
marker::PhantomData,
@@ -79,6 +79,7 @@ pub struct TypedEnvelope<T> {
pub original_sender_id: Option<PeerId>,
pub message_id: u32,
pub payload: T,
+ pub received_at: Instant,
}
impl<T> TypedEnvelope<T> {
@@ -111,8 +112,16 @@ pub struct ConnectionState {
next_message_id: Arc<AtomicU32>,
#[allow(clippy::type_complexity)]
#[serde(skip)]
- response_channels:
- Arc<Mutex<Option<HashMap<u32, oneshot::Sender<(proto::Envelope, oneshot::Sender<()>)>>>>>,
+ response_channels: Arc<
+ Mutex<
+ Option<
+ HashMap<
+ u32,
+ oneshot::Sender<(proto::Envelope, std::time::Instant, oneshot::Sender<()>)>,
+ >,
+ >,
+ >,
+ >,
}
const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);
@@ -154,7 +163,7 @@ impl Peer {
#[cfg(any(test, feature = "test-support"))]
const INCOMING_BUFFER_SIZE: usize = 1;
#[cfg(not(any(test, feature = "test-support")))]
- const INCOMING_BUFFER_SIZE: usize = 64;
+ const INCOMING_BUFFER_SIZE: usize = 256;
let (mut incoming_tx, incoming_rx) = mpsc::channel(INCOMING_BUFFER_SIZE);
let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded();
@@ -238,10 +247,10 @@ impl Peer {
tracing::trace!(%connection_id, "incoming rpc message: received");
tracing::trace!(%connection_id, "receive timeout: resetting");
receive_timeout.set(create_timer(RECEIVE_TIMEOUT).fuse());
- if let proto::Message::Envelope(incoming) = incoming {
+ if let (proto::Message::Envelope(incoming), received_at) = incoming {
tracing::trace!(%connection_id, "incoming rpc message: processing");
futures::select_biased! {
- result = incoming_tx.send(incoming).fuse() => match result {
+ result = incoming_tx.send((incoming, received_at)).fuse() => match result {
Ok(_) => {
tracing::trace!(%connection_id, "incoming rpc message: processed");
}
@@ -272,7 +281,7 @@ impl Peer {
.write()
.insert(connection_id, connection_state);
- let incoming_rx = incoming_rx.filter_map(move |incoming| {
+ let incoming_rx = incoming_rx.filter_map(move |(incoming, received_at)| {
let response_channels = response_channels.clone();
async move {
let message_id = incoming.id;
@@ -291,7 +300,7 @@ impl Peer {
let channel = response_channels.lock().as_mut()?.remove(&responding_to);
if let Some(tx) = channel {
let requester_resumed = oneshot::channel();
- if let Err(error) = tx.send((incoming, requester_resumed.0)) {
+ if let Err(error) = tx.send((incoming, received_at, requester_resumed.0)) {
tracing::trace!(
%connection_id,
message_id,
@@ -315,8 +324,9 @@ impl Peer {
"incoming response: requester resumed"
);
} else {
- let message_type = proto::build_typed_envelope(connection_id, incoming)
- .map(|p| p.payload_type_name());
+ let message_type =
+ proto::build_typed_envelope(connection_id, received_at, incoming)
+ .map(|p| p.payload_type_name());
tracing::warn!(
%connection_id,
message_id,
@@ -329,14 +339,16 @@ impl Peer {
None
} else {
tracing::trace!(%connection_id, message_id, "incoming message: received");
- proto::build_typed_envelope(connection_id, incoming).or_else(|| {
- tracing::error!(
- %connection_id,
- message_id,
- "unable to construct a typed envelope"
- );
- None
- })
+ proto::build_typed_envelope(connection_id, received_at, incoming).or_else(
+ || {
+ tracing::error!(
+ %connection_id,
+ message_id,
+ "unable to construct a typed envelope"
+ );
+ None
+ },
+ )
}
}
});
@@ -425,7 +437,8 @@ impl Peer {
});
async move {
send?;
- let (response, _barrier) = rx.await.map_err(|_| anyhow!("connection was closed"))?;
+ let (response, received_at, _barrier) =
+ rx.await.map_err(|_| anyhow!("connection was closed"))?;
if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
Err(RpcError::from_proto(&error, T::NAME))
@@ -436,6 +449,7 @@ impl Peer {
original_sender_id: response.original_sender_id,
payload: T::Response::from_envelope(response)
.ok_or_else(|| anyhow!("received response of the wrong type"))?,
+ received_at,
})
}
}
@@ -8,6 +8,7 @@ use futures::{SinkExt as _, StreamExt as _};
use prost::Message as _;
use serde::Serialize;
use std::any::{Any, TypeId};
+use std::time::Instant;
use std::{
cmp,
fmt::Debug,
@@ -515,8 +516,9 @@ impl<S> MessageStream<S>
where
S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
{
- pub async fn read(&mut self) -> Result<Message, anyhow::Error> {
+ pub async fn read(&mut self) -> Result<(Message, Instant), anyhow::Error> {
while let Some(bytes) = self.stream.next().await {
+ let received_at = Instant::now();
match bytes? {
WebSocketMessage::Binary(bytes) => {
zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
@@ -525,10 +527,10 @@ where
self.encoding_buffer.clear();
self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
- return Ok(Message::Envelope(envelope));
+ return Ok((Message::Envelope(envelope), received_at));
}
- WebSocketMessage::Ping(_) => return Ok(Message::Ping),
- WebSocketMessage::Pong(_) => return Ok(Message::Pong),
+ WebSocketMessage::Ping(_) => return Ok((Message::Ping, received_at)),
+ WebSocketMessage::Pong(_) => return Ok((Message::Pong, received_at)),
WebSocketMessage::Close(_) => break,
_ => {}
}