Move timing fields into span (#35833)

Mikayla Maki created

Release Notes:

- N/A

Change summary

crates/collab/src/rpc.rs | 256 ++++++++++++++++++++++++-----------------
crates/rpc/src/peer.rs   |  15 --
2 files changed, 149 insertions(+), 122 deletions(-)

Detailed changes

crates/collab/src/rpc.rs 🔗

@@ -41,9 +41,11 @@ use chrono::Utc;
 use collections::{HashMap, HashSet};
 pub use connection_pool::{ConnectionPool, ZedVersion};
 use core::fmt::{self, Debug, Formatter};
+use futures::TryFutureExt as _;
 use reqwest_client::ReqwestClient;
 use rpc::proto::{MultiLspQuery, split_repository_update};
 use supermaven_api::{CreateExternalUserRequest, SupermavenAdminApi};
+use tracing::Span;
 
 use futures::{
     FutureExt, SinkExt, StreamExt, TryStreamExt, channel::oneshot, future::BoxFuture,
@@ -94,8 +96,13 @@ const MAX_CONCURRENT_CONNECTIONS: usize = 512;
 
 static CONCURRENT_CONNECTIONS: AtomicUsize = AtomicUsize::new(0);
 
+const TOTAL_DURATION_MS: &str = "total_duration_ms";
+const PROCESSING_DURATION_MS: &str = "processing_duration_ms";
+const QUEUE_DURATION_MS: &str = "queue_duration_ms";
+const HOST_WAITING_MS: &str = "host_waiting_ms";
+
 type MessageHandler =
-    Box<dyn Send + Sync + Fn(Box<dyn AnyTypedEnvelope>, Session) -> BoxFuture<'static, ()>>;
+    Box<dyn Send + Sync + Fn(Box<dyn AnyTypedEnvelope>, Session, Span) -> BoxFuture<'static, ()>>;
 
 pub struct ConnectionGuard;
 
@@ -163,6 +170,42 @@ impl Principal {
     }
 }
 
+#[derive(Clone)]
+struct MessageContext {
+    session: Session,
+    span: tracing::Span,
+}
+
+impl Deref for MessageContext {
+    type Target = Session;
+
+    fn deref(&self) -> &Self::Target {
+        &self.session
+    }
+}
+
+impl MessageContext {
+    pub fn forward_request<T: RequestMessage>(
+        &self,
+        receiver_id: ConnectionId,
+        request: T,
+    ) -> impl Future<Output = anyhow::Result<T::Response>> {
+        let request_start_time = Instant::now();
+        let span = self.span.clone();
+        tracing::info!("start forwarding request");
+        self.peer
+            .forward_request(self.connection_id, receiver_id, request)
+            .inspect(move |_| {
+                span.record(
+                    HOST_WAITING_MS,
+                    request_start_time.elapsed().as_micros() as f64 / 1000.0,
+                );
+            })
+            .inspect_err(|_| tracing::error!("error forwarding request"))
+            .inspect_ok(|_| tracing::info!("finished forwarding request"))
+    }
+}
+
 #[derive(Clone)]
 struct Session {
     principal: Principal,
@@ -646,40 +689,37 @@ impl Server {
 
     fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
     where
-        F: 'static + Send + Sync + Fn(TypedEnvelope<M>, Session) -> Fut,
+        F: 'static + Send + Sync + Fn(TypedEnvelope<M>, MessageContext) -> Fut,
         Fut: 'static + Send + Future<Output = Result<()>>,
         M: EnvelopedMessage,
     {
         let prev_handler = self.handlers.insert(
             TypeId::of::<M>(),
-            Box::new(move |envelope, session| {
+            Box::new(move |envelope, session, span| {
                 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
                 let received_at = envelope.received_at;
                 tracing::info!("message received");
                 let start_time = Instant::now();
-                let future = (handler)(*envelope, session);
+                let future = (handler)(
+                    *envelope,
+                    MessageContext {
+                        session,
+                        span: span.clone(),
+                    },
+                );
                 async move {
                     let result = future.await;
                     let total_duration_ms = received_at.elapsed().as_micros() as f64 / 1000.0;
                     let processing_duration_ms = start_time.elapsed().as_micros() as f64 / 1000.0;
                     let queue_duration_ms = total_duration_ms - processing_duration_ms;
-
+                    span.record(TOTAL_DURATION_MS, total_duration_ms);
+                    span.record(PROCESSING_DURATION_MS, processing_duration_ms);
+                    span.record(QUEUE_DURATION_MS, queue_duration_ms);
                     match result {
                         Err(error) => {
-                            tracing::error!(
-                                ?error,
-                                total_duration_ms,
-                                processing_duration_ms,
-                                queue_duration_ms,
-                                "error handling message"
-                            )
+                            tracing::error!(?error, "error handling message")
                         }
-                        Ok(()) => tracing::info!(
-                            total_duration_ms,
-                            processing_duration_ms,
-                            queue_duration_ms,
-                            "finished handling message"
-                        ),
+                        Ok(()) => tracing::info!("finished handling message"),
                     }
                 }
                 .boxed()
@@ -693,7 +733,7 @@ impl Server {
 
     fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
     where
-        F: 'static + Send + Sync + Fn(M, Session) -> Fut,
+        F: 'static + Send + Sync + Fn(M, MessageContext) -> Fut,
         Fut: 'static + Send + Future<Output = Result<()>>,
         M: EnvelopedMessage,
     {
@@ -703,7 +743,7 @@ impl Server {
 
     fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
     where
-        F: 'static + Send + Sync + Fn(M, Response<M>, Session) -> Fut,
+        F: 'static + Send + Sync + Fn(M, Response<M>, MessageContext) -> Fut,
         Fut: Send + Future<Output = Result<()>>,
         M: RequestMessage,
     {
@@ -889,12 +929,16 @@ impl Server {
                                 login=field::Empty,
                                 impersonator=field::Empty,
                                 multi_lsp_query_request=field::Empty,
+                                { TOTAL_DURATION_MS }=field::Empty,
+                                { PROCESSING_DURATION_MS }=field::Empty,
+                                { QUEUE_DURATION_MS }=field::Empty,
+                                { HOST_WAITING_MS }=field::Empty
                             );
                             principal.update_span(&span);
                             let span_enter = span.enter();
                             if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                 let is_background = message.is_background();
-                                let handle_message = (handler)(message, session.clone());
+                                let handle_message = (handler)(message, session.clone(), span.clone());
                                 drop(span_enter);
 
                                 let handle_message = async move {
@@ -1386,7 +1430,11 @@ async fn connection_lost(
 }
 
 /// Acknowledges a ping from a client, used to keep the connection alive.
-async fn ping(_: proto::Ping, response: Response<proto::Ping>, _session: Session) -> Result<()> {
+async fn ping(
+    _: proto::Ping,
+    response: Response<proto::Ping>,
+    _session: MessageContext,
+) -> Result<()> {
     response.send(proto::Ack {})?;
     Ok(())
 }
@@ -1395,7 +1443,7 @@ async fn ping(_: proto::Ping, response: Response<proto::Ping>, _session: Session
 async fn create_room(
     _request: proto::CreateRoom,
     response: Response<proto::CreateRoom>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let livekit_room = nanoid::nanoid!(30);
 
@@ -1435,7 +1483,7 @@ async fn create_room(
 async fn join_room(
     request: proto::JoinRoom,
     response: Response<proto::JoinRoom>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let room_id = RoomId::from_proto(request.id);
 
@@ -1502,7 +1550,7 @@ async fn join_room(
 async fn rejoin_room(
     request: proto::RejoinRoom,
     response: Response<proto::RejoinRoom>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let room;
     let channel;
@@ -1679,7 +1727,7 @@ fn notify_rejoined_projects(
 async fn leave_room(
     _: proto::LeaveRoom,
     response: Response<proto::LeaveRoom>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     leave_room_for_session(&session, session.connection_id).await?;
     response.send(proto::Ack {})?;
@@ -1690,7 +1738,7 @@ async fn leave_room(
 async fn set_room_participant_role(
     request: proto::SetRoomParticipantRole,
     response: Response<proto::SetRoomParticipantRole>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let user_id = UserId::from_proto(request.user_id);
     let role = ChannelRole::from(request.role());
@@ -1738,7 +1786,7 @@ async fn set_room_participant_role(
 async fn call(
     request: proto::Call,
     response: Response<proto::Call>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let room_id = RoomId::from_proto(request.room_id);
     let calling_user_id = session.user_id();
@@ -1807,7 +1855,7 @@ async fn call(
 async fn cancel_call(
     request: proto::CancelCall,
     response: Response<proto::CancelCall>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let called_user_id = UserId::from_proto(request.called_user_id);
     let room_id = RoomId::from_proto(request.room_id);
@@ -1842,7 +1890,7 @@ async fn cancel_call(
 }
 
 /// Decline an incoming call.
-async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<()> {
+async fn decline_call(message: proto::DeclineCall, session: MessageContext) -> Result<()> {
     let room_id = RoomId::from_proto(message.room_id);
     {
         let room = session
@@ -1877,7 +1925,7 @@ async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<(
 async fn update_participant_location(
     request: proto::UpdateParticipantLocation,
     response: Response<proto::UpdateParticipantLocation>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let room_id = RoomId::from_proto(request.room_id);
     let location = request.location.context("invalid location")?;
@@ -1896,7 +1944,7 @@ async fn update_participant_location(
 async fn share_project(
     request: proto::ShareProject,
     response: Response<proto::ShareProject>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let (project_id, room) = &*session
         .db()
@@ -1917,7 +1965,7 @@ async fn share_project(
 }
 
 /// Unshare a project from the room.
-async fn unshare_project(message: proto::UnshareProject, session: Session) -> Result<()> {
+async fn unshare_project(message: proto::UnshareProject, session: MessageContext) -> Result<()> {
     let project_id = ProjectId::from_proto(message.project_id);
     unshare_project_internal(project_id, session.connection_id, &session).await
 }
@@ -1964,7 +2012,7 @@ async fn unshare_project_internal(
 async fn join_project(
     request: proto::JoinProject,
     response: Response<proto::JoinProject>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let project_id = ProjectId::from_proto(request.project_id);
 
@@ -2111,7 +2159,7 @@ async fn join_project(
 }
 
 /// Leave someone elses shared project.
-async fn leave_project(request: proto::LeaveProject, session: Session) -> Result<()> {
+async fn leave_project(request: proto::LeaveProject, session: MessageContext) -> Result<()> {
     let sender_id = session.connection_id;
     let project_id = ProjectId::from_proto(request.project_id);
     let db = session.db().await;
@@ -2134,7 +2182,7 @@ async fn leave_project(request: proto::LeaveProject, session: Session) -> Result
 async fn update_project(
     request: proto::UpdateProject,
     response: Response<proto::UpdateProject>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let project_id = ProjectId::from_proto(request.project_id);
     let (room, guest_connection_ids) = &*session
@@ -2163,7 +2211,7 @@ async fn update_project(
 async fn update_worktree(
     request: proto::UpdateWorktree,
     response: Response<proto::UpdateWorktree>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let guest_connection_ids = session
         .db()
@@ -2187,7 +2235,7 @@ async fn update_worktree(
 async fn update_repository(
     request: proto::UpdateRepository,
     response: Response<proto::UpdateRepository>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let guest_connection_ids = session
         .db()
@@ -2211,7 +2259,7 @@ async fn update_repository(
 async fn remove_repository(
     request: proto::RemoveRepository,
     response: Response<proto::RemoveRepository>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let guest_connection_ids = session
         .db()
@@ -2235,7 +2283,7 @@ async fn remove_repository(
 /// Updates other participants with changes to the diagnostics
 async fn update_diagnostic_summary(
     message: proto::UpdateDiagnosticSummary,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let guest_connection_ids = session
         .db()
@@ -2259,7 +2307,7 @@ async fn update_diagnostic_summary(
 /// Updates other participants with changes to the worktree settings
 async fn update_worktree_settings(
     message: proto::UpdateWorktreeSettings,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let guest_connection_ids = session
         .db()
@@ -2283,7 +2331,7 @@ async fn update_worktree_settings(
 /// Notify other participants that a language server has started.
 async fn start_language_server(
     request: proto::StartLanguageServer,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let guest_connection_ids = session
         .db()
@@ -2306,7 +2354,7 @@ async fn start_language_server(
 /// Notify other participants that a language server has changed.
 async fn update_language_server(
     request: proto::UpdateLanguageServer,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let project_id = ProjectId::from_proto(request.project_id);
     let db = session.db().await;
@@ -2339,7 +2387,7 @@ async fn update_language_server(
 async fn forward_read_only_project_request<T>(
     request: T,
     response: Response<T>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()>
 where
     T: EntityMessage + RequestMessage,
@@ -2350,10 +2398,7 @@ where
         .await
         .host_for_read_only_project_request(project_id, session.connection_id)
         .await?;
-    let payload = session
-        .peer
-        .forward_request(session.connection_id, host_connection_id, request)
-        .await?;
+    let payload = session.forward_request(host_connection_id, request).await?;
     response.send(payload)?;
     Ok(())
 }
@@ -2363,7 +2408,7 @@ where
 async fn forward_mutating_project_request<T>(
     request: T,
     response: Response<T>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()>
 where
     T: EntityMessage + RequestMessage,
@@ -2375,10 +2420,7 @@ where
         .await
         .host_for_mutating_project_request(project_id, session.connection_id)
         .await?;
-    let payload = session
-        .peer
-        .forward_request(session.connection_id, host_connection_id, request)
-        .await?;
+    let payload = session.forward_request(host_connection_id, request).await?;
     response.send(payload)?;
     Ok(())
 }
@@ -2386,7 +2428,7 @@ where
 async fn multi_lsp_query(
     request: MultiLspQuery,
     response: Response<MultiLspQuery>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     tracing::Span::current().record("multi_lsp_query_request", request.request_str());
     tracing::info!("multi_lsp_query message received");
@@ -2396,7 +2438,7 @@ async fn multi_lsp_query(
 /// Notify other participants that a new buffer has been created
 async fn create_buffer_for_peer(
     request: proto::CreateBufferForPeer,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     session
         .db()
@@ -2418,7 +2460,7 @@ async fn create_buffer_for_peer(
 async fn update_buffer(
     request: proto::UpdateBuffer,
     response: Response<proto::UpdateBuffer>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let project_id = ProjectId::from_proto(request.project_id);
     let mut capability = Capability::ReadOnly;
@@ -2453,17 +2495,14 @@ async fn update_buffer(
     };
 
     if host != session.connection_id {
-        session
-            .peer
-            .forward_request(session.connection_id, host, request.clone())
-            .await?;
+        session.forward_request(host, request.clone()).await?;
     }
 
     response.send(proto::Ack {})?;
     Ok(())
 }
 
-async fn update_context(message: proto::UpdateContext, session: Session) -> Result<()> {
+async fn update_context(message: proto::UpdateContext, session: MessageContext) -> Result<()> {
     let project_id = ProjectId::from_proto(message.project_id);
 
     let operation = message.operation.as_ref().context("invalid operation")?;
@@ -2508,7 +2547,7 @@ async fn update_context(message: proto::UpdateContext, session: Session) -> Resu
 /// Notify other participants that a project has been updated.
 async fn broadcast_project_message_from_host<T: EntityMessage<Entity = ShareProject>>(
     request: T,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let project_id = ProjectId::from_proto(request.remote_entity_id());
     let project_connection_ids = session
@@ -2533,7 +2572,7 @@ async fn broadcast_project_message_from_host<T: EntityMessage<Entity = ShareProj
 async fn follow(
     request: proto::Follow,
     response: Response<proto::Follow>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let room_id = RoomId::from_proto(request.room_id);
     let project_id = request.project_id.map(ProjectId::from_proto);
@@ -2546,10 +2585,7 @@ async fn follow(
         .check_room_participants(room_id, leader_id, session.connection_id)
         .await?;
 
-    let response_payload = session
-        .peer
-        .forward_request(session.connection_id, leader_id, request)
-        .await?;
+    let response_payload = session.forward_request(leader_id, request).await?;
     response.send(response_payload)?;
 
     if let Some(project_id) = project_id {
@@ -2565,7 +2601,7 @@ async fn follow(
 }
 
 /// Stop following another user in a call.
-async fn unfollow(request: proto::Unfollow, session: Session) -> Result<()> {
+async fn unfollow(request: proto::Unfollow, session: MessageContext) -> Result<()> {
     let room_id = RoomId::from_proto(request.room_id);
     let project_id = request.project_id.map(ProjectId::from_proto);
     let leader_id = request.leader_id.context("invalid leader id")?.into();
@@ -2594,7 +2630,7 @@ async fn unfollow(request: proto::Unfollow, session: Session) -> Result<()> {
 }
 
 /// Notify everyone following you of your current location.
-async fn update_followers(request: proto::UpdateFollowers, session: Session) -> Result<()> {
+async fn update_followers(request: proto::UpdateFollowers, session: MessageContext) -> Result<()> {
     let room_id = RoomId::from_proto(request.room_id);
     let database = session.db.lock().await;
 
@@ -2629,7 +2665,7 @@ async fn update_followers(request: proto::UpdateFollowers, session: Session) ->
 async fn get_users(
     request: proto::GetUsers,
     response: Response<proto::GetUsers>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let user_ids = request
         .user_ids
@@ -2657,7 +2693,7 @@ async fn get_users(
 async fn fuzzy_search_users(
     request: proto::FuzzySearchUsers,
     response: Response<proto::FuzzySearchUsers>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let query = request.query;
     let users = match query.len() {
@@ -2689,7 +2725,7 @@ async fn fuzzy_search_users(
 async fn request_contact(
     request: proto::RequestContact,
     response: Response<proto::RequestContact>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let requester_id = session.user_id();
     let responder_id = UserId::from_proto(request.responder_id);
@@ -2736,7 +2772,7 @@ async fn request_contact(
 async fn respond_to_contact_request(
     request: proto::RespondToContactRequest,
     response: Response<proto::RespondToContactRequest>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let responder_id = session.user_id();
     let requester_id = UserId::from_proto(request.requester_id);
@@ -2794,7 +2830,7 @@ async fn respond_to_contact_request(
 async fn remove_contact(
     request: proto::RemoveContact,
     response: Response<proto::RemoveContact>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let requester_id = session.user_id();
     let responder_id = UserId::from_proto(request.user_id);
@@ -3053,7 +3089,10 @@ async fn update_user_plan(session: &Session) -> Result<()> {
     Ok(())
 }
 
-async fn subscribe_to_channels(_: proto::SubscribeToChannels, session: Session) -> Result<()> {
+async fn subscribe_to_channels(
+    _: proto::SubscribeToChannels,
+    session: MessageContext,
+) -> Result<()> {
     subscribe_user_to_channels(session.user_id(), &session).await?;
     Ok(())
 }
@@ -3079,7 +3118,7 @@ async fn subscribe_user_to_channels(user_id: UserId, session: &Session) -> Resul
 async fn create_channel(
     request: proto::CreateChannel,
     response: Response<proto::CreateChannel>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let db = session.db().await;
 
@@ -3134,7 +3173,7 @@ async fn create_channel(
 async fn delete_channel(
     request: proto::DeleteChannel,
     response: Response<proto::DeleteChannel>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let db = session.db().await;
 
@@ -3162,7 +3201,7 @@ async fn delete_channel(
 async fn invite_channel_member(
     request: proto::InviteChannelMember,
     response: Response<proto::InviteChannelMember>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let db = session.db().await;
     let channel_id = ChannelId::from_proto(request.channel_id);
@@ -3199,7 +3238,7 @@ async fn invite_channel_member(
 async fn remove_channel_member(
     request: proto::RemoveChannelMember,
     response: Response<proto::RemoveChannelMember>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let db = session.db().await;
     let channel_id = ChannelId::from_proto(request.channel_id);
@@ -3243,7 +3282,7 @@ async fn remove_channel_member(
 async fn set_channel_visibility(
     request: proto::SetChannelVisibility,
     response: Response<proto::SetChannelVisibility>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let db = session.db().await;
     let channel_id = ChannelId::from_proto(request.channel_id);
@@ -3288,7 +3327,7 @@ async fn set_channel_visibility(
 async fn set_channel_member_role(
     request: proto::SetChannelMemberRole,
     response: Response<proto::SetChannelMemberRole>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let db = session.db().await;
     let channel_id = ChannelId::from_proto(request.channel_id);
@@ -3336,7 +3375,7 @@ async fn set_channel_member_role(
 async fn rename_channel(
     request: proto::RenameChannel,
     response: Response<proto::RenameChannel>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let db = session.db().await;
     let channel_id = ChannelId::from_proto(request.channel_id);
@@ -3368,7 +3407,7 @@ async fn rename_channel(
 async fn move_channel(
     request: proto::MoveChannel,
     response: Response<proto::MoveChannel>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let channel_id = ChannelId::from_proto(request.channel_id);
     let to = ChannelId::from_proto(request.to);
@@ -3410,7 +3449,7 @@ async fn move_channel(
 async fn reorder_channel(
     request: proto::ReorderChannel,
     response: Response<proto::ReorderChannel>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let channel_id = ChannelId::from_proto(request.channel_id);
     let direction = request.direction();
@@ -3456,7 +3495,7 @@ async fn reorder_channel(
 async fn get_channel_members(
     request: proto::GetChannelMembers,
     response: Response<proto::GetChannelMembers>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let db = session.db().await;
     let channel_id = ChannelId::from_proto(request.channel_id);
@@ -3476,7 +3515,7 @@ async fn get_channel_members(
 async fn respond_to_channel_invite(
     request: proto::RespondToChannelInvite,
     response: Response<proto::RespondToChannelInvite>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let db = session.db().await;
     let channel_id = ChannelId::from_proto(request.channel_id);
@@ -3517,7 +3556,7 @@ async fn respond_to_channel_invite(
 async fn join_channel(
     request: proto::JoinChannel,
     response: Response<proto::JoinChannel>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let channel_id = ChannelId::from_proto(request.channel_id);
     join_channel_internal(channel_id, Box::new(response), session).await
@@ -3540,7 +3579,7 @@ impl JoinChannelInternalResponse for Response<proto::JoinRoom> {
 async fn join_channel_internal(
     channel_id: ChannelId,
     response: Box<impl JoinChannelInternalResponse>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let joined_room = {
         let mut db = session.db().await;
@@ -3635,7 +3674,7 @@ async fn join_channel_internal(
 async fn join_channel_buffer(
     request: proto::JoinChannelBuffer,
     response: Response<proto::JoinChannelBuffer>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let db = session.db().await;
     let channel_id = ChannelId::from_proto(request.channel_id);
@@ -3666,7 +3705,7 @@ async fn join_channel_buffer(
 /// Edit the channel notes
 async fn update_channel_buffer(
     request: proto::UpdateChannelBuffer,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let db = session.db().await;
     let channel_id = ChannelId::from_proto(request.channel_id);
@@ -3718,7 +3757,7 @@ async fn update_channel_buffer(
 async fn rejoin_channel_buffers(
     request: proto::RejoinChannelBuffers,
     response: Response<proto::RejoinChannelBuffers>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let db = session.db().await;
     let buffers = db
@@ -3753,7 +3792,7 @@ async fn rejoin_channel_buffers(
 async fn leave_channel_buffer(
     request: proto::LeaveChannelBuffer,
     response: Response<proto::LeaveChannelBuffer>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let db = session.db().await;
     let channel_id = ChannelId::from_proto(request.channel_id);
@@ -3815,7 +3854,7 @@ fn send_notifications(
 async fn send_channel_message(
     request: proto::SendChannelMessage,
     response: Response<proto::SendChannelMessage>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     // Validate the message body.
     let body = request.body.trim().to_string();
@@ -3908,7 +3947,7 @@ async fn send_channel_message(
 async fn remove_channel_message(
     request: proto::RemoveChannelMessage,
     response: Response<proto::RemoveChannelMessage>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let channel_id = ChannelId::from_proto(request.channel_id);
     let message_id = MessageId::from_proto(request.message_id);
@@ -3943,7 +3982,7 @@ async fn remove_channel_message(
 async fn update_channel_message(
     request: proto::UpdateChannelMessage,
     response: Response<proto::UpdateChannelMessage>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let channel_id = ChannelId::from_proto(request.channel_id);
     let message_id = MessageId::from_proto(request.message_id);
@@ -4027,7 +4066,7 @@ async fn update_channel_message(
 /// Mark a channel message as read
 async fn acknowledge_channel_message(
     request: proto::AckChannelMessage,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let channel_id = ChannelId::from_proto(request.channel_id);
     let message_id = MessageId::from_proto(request.message_id);
@@ -4047,7 +4086,7 @@ async fn acknowledge_channel_message(
 /// Mark a buffer version as synced
 async fn acknowledge_buffer_version(
     request: proto::AckBufferOperation,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let buffer_id = BufferId::from_proto(request.buffer_id);
     session
@@ -4067,7 +4106,7 @@ async fn acknowledge_buffer_version(
 async fn get_supermaven_api_key(
     _request: proto::GetSupermavenApiKey,
     response: Response<proto::GetSupermavenApiKey>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let user_id: String = session.user_id().to_string();
     if !session.is_staff() {
@@ -4096,7 +4135,7 @@ async fn get_supermaven_api_key(
 async fn join_channel_chat(
     request: proto::JoinChannelChat,
     response: Response<proto::JoinChannelChat>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let channel_id = ChannelId::from_proto(request.channel_id);
 
@@ -4114,7 +4153,10 @@ async fn join_channel_chat(
 }
 
 /// Stop receiving chat updates for a channel
-async fn leave_channel_chat(request: proto::LeaveChannelChat, session: Session) -> Result<()> {
+async fn leave_channel_chat(
+    request: proto::LeaveChannelChat,
+    session: MessageContext,
+) -> Result<()> {
     let channel_id = ChannelId::from_proto(request.channel_id);
     session
         .db()
@@ -4128,7 +4170,7 @@ async fn leave_channel_chat(request: proto::LeaveChannelChat, session: Session)
 async fn get_channel_messages(
     request: proto::GetChannelMessages,
     response: Response<proto::GetChannelMessages>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let channel_id = ChannelId::from_proto(request.channel_id);
     let messages = session
@@ -4152,7 +4194,7 @@ async fn get_channel_messages(
 async fn get_channel_messages_by_id(
     request: proto::GetChannelMessagesById,
     response: Response<proto::GetChannelMessagesById>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let message_ids = request
         .message_ids
@@ -4175,7 +4217,7 @@ async fn get_channel_messages_by_id(
 async fn get_notifications(
     request: proto::GetNotifications,
     response: Response<proto::GetNotifications>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let notifications = session
         .db()
@@ -4197,7 +4239,7 @@ async fn get_notifications(
 async fn mark_notification_as_read(
     request: proto::MarkNotificationRead,
     response: Response<proto::MarkNotificationRead>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let database = &session.db().await;
     let notifications = database
@@ -4219,7 +4261,7 @@ async fn mark_notification_as_read(
 async fn get_private_user_info(
     _request: proto::GetPrivateUserInfo,
     response: Response<proto::GetPrivateUserInfo>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let db = session.db().await;
 
@@ -4243,7 +4285,7 @@ async fn get_private_user_info(
 async fn accept_terms_of_service(
     _request: proto::AcceptTermsOfService,
     response: Response<proto::AcceptTermsOfService>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let db = session.db().await;
 
@@ -4267,7 +4309,7 @@ async fn accept_terms_of_service(
 async fn get_llm_api_token(
     _request: proto::GetLlmToken,
     response: Response<proto::GetLlmToken>,
-    session: Session,
+    session: MessageContext,
 ) -> Result<()> {
     let db = session.db().await;
 

crates/rpc/src/peer.rs 🔗

@@ -422,23 +422,8 @@ impl Peer {
         receiver_id: ConnectionId,
         request: T,
     ) -> impl Future<Output = Result<T::Response>> {
-        let request_start_time = Instant::now();
-        let elapsed_time = move || request_start_time.elapsed().as_millis();
-        tracing::info!("start forwarding request");
         self.request_internal(Some(sender_id), receiver_id, request)
             .map_ok(|envelope| envelope.payload)
-            .inspect_err(move |_| {
-                tracing::error!(
-                    waiting_for_host_ms = elapsed_time(),
-                    "error forwarding request"
-                )
-            })
-            .inspect_ok(move |_| {
-                tracing::info!(
-                    waiting_for_host_ms = elapsed_time(),
-                    "finished forwarding request"
-                )
-            })
     }
 
     fn request_internal<T: RequestMessage>(