rpc.rs

   1mod connection_pool;
   2
   3use crate::{
   4    auth,
   5    db::{self, Database, ProjectId, RoomId, ServerId, User, UserId},
   6    executor::Executor,
   7    AppState, Result,
   8};
   9use anyhow::anyhow;
  10use async_tungstenite::tungstenite::{
  11    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  12};
  13use axum::{
  14    body::Body,
  15    extract::{
  16        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  17        ConnectInfo, WebSocketUpgrade,
  18    },
  19    headers::{Header, HeaderName},
  20    http::StatusCode,
  21    middleware,
  22    response::IntoResponse,
  23    routing::get,
  24    Extension, Router, TypedHeader,
  25};
  26use collections::{HashMap, HashSet};
  27pub use connection_pool::ConnectionPool;
  28use futures::{
  29    channel::oneshot,
  30    future::{self, BoxFuture},
  31    stream::FuturesUnordered,
  32    FutureExt, SinkExt, StreamExt, TryStreamExt,
  33};
  34use lazy_static::lazy_static;
  35use prometheus::{register_int_gauge, IntGauge};
  36use rpc::{
  37    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  38    Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
  39};
  40use serde::{Serialize, Serializer};
  41use std::{
  42    any::TypeId,
  43    fmt,
  44    future::Future,
  45    marker::PhantomData,
  46    mem,
  47    net::SocketAddr,
  48    ops::{Deref, DerefMut},
  49    rc::Rc,
  50    sync::{
  51        atomic::{AtomicBool, Ordering::SeqCst},
  52        Arc,
  53    },
  54    time::Duration,
  55};
  56use tokio::sync::watch;
  57use tower::ServiceBuilder;
  58use tracing::{info_span, instrument, Instrument};
  59
  60pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(5);
  61pub const CLEANUP_TIMEOUT: Duration = Duration::from_secs(10);
  62
  63lazy_static! {
  64    static ref METRIC_CONNECTIONS: IntGauge =
  65        register_int_gauge!("connections", "number of connections").unwrap();
  66    static ref METRIC_SHARED_PROJECTS: IntGauge = register_int_gauge!(
  67        "shared_projects",
  68        "number of open projects with one or more guests"
  69    )
  70    .unwrap();
  71}
  72
  73type MessageHandler =
  74    Box<dyn Send + Sync + Fn(Box<dyn AnyTypedEnvelope>, Session) -> BoxFuture<'static, ()>>;
  75
  76struct Response<R> {
  77    peer: Arc<Peer>,
  78    receipt: Receipt<R>,
  79    responded: Arc<AtomicBool>,
  80}
  81
  82impl<R: RequestMessage> Response<R> {
  83    fn send(self, payload: R::Response) -> Result<()> {
  84        self.responded.store(true, SeqCst);
  85        self.peer.respond(self.receipt, payload)?;
  86        Ok(())
  87    }
  88}
  89
  90#[derive(Clone)]
  91struct Session {
  92    user_id: UserId,
  93    connection_id: ConnectionId,
  94    db: Arc<tokio::sync::Mutex<DbHandle>>,
  95    peer: Arc<Peer>,
  96    connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
  97    live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
  98}
  99
 100impl Session {
 101    async fn db(&self) -> tokio::sync::MutexGuard<DbHandle> {
 102        #[cfg(test)]
 103        tokio::task::yield_now().await;
 104        let guard = self.db.lock().await;
 105        #[cfg(test)]
 106        tokio::task::yield_now().await;
 107        guard
 108    }
 109
 110    async fn connection_pool(&self) -> ConnectionPoolGuard<'_> {
 111        #[cfg(test)]
 112        tokio::task::yield_now().await;
 113        let guard = self.connection_pool.lock();
 114        ConnectionPoolGuard {
 115            guard,
 116            _not_send: PhantomData,
 117        }
 118    }
 119}
 120
 121impl fmt::Debug for Session {
 122    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 123        f.debug_struct("Session")
 124            .field("user_id", &self.user_id)
 125            .field("connection_id", &self.connection_id)
 126            .finish()
 127    }
 128}
 129
 130struct DbHandle(Arc<Database>);
 131
 132impl Deref for DbHandle {
 133    type Target = Database;
 134
 135    fn deref(&self) -> &Self::Target {
 136        self.0.as_ref()
 137    }
 138}
 139
 140pub struct Server {
 141    id: parking_lot::Mutex<ServerId>,
 142    peer: Arc<Peer>,
 143    pub(crate) connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
 144    app_state: Arc<AppState>,
 145    executor: Executor,
 146    handlers: HashMap<TypeId, MessageHandler>,
 147    teardown: watch::Sender<()>,
 148}
 149
 150pub(crate) struct ConnectionPoolGuard<'a> {
 151    guard: parking_lot::MutexGuard<'a, ConnectionPool>,
 152    _not_send: PhantomData<Rc<()>>,
 153}
 154
 155#[derive(Serialize)]
 156pub struct ServerSnapshot<'a> {
 157    peer: &'a Peer,
 158    #[serde(serialize_with = "serialize_deref")]
 159    connection_pool: ConnectionPoolGuard<'a>,
 160}
 161
 162pub fn serialize_deref<S, T, U>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
 163where
 164    S: Serializer,
 165    T: Deref<Target = U>,
 166    U: Serialize,
 167{
 168    Serialize::serialize(value.deref(), serializer)
 169}
 170
 171impl Server {
 172    pub fn new(id: ServerId, app_state: Arc<AppState>, executor: Executor) -> Arc<Self> {
 173        let mut server = Self {
 174            id: parking_lot::Mutex::new(id),
 175            peer: Peer::new(id.0 as u32),
 176            app_state,
 177            executor,
 178            connection_pool: Default::default(),
 179            handlers: Default::default(),
 180            teardown: watch::channel(()).0,
 181        };
 182
 183        server
 184            .add_request_handler(ping)
 185            .add_request_handler(create_room)
 186            .add_request_handler(join_room)
 187            .add_request_handler(rejoin_room)
 188            .add_message_handler(leave_room)
 189            .add_request_handler(call)
 190            .add_request_handler(cancel_call)
 191            .add_message_handler(decline_call)
 192            .add_request_handler(update_participant_location)
 193            .add_request_handler(share_project)
 194            .add_message_handler(unshare_project)
 195            .add_request_handler(join_project)
 196            .add_message_handler(leave_project)
 197            .add_request_handler(update_project)
 198            .add_request_handler(update_worktree)
 199            .add_message_handler(start_language_server)
 200            .add_message_handler(update_language_server)
 201            .add_message_handler(update_diagnostic_summary)
 202            .add_request_handler(forward_project_request::<proto::GetHover>)
 203            .add_request_handler(forward_project_request::<proto::GetDefinition>)
 204            .add_request_handler(forward_project_request::<proto::GetTypeDefinition>)
 205            .add_request_handler(forward_project_request::<proto::GetReferences>)
 206            .add_request_handler(forward_project_request::<proto::SearchProject>)
 207            .add_request_handler(forward_project_request::<proto::GetDocumentHighlights>)
 208            .add_request_handler(forward_project_request::<proto::GetProjectSymbols>)
 209            .add_request_handler(forward_project_request::<proto::OpenBufferForSymbol>)
 210            .add_request_handler(forward_project_request::<proto::OpenBufferById>)
 211            .add_request_handler(forward_project_request::<proto::OpenBufferByPath>)
 212            .add_request_handler(forward_project_request::<proto::GetCompletions>)
 213            .add_request_handler(forward_project_request::<proto::ApplyCompletionAdditionalEdits>)
 214            .add_request_handler(forward_project_request::<proto::GetCodeActions>)
 215            .add_request_handler(forward_project_request::<proto::ApplyCodeAction>)
 216            .add_request_handler(forward_project_request::<proto::PrepareRename>)
 217            .add_request_handler(forward_project_request::<proto::PerformRename>)
 218            .add_request_handler(forward_project_request::<proto::ReloadBuffers>)
 219            .add_request_handler(forward_project_request::<proto::FormatBuffers>)
 220            .add_request_handler(forward_project_request::<proto::CreateProjectEntry>)
 221            .add_request_handler(forward_project_request::<proto::RenameProjectEntry>)
 222            .add_request_handler(forward_project_request::<proto::CopyProjectEntry>)
 223            .add_request_handler(forward_project_request::<proto::DeleteProjectEntry>)
 224            .add_message_handler(create_buffer_for_peer)
 225            .add_request_handler(update_buffer)
 226            .add_message_handler(update_buffer_file)
 227            .add_message_handler(buffer_reloaded)
 228            .add_message_handler(buffer_saved)
 229            .add_request_handler(save_buffer)
 230            .add_request_handler(get_users)
 231            .add_request_handler(fuzzy_search_users)
 232            .add_request_handler(request_contact)
 233            .add_request_handler(remove_contact)
 234            .add_request_handler(respond_to_contact_request)
 235            .add_request_handler(follow)
 236            .add_message_handler(unfollow)
 237            .add_message_handler(update_followers)
 238            .add_message_handler(update_diff_base)
 239            .add_request_handler(get_private_user_info);
 240
 241        Arc::new(server)
 242    }
 243
 244    pub async fn start(&self) -> Result<()> {
 245        let server_id = *self.id.lock();
 246        let app_state = self.app_state.clone();
 247        let peer = self.peer.clone();
 248        let timeout = self.executor.sleep(CLEANUP_TIMEOUT);
 249        let pool = self.connection_pool.clone();
 250        let live_kit_client = self.app_state.live_kit_client.clone();
 251
 252        let span = info_span!("start server");
 253        let span_enter = span.enter();
 254
 255        tracing::info!("begin deleting stale projects");
 256        app_state
 257            .db
 258            .delete_stale_projects(&app_state.config.zed_environment, server_id)
 259            .await?;
 260        tracing::info!("finish deleting stale projects");
 261
 262        drop(span_enter);
 263        self.executor.spawn_detached(
 264            async move {
 265                tracing::info!("waiting for cleanup timeout");
 266                timeout.await;
 267                tracing::info!("cleanup timeout expired, retrieving stale rooms");
 268                if let Some(room_ids) = app_state
 269                    .db
 270                    .stale_room_ids(&app_state.config.zed_environment, server_id)
 271                    .await
 272                    .trace_err()
 273                {
 274                    tracing::info!(stale_room_count = room_ids.len(), "retrieved stale rooms");
 275                    for room_id in room_ids {
 276                        let mut contacts_to_update = HashSet::default();
 277                        let mut canceled_calls_to_user_ids = Vec::new();
 278                        let mut live_kit_room = String::new();
 279                        let mut delete_live_kit_room = false;
 280
 281                        if let Ok(mut refreshed_room) =
 282                            app_state.db.refresh_room(room_id, server_id).await
 283                        {
 284                            tracing::info!(
 285                                room_id = room_id.0,
 286                                new_participant_count = refreshed_room.room.participants.len(),
 287                                "refreshed room"
 288                            );
 289                            room_updated(&refreshed_room.room, &peer);
 290                            contacts_to_update
 291                                .extend(refreshed_room.stale_participant_user_ids.iter().copied());
 292                            contacts_to_update
 293                                .extend(refreshed_room.canceled_calls_to_user_ids.iter().copied());
 294                            canceled_calls_to_user_ids =
 295                                mem::take(&mut refreshed_room.canceled_calls_to_user_ids);
 296                            live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room);
 297                            delete_live_kit_room = refreshed_room.room.participants.is_empty();
 298                        }
 299
 300                        {
 301                            let pool = pool.lock();
 302                            for canceled_user_id in canceled_calls_to_user_ids {
 303                                for connection_id in pool.user_connection_ids(canceled_user_id) {
 304                                    peer.send(
 305                                        connection_id,
 306                                        proto::CallCanceled {
 307                                            room_id: room_id.to_proto(),
 308                                        },
 309                                    )
 310                                    .trace_err();
 311                                }
 312                            }
 313                        }
 314
 315                        for user_id in contacts_to_update {
 316                            let busy = app_state.db.is_user_busy(user_id).await.trace_err();
 317                            let contacts = app_state.db.get_contacts(user_id).await.trace_err();
 318                            if let Some((busy, contacts)) = busy.zip(contacts) {
 319                                let pool = pool.lock();
 320                                let updated_contact = contact_for_user(user_id, false, busy, &pool);
 321                                for contact in contacts {
 322                                    if let db::Contact::Accepted {
 323                                        user_id: contact_user_id,
 324                                        ..
 325                                    } = contact
 326                                    {
 327                                        for contact_conn_id in
 328                                            pool.user_connection_ids(contact_user_id)
 329                                        {
 330                                            peer.send(
 331                                                contact_conn_id,
 332                                                proto::UpdateContacts {
 333                                                    contacts: vec![updated_contact.clone()],
 334                                                    remove_contacts: Default::default(),
 335                                                    incoming_requests: Default::default(),
 336                                                    remove_incoming_requests: Default::default(),
 337                                                    outgoing_requests: Default::default(),
 338                                                    remove_outgoing_requests: Default::default(),
 339                                                },
 340                                            )
 341                                            .trace_err();
 342                                        }
 343                                    }
 344                                }
 345                            }
 346                        }
 347
 348                        if let Some(live_kit) = live_kit_client.as_ref() {
 349                            if delete_live_kit_room {
 350                                live_kit.delete_room(live_kit_room).await.trace_err();
 351                            }
 352                        }
 353                    }
 354                }
 355
 356                app_state
 357                    .db
 358                    .delete_stale_servers(server_id, &app_state.config.zed_environment)
 359                    .await
 360                    .trace_err();
 361            }
 362            .instrument(span),
 363        );
 364        Ok(())
 365    }
 366
 367    pub fn teardown(&self) {
 368        self.peer.teardown();
 369        self.connection_pool.lock().reset();
 370        let _ = self.teardown.send(());
 371    }
 372
 373    #[cfg(test)]
 374    pub fn reset(&self, id: ServerId) {
 375        self.teardown();
 376        *self.id.lock() = id;
 377        self.peer.reset(id.0 as u32);
 378    }
 379
 380    #[cfg(test)]
 381    pub fn id(&self) -> ServerId {
 382        *self.id.lock()
 383    }
 384
 385    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 386    where
 387        F: 'static + Send + Sync + Fn(TypedEnvelope<M>, Session) -> Fut,
 388        Fut: 'static + Send + Future<Output = Result<()>>,
 389        M: EnvelopedMessage,
 390    {
 391        let prev_handler = self.handlers.insert(
 392            TypeId::of::<M>(),
 393            Box::new(move |envelope, session| {
 394                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 395                let span = info_span!(
 396                    "handle message",
 397                    payload_type = envelope.payload_type_name()
 398                );
 399                span.in_scope(|| {
 400                    tracing::info!(
 401                        payload_type = envelope.payload_type_name(),
 402                        "message received"
 403                    );
 404                });
 405                let future = (handler)(*envelope, session);
 406                async move {
 407                    if let Err(error) = future.await {
 408                        tracing::error!(%error, "error handling message");
 409                    }
 410                }
 411                .instrument(span)
 412                .boxed()
 413            }),
 414        );
 415        if prev_handler.is_some() {
 416            panic!("registered a handler for the same message twice");
 417        }
 418        self
 419    }
 420
 421    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 422    where
 423        F: 'static + Send + Sync + Fn(M, Session) -> Fut,
 424        Fut: 'static + Send + Future<Output = Result<()>>,
 425        M: EnvelopedMessage,
 426    {
 427        self.add_handler(move |envelope, session| handler(envelope.payload, session));
 428        self
 429    }
 430
 431    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 432    where
 433        F: 'static + Send + Sync + Fn(M, Response<M>, Session) -> Fut,
 434        Fut: Send + Future<Output = Result<()>>,
 435        M: RequestMessage,
 436    {
 437        let handler = Arc::new(handler);
 438        self.add_handler(move |envelope, session| {
 439            let receipt = envelope.receipt();
 440            let handler = handler.clone();
 441            async move {
 442                let peer = session.peer.clone();
 443                let responded = Arc::new(AtomicBool::default());
 444                let response = Response {
 445                    peer: peer.clone(),
 446                    responded: responded.clone(),
 447                    receipt,
 448                };
 449                match (handler)(envelope.payload, response, session).await {
 450                    Ok(()) => {
 451                        if responded.load(std::sync::atomic::Ordering::SeqCst) {
 452                            Ok(())
 453                        } else {
 454                            Err(anyhow!("handler did not send a response"))?
 455                        }
 456                    }
 457                    Err(error) => {
 458                        peer.respond_with_error(
 459                            receipt,
 460                            proto::Error {
 461                                message: error.to_string(),
 462                            },
 463                        )?;
 464                        Err(error)
 465                    }
 466                }
 467            }
 468        })
 469    }
 470
 471    pub fn handle_connection(
 472        self: &Arc<Self>,
 473        connection: Connection,
 474        address: String,
 475        user: User,
 476        mut send_connection_id: Option<oneshot::Sender<ConnectionId>>,
 477        executor: Executor,
 478    ) -> impl Future<Output = Result<()>> {
 479        let this = self.clone();
 480        let user_id = user.id;
 481        let login = user.github_login;
 482        let span = info_span!("handle connection", %user_id, %login, %address);
 483        let mut teardown = self.teardown.subscribe();
 484        async move {
 485            let (connection_id, handle_io, mut incoming_rx) = this
 486                .peer
 487                .add_connection(connection, {
 488                    let executor = executor.clone();
 489                    move |duration| executor.sleep(duration)
 490                });
 491
 492            tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
 493            this.peer.send(connection_id, proto::Hello { peer_id: Some(connection_id.into()) })?;
 494            tracing::info!(%user_id, %login, %connection_id, %address, "sent hello message");
 495
 496            if let Some(send_connection_id) = send_connection_id.take() {
 497                let _ = send_connection_id.send(connection_id);
 498            }
 499
 500            if !user.connected_once {
 501                this.peer.send(connection_id, proto::ShowContacts {})?;
 502                this.app_state.db.set_user_connected_once(user_id, true).await?;
 503            }
 504
 505            let (contacts, invite_code) = future::try_join(
 506                this.app_state.db.get_contacts(user_id),
 507                this.app_state.db.get_invite_code_for_user(user_id)
 508            ).await?;
 509
 510            {
 511                let mut pool = this.connection_pool.lock();
 512                pool.add_connection(connection_id, user_id, user.admin);
 513                this.peer.send(connection_id, build_initial_contacts_update(contacts, &pool))?;
 514
 515                if let Some((code, count)) = invite_code {
 516                    this.peer.send(connection_id, proto::UpdateInviteInfo {
 517                        url: format!("{}{}", this.app_state.config.invite_link_prefix, code),
 518                        count: count as u32,
 519                    })?;
 520                }
 521            }
 522
 523            if let Some(incoming_call) = this.app_state.db.incoming_call_for_user(user_id).await? {
 524                this.peer.send(connection_id, incoming_call)?;
 525            }
 526
 527            let session = Session {
 528                user_id,
 529                connection_id,
 530                db: Arc::new(tokio::sync::Mutex::new(DbHandle(this.app_state.db.clone()))),
 531                peer: this.peer.clone(),
 532                connection_pool: this.connection_pool.clone(),
 533                live_kit_client: this.app_state.live_kit_client.clone()
 534            };
 535            update_user_contacts(user_id, &session).await?;
 536
 537            let handle_io = handle_io.fuse();
 538            futures::pin_mut!(handle_io);
 539
 540            // Handlers for foreground messages are pushed into the following `FuturesUnordered`.
 541            // This prevents deadlocks when e.g., client A performs a request to client B and
 542            // client B performs a request to client A. If both clients stop processing further
 543            // messages until their respective request completes, they won't have a chance to
 544            // respond to the other client's request and cause a deadlock.
 545            //
 546            // This arrangement ensures we will attempt to process earlier messages first, but fall
 547            // back to processing messages arrived later in the spirit of making progress.
 548            let mut foreground_message_handlers = FuturesUnordered::new();
 549            loop {
 550                let next_message = incoming_rx.next().fuse();
 551                futures::pin_mut!(next_message);
 552                futures::select_biased! {
 553                    _ = teardown.changed().fuse() => return Ok(()),
 554                    result = handle_io => {
 555                        if let Err(error) = result {
 556                            tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
 557                        }
 558                        break;
 559                    }
 560                    _ = foreground_message_handlers.next() => {}
 561                    message = next_message => {
 562                        if let Some(message) = message {
 563                            let type_name = message.payload_type_name();
 564                            let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
 565                            let span_enter = span.enter();
 566                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 567                                let is_background = message.is_background();
 568                                let handle_message = (handler)(message, session.clone());
 569                                drop(span_enter);
 570
 571                                let handle_message = handle_message.instrument(span);
 572                                if is_background {
 573                                    executor.spawn_detached(handle_message);
 574                                } else {
 575                                    foreground_message_handlers.push(handle_message);
 576                                }
 577                            } else {
 578                                tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
 579                            }
 580                        } else {
 581                            tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
 582                            break;
 583                        }
 584                    }
 585                }
 586            }
 587
 588            drop(foreground_message_handlers);
 589            tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
 590            if let Err(error) = sign_out(session, teardown, executor).await {
 591                tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
 592            }
 593
 594            Ok(())
 595        }.instrument(span)
 596    }
 597
 598    pub async fn invite_code_redeemed(
 599        self: &Arc<Self>,
 600        inviter_id: UserId,
 601        invitee_id: UserId,
 602    ) -> Result<()> {
 603        if let Some(user) = self.app_state.db.get_user_by_id(inviter_id).await? {
 604            if let Some(code) = &user.invite_code {
 605                let pool = self.connection_pool.lock();
 606                let invitee_contact = contact_for_user(invitee_id, true, false, &pool);
 607                for connection_id in pool.user_connection_ids(inviter_id) {
 608                    self.peer.send(
 609                        connection_id,
 610                        proto::UpdateContacts {
 611                            contacts: vec![invitee_contact.clone()],
 612                            ..Default::default()
 613                        },
 614                    )?;
 615                    self.peer.send(
 616                        connection_id,
 617                        proto::UpdateInviteInfo {
 618                            url: format!("{}{}", self.app_state.config.invite_link_prefix, &code),
 619                            count: user.invite_count as u32,
 620                        },
 621                    )?;
 622                }
 623            }
 624        }
 625        Ok(())
 626    }
 627
 628    pub async fn invite_count_updated(self: &Arc<Self>, user_id: UserId) -> Result<()> {
 629        if let Some(user) = self.app_state.db.get_user_by_id(user_id).await? {
 630            if let Some(invite_code) = &user.invite_code {
 631                let pool = self.connection_pool.lock();
 632                for connection_id in pool.user_connection_ids(user_id) {
 633                    self.peer.send(
 634                        connection_id,
 635                        proto::UpdateInviteInfo {
 636                            url: format!(
 637                                "{}{}",
 638                                self.app_state.config.invite_link_prefix, invite_code
 639                            ),
 640                            count: user.invite_count as u32,
 641                        },
 642                    )?;
 643                }
 644            }
 645        }
 646        Ok(())
 647    }
 648
 649    pub async fn snapshot<'a>(self: &'a Arc<Self>) -> ServerSnapshot<'a> {
 650        ServerSnapshot {
 651            connection_pool: ConnectionPoolGuard {
 652                guard: self.connection_pool.lock(),
 653                _not_send: PhantomData,
 654            },
 655            peer: &self.peer,
 656        }
 657    }
 658}
 659
 660impl<'a> Deref for ConnectionPoolGuard<'a> {
 661    type Target = ConnectionPool;
 662
 663    fn deref(&self) -> &Self::Target {
 664        &*self.guard
 665    }
 666}
 667
 668impl<'a> DerefMut for ConnectionPoolGuard<'a> {
 669    fn deref_mut(&mut self) -> &mut Self::Target {
 670        &mut *self.guard
 671    }
 672}
 673
 674impl<'a> Drop for ConnectionPoolGuard<'a> {
 675    fn drop(&mut self) {
 676        #[cfg(test)]
 677        self.check_invariants();
 678    }
 679}
 680
 681fn broadcast<F>(
 682    sender_id: ConnectionId,
 683    receiver_ids: impl IntoIterator<Item = ConnectionId>,
 684    mut f: F,
 685) where
 686    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 687{
 688    for receiver_id in receiver_ids {
 689        if receiver_id != sender_id {
 690            f(receiver_id).trace_err();
 691        }
 692    }
 693}
 694
 695lazy_static! {
 696    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
 697}
 698
 699pub struct ProtocolVersion(u32);
 700
 701impl Header for ProtocolVersion {
 702    fn name() -> &'static HeaderName {
 703        &ZED_PROTOCOL_VERSION
 704    }
 705
 706    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
 707    where
 708        Self: Sized,
 709        I: Iterator<Item = &'i axum::http::HeaderValue>,
 710    {
 711        let version = values
 712            .next()
 713            .ok_or_else(axum::headers::Error::invalid)?
 714            .to_str()
 715            .map_err(|_| axum::headers::Error::invalid())?
 716            .parse()
 717            .map_err(|_| axum::headers::Error::invalid())?;
 718        Ok(Self(version))
 719    }
 720
 721    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
 722        values.extend([self.0.to_string().parse().unwrap()]);
 723    }
 724}
 725
 726pub fn routes(server: Arc<Server>) -> Router<Body> {
 727    Router::new()
 728        .route("/rpc", get(handle_websocket_request))
 729        .layer(
 730            ServiceBuilder::new()
 731                .layer(Extension(server.app_state.clone()))
 732                .layer(middleware::from_fn(auth::validate_header)),
 733        )
 734        .route("/metrics", get(handle_metrics))
 735        .layer(Extension(server))
 736}
 737
 738pub async fn handle_websocket_request(
 739    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
 740    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
 741    Extension(server): Extension<Arc<Server>>,
 742    Extension(user): Extension<User>,
 743    ws: WebSocketUpgrade,
 744) -> axum::response::Response {
 745    if protocol_version != rpc::PROTOCOL_VERSION {
 746        return (
 747            StatusCode::UPGRADE_REQUIRED,
 748            "client must be upgraded".to_string(),
 749        )
 750            .into_response();
 751    }
 752    let socket_address = socket_address.to_string();
 753    ws.on_upgrade(move |socket| {
 754        use util::ResultExt;
 755        let socket = socket
 756            .map_ok(to_tungstenite_message)
 757            .err_into()
 758            .with(|message| async move { Ok(to_axum_message(message)) });
 759        let connection = Connection::new(Box::pin(socket));
 760        async move {
 761            server
 762                .handle_connection(connection, socket_address, user, None, Executor::Production)
 763                .await
 764                .log_err();
 765        }
 766    })
 767}
 768
 769pub async fn handle_metrics(Extension(server): Extension<Arc<Server>>) -> Result<String> {
 770    let connections = server
 771        .connection_pool
 772        .lock()
 773        .connections()
 774        .filter(|connection| !connection.admin)
 775        .count();
 776
 777    METRIC_CONNECTIONS.set(connections as _);
 778
 779    let shared_projects = server.app_state.db.project_count_excluding_admins().await?;
 780    METRIC_SHARED_PROJECTS.set(shared_projects as _);
 781
 782    let encoder = prometheus::TextEncoder::new();
 783    let metric_families = prometheus::gather();
 784    let encoded_metrics = encoder
 785        .encode_to_string(&metric_families)
 786        .map_err(|err| anyhow!("{}", err))?;
 787    Ok(encoded_metrics)
 788}
 789
 790#[instrument(err, skip(executor))]
 791async fn sign_out(
 792    session: Session,
 793    mut teardown: watch::Receiver<()>,
 794    executor: Executor,
 795) -> Result<()> {
 796    session.peer.disconnect(session.connection_id);
 797    session
 798        .connection_pool()
 799        .await
 800        .remove_connection(session.connection_id)?;
 801
 802    session
 803        .db()
 804        .await
 805        .connection_lost(session.connection_id)
 806        .await
 807        .trace_err();
 808
 809    futures::select_biased! {
 810        _ = executor.sleep(RECONNECT_TIMEOUT).fuse() => {
 811            leave_room_for_session(&session).await.trace_err();
 812
 813            if !session
 814                .connection_pool()
 815                .await
 816                .is_user_online(session.user_id)
 817            {
 818                let db = session.db().await;
 819                if let Some(room) = db.decline_call(None, session.user_id).await.trace_err().flatten() {
 820                    room_updated(&room, &session.peer);
 821                }
 822            }
 823            update_user_contacts(session.user_id, &session).await?;
 824        }
 825        _ = teardown.changed().fuse() => {}
 826    }
 827
 828    Ok(())
 829}
 830
 831async fn ping(_: proto::Ping, response: Response<proto::Ping>, _session: Session) -> Result<()> {
 832    response.send(proto::Ack {})?;
 833    Ok(())
 834}
 835
 836async fn create_room(
 837    _request: proto::CreateRoom,
 838    response: Response<proto::CreateRoom>,
 839    session: Session,
 840) -> Result<()> {
 841    let live_kit_room = nanoid::nanoid!(30);
 842    let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
 843        if let Some(_) = live_kit
 844            .create_room(live_kit_room.clone())
 845            .await
 846            .trace_err()
 847        {
 848            if let Some(token) = live_kit
 849                .room_token(&live_kit_room, &session.user_id.to_string())
 850                .trace_err()
 851            {
 852                Some(proto::LiveKitConnectionInfo {
 853                    server_url: live_kit.url().into(),
 854                    token,
 855                })
 856            } else {
 857                None
 858            }
 859        } else {
 860            None
 861        }
 862    } else {
 863        None
 864    };
 865
 866    {
 867        let room = session
 868            .db()
 869            .await
 870            .create_room(session.user_id, session.connection_id, &live_kit_room)
 871            .await?;
 872
 873        response.send(proto::CreateRoomResponse {
 874            room: Some(room.clone()),
 875            live_kit_connection_info,
 876        })?;
 877    }
 878
 879    update_user_contacts(session.user_id, &session).await?;
 880    Ok(())
 881}
 882
 883async fn join_room(
 884    request: proto::JoinRoom,
 885    response: Response<proto::JoinRoom>,
 886    session: Session,
 887) -> Result<()> {
 888    let room_id = RoomId::from_proto(request.id);
 889    let room = {
 890        let room = session
 891            .db()
 892            .await
 893            .join_room(room_id, session.user_id, session.connection_id)
 894            .await?;
 895        room_updated(&room, &session.peer);
 896        room.clone()
 897    };
 898
 899    for connection_id in session
 900        .connection_pool()
 901        .await
 902        .user_connection_ids(session.user_id)
 903    {
 904        session
 905            .peer
 906            .send(
 907                connection_id,
 908                proto::CallCanceled {
 909                    room_id: room_id.to_proto(),
 910                },
 911            )
 912            .trace_err();
 913    }
 914
 915    let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
 916        if let Some(token) = live_kit
 917            .room_token(&room.live_kit_room, &session.user_id.to_string())
 918            .trace_err()
 919        {
 920            Some(proto::LiveKitConnectionInfo {
 921                server_url: live_kit.url().into(),
 922                token,
 923            })
 924        } else {
 925            None
 926        }
 927    } else {
 928        None
 929    };
 930
 931    response.send(proto::JoinRoomResponse {
 932        room: Some(room),
 933        live_kit_connection_info,
 934    })?;
 935
 936    update_user_contacts(session.user_id, &session).await?;
 937    Ok(())
 938}
 939
 940async fn rejoin_room(
 941    request: proto::RejoinRoom,
 942    response: Response<proto::RejoinRoom>,
 943    session: Session,
 944) -> Result<()> {
 945    let mut rejoined_room = session
 946        .db()
 947        .await
 948        .rejoin_room(request, session.user_id, session.connection_id)
 949        .await?;
 950
 951    response.send(proto::RejoinRoomResponse {
 952        room: Some(rejoined_room.room.clone()),
 953        reshared_projects: rejoined_room
 954            .reshared_projects
 955            .iter()
 956            .map(|project| proto::ResharedProject {
 957                id: project.id.to_proto(),
 958                collaborators: project
 959                    .collaborators
 960                    .iter()
 961                    .map(|collaborator| collaborator.to_proto())
 962                    .collect(),
 963            })
 964            .collect(),
 965        rejoined_projects: rejoined_room
 966            .rejoined_projects
 967            .iter()
 968            .map(|rejoined_project| proto::RejoinedProject {
 969                id: rejoined_project.id.to_proto(),
 970                worktrees: rejoined_project
 971                    .worktrees
 972                    .iter()
 973                    .map(|worktree| proto::WorktreeMetadata {
 974                        id: worktree.id,
 975                        root_name: worktree.root_name.clone(),
 976                        visible: worktree.visible,
 977                        abs_path: worktree.abs_path.clone(),
 978                    })
 979                    .collect(),
 980                collaborators: rejoined_project
 981                    .collaborators
 982                    .iter()
 983                    .map(|collaborator| collaborator.to_proto())
 984                    .collect(),
 985                language_servers: rejoined_project.language_servers.clone(),
 986            })
 987            .collect(),
 988    })?;
 989    room_updated(&rejoined_room.room, &session.peer);
 990
 991    // Notify other participants about this peer's reconnection to projects.
 992    for project in &rejoined_room.reshared_projects {
 993        for collaborator in &project.collaborators {
 994            if collaborator.connection_id != session.connection_id {
 995                session
 996                    .peer
 997                    .send(
 998                        collaborator.connection_id,
 999                        proto::UpdateProjectCollaborator {
1000                            project_id: project.id.to_proto(),
1001                            old_peer_id: Some(project.old_connection_id.into()),
1002                            new_peer_id: Some(session.connection_id.into()),
1003                        },
1004                    )
1005                    .trace_err();
1006            }
1007        }
1008    }
1009    for project in &rejoined_room.rejoined_projects {
1010        for collaborator in &project.collaborators {
1011            if collaborator.connection_id != session.connection_id {
1012                session
1013                    .peer
1014                    .send(
1015                        collaborator.connection_id,
1016                        proto::UpdateProjectCollaborator {
1017                            project_id: project.id.to_proto(),
1018                            old_peer_id: Some(project.old_connection_id.into()),
1019                            new_peer_id: Some(session.connection_id.into()),
1020                        },
1021                    )
1022                    .trace_err();
1023            }
1024        }
1025    }
1026
1027    for project in &mut rejoined_room.rejoined_projects {
1028        for worktree in mem::take(&mut project.worktrees) {
1029            #[cfg(any(test, feature = "test-support"))]
1030            const MAX_CHUNK_SIZE: usize = 2;
1031            #[cfg(not(any(test, feature = "test-support")))]
1032            const MAX_CHUNK_SIZE: usize = 256;
1033
1034            // Stream this worktree's entries.
1035            let message = proto::UpdateWorktree {
1036                project_id: project.id.to_proto(),
1037                worktree_id: worktree.id,
1038                abs_path: worktree.abs_path.clone(),
1039                root_name: worktree.root_name,
1040                updated_entries: worktree.updated_entries,
1041                removed_entries: worktree.removed_entries,
1042                scan_id: worktree.scan_id,
1043                is_last_update: worktree.is_complete,
1044            };
1045            for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1046                session.peer.send(session.connection_id, update.clone())?;
1047            }
1048
1049            // Stream this worktree's diagnostics.
1050            for summary in worktree.diagnostic_summaries {
1051                session.peer.send(
1052                    session.connection_id,
1053                    proto::UpdateDiagnosticSummary {
1054                        project_id: project.id.to_proto(),
1055                        worktree_id: worktree.id,
1056                        summary: Some(summary),
1057                    },
1058                )?;
1059            }
1060        }
1061
1062        for language_server in &project.language_servers {
1063            session.peer.send(
1064                session.connection_id,
1065                proto::UpdateLanguageServer {
1066                    project_id: project.id.to_proto(),
1067                    language_server_id: language_server.id,
1068                    variant: Some(
1069                        proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1070                            proto::LspDiskBasedDiagnosticsUpdated {},
1071                        ),
1072                    ),
1073                },
1074            )?;
1075        }
1076    }
1077
1078    update_user_contacts(session.user_id, &session).await?;
1079    Ok(())
1080}
1081
1082async fn leave_room(_message: proto::LeaveRoom, session: Session) -> Result<()> {
1083    leave_room_for_session(&session).await
1084}
1085
1086async fn call(
1087    request: proto::Call,
1088    response: Response<proto::Call>,
1089    session: Session,
1090) -> Result<()> {
1091    let room_id = RoomId::from_proto(request.room_id);
1092    let calling_user_id = session.user_id;
1093    let calling_connection_id = session.connection_id;
1094    let called_user_id = UserId::from_proto(request.called_user_id);
1095    let initial_project_id = request.initial_project_id.map(ProjectId::from_proto);
1096    if !session
1097        .db()
1098        .await
1099        .has_contact(calling_user_id, called_user_id)
1100        .await?
1101    {
1102        return Err(anyhow!("cannot call a user who isn't a contact"))?;
1103    }
1104
1105    let incoming_call = {
1106        let (room, incoming_call) = &mut *session
1107            .db()
1108            .await
1109            .call(
1110                room_id,
1111                calling_user_id,
1112                calling_connection_id,
1113                called_user_id,
1114                initial_project_id,
1115            )
1116            .await?;
1117        room_updated(&room, &session.peer);
1118        mem::take(incoming_call)
1119    };
1120    update_user_contacts(called_user_id, &session).await?;
1121
1122    let mut calls = session
1123        .connection_pool()
1124        .await
1125        .user_connection_ids(called_user_id)
1126        .map(|connection_id| session.peer.request(connection_id, incoming_call.clone()))
1127        .collect::<FuturesUnordered<_>>();
1128
1129    while let Some(call_response) = calls.next().await {
1130        match call_response.as_ref() {
1131            Ok(_) => {
1132                response.send(proto::Ack {})?;
1133                return Ok(());
1134            }
1135            Err(_) => {
1136                call_response.trace_err();
1137            }
1138        }
1139    }
1140
1141    {
1142        let room = session
1143            .db()
1144            .await
1145            .call_failed(room_id, called_user_id)
1146            .await?;
1147        room_updated(&room, &session.peer);
1148    }
1149    update_user_contacts(called_user_id, &session).await?;
1150
1151    Err(anyhow!("failed to ring user"))?
1152}
1153
1154async fn cancel_call(
1155    request: proto::CancelCall,
1156    response: Response<proto::CancelCall>,
1157    session: Session,
1158) -> Result<()> {
1159    let called_user_id = UserId::from_proto(request.called_user_id);
1160    let room_id = RoomId::from_proto(request.room_id);
1161    {
1162        let room = session
1163            .db()
1164            .await
1165            .cancel_call(room_id, session.connection_id, called_user_id)
1166            .await?;
1167        room_updated(&room, &session.peer);
1168    }
1169
1170    for connection_id in session
1171        .connection_pool()
1172        .await
1173        .user_connection_ids(called_user_id)
1174    {
1175        session
1176            .peer
1177            .send(
1178                connection_id,
1179                proto::CallCanceled {
1180                    room_id: room_id.to_proto(),
1181                },
1182            )
1183            .trace_err();
1184    }
1185    response.send(proto::Ack {})?;
1186
1187    update_user_contacts(called_user_id, &session).await?;
1188    Ok(())
1189}
1190
1191async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<()> {
1192    let room_id = RoomId::from_proto(message.room_id);
1193    {
1194        let room = session
1195            .db()
1196            .await
1197            .decline_call(Some(room_id), session.user_id)
1198            .await?
1199            .ok_or_else(|| anyhow!("failed to decline call"))?;
1200        room_updated(&room, &session.peer);
1201    }
1202
1203    for connection_id in session
1204        .connection_pool()
1205        .await
1206        .user_connection_ids(session.user_id)
1207    {
1208        session
1209            .peer
1210            .send(
1211                connection_id,
1212                proto::CallCanceled {
1213                    room_id: room_id.to_proto(),
1214                },
1215            )
1216            .trace_err();
1217    }
1218    update_user_contacts(session.user_id, &session).await?;
1219    Ok(())
1220}
1221
1222async fn update_participant_location(
1223    request: proto::UpdateParticipantLocation,
1224    response: Response<proto::UpdateParticipantLocation>,
1225    session: Session,
1226) -> Result<()> {
1227    let room_id = RoomId::from_proto(request.room_id);
1228    let location = request
1229        .location
1230        .ok_or_else(|| anyhow!("invalid location"))?;
1231    let room = session
1232        .db()
1233        .await
1234        .update_room_participant_location(room_id, session.connection_id, location)
1235        .await?;
1236    room_updated(&room, &session.peer);
1237    response.send(proto::Ack {})?;
1238    Ok(())
1239}
1240
1241async fn share_project(
1242    request: proto::ShareProject,
1243    response: Response<proto::ShareProject>,
1244    session: Session,
1245) -> Result<()> {
1246    let (project_id, room) = &*session
1247        .db()
1248        .await
1249        .share_project(
1250            RoomId::from_proto(request.room_id),
1251            session.connection_id,
1252            &request.worktrees,
1253        )
1254        .await?;
1255    response.send(proto::ShareProjectResponse {
1256        project_id: project_id.to_proto(),
1257    })?;
1258    room_updated(&room, &session.peer);
1259
1260    Ok(())
1261}
1262
1263async fn unshare_project(message: proto::UnshareProject, session: Session) -> Result<()> {
1264    let project_id = ProjectId::from_proto(message.project_id);
1265
1266    let (room, guest_connection_ids) = &*session
1267        .db()
1268        .await
1269        .unshare_project(project_id, session.connection_id)
1270        .await?;
1271
1272    broadcast(
1273        session.connection_id,
1274        guest_connection_ids.iter().copied(),
1275        |conn_id| session.peer.send(conn_id, message.clone()),
1276    );
1277    room_updated(&room, &session.peer);
1278
1279    Ok(())
1280}
1281
1282async fn join_project(
1283    request: proto::JoinProject,
1284    response: Response<proto::JoinProject>,
1285    session: Session,
1286) -> Result<()> {
1287    let project_id = ProjectId::from_proto(request.project_id);
1288    let guest_user_id = session.user_id;
1289
1290    tracing::info!(%project_id, "join project");
1291
1292    let (project, replica_id) = &mut *session
1293        .db()
1294        .await
1295        .join_project(project_id, session.connection_id)
1296        .await?;
1297
1298    let collaborators = project
1299        .collaborators
1300        .iter()
1301        .filter(|collaborator| collaborator.connection_id != session.connection_id)
1302        .map(|collaborator| collaborator.to_proto())
1303        .collect::<Vec<_>>();
1304    let worktrees = project
1305        .worktrees
1306        .iter()
1307        .map(|(id, worktree)| proto::WorktreeMetadata {
1308            id: *id,
1309            root_name: worktree.root_name.clone(),
1310            visible: worktree.visible,
1311            abs_path: worktree.abs_path.clone(),
1312        })
1313        .collect::<Vec<_>>();
1314
1315    for collaborator in &collaborators {
1316        session
1317            .peer
1318            .send(
1319                collaborator.peer_id.unwrap().into(),
1320                proto::AddProjectCollaborator {
1321                    project_id: project_id.to_proto(),
1322                    collaborator: Some(proto::Collaborator {
1323                        peer_id: Some(session.connection_id.into()),
1324                        replica_id: replica_id.0 as u32,
1325                        user_id: guest_user_id.to_proto(),
1326                    }),
1327                },
1328            )
1329            .trace_err();
1330    }
1331
1332    // First, we send the metadata associated with each worktree.
1333    response.send(proto::JoinProjectResponse {
1334        worktrees: worktrees.clone(),
1335        replica_id: replica_id.0 as u32,
1336        collaborators: collaborators.clone(),
1337        language_servers: project.language_servers.clone(),
1338    })?;
1339
1340    for (worktree_id, worktree) in mem::take(&mut project.worktrees) {
1341        #[cfg(any(test, feature = "test-support"))]
1342        const MAX_CHUNK_SIZE: usize = 2;
1343        #[cfg(not(any(test, feature = "test-support")))]
1344        const MAX_CHUNK_SIZE: usize = 256;
1345
1346        // Stream this worktree's entries.
1347        let message = proto::UpdateWorktree {
1348            project_id: project_id.to_proto(),
1349            worktree_id,
1350            abs_path: worktree.abs_path.clone(),
1351            root_name: worktree.root_name,
1352            updated_entries: worktree.entries,
1353            removed_entries: Default::default(),
1354            scan_id: worktree.scan_id,
1355            is_last_update: worktree.is_complete,
1356        };
1357        for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1358            session.peer.send(session.connection_id, update.clone())?;
1359        }
1360
1361        // Stream this worktree's diagnostics.
1362        for summary in worktree.diagnostic_summaries {
1363            session.peer.send(
1364                session.connection_id,
1365                proto::UpdateDiagnosticSummary {
1366                    project_id: project_id.to_proto(),
1367                    worktree_id: worktree.id,
1368                    summary: Some(summary),
1369                },
1370            )?;
1371        }
1372    }
1373
1374    for language_server in &project.language_servers {
1375        session.peer.send(
1376            session.connection_id,
1377            proto::UpdateLanguageServer {
1378                project_id: project_id.to_proto(),
1379                language_server_id: language_server.id,
1380                variant: Some(
1381                    proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1382                        proto::LspDiskBasedDiagnosticsUpdated {},
1383                    ),
1384                ),
1385            },
1386        )?;
1387    }
1388
1389    Ok(())
1390}
1391
1392async fn leave_project(request: proto::LeaveProject, session: Session) -> Result<()> {
1393    let sender_id = session.connection_id;
1394    let project_id = ProjectId::from_proto(request.project_id);
1395
1396    let project = session
1397        .db()
1398        .await
1399        .leave_project(project_id, sender_id)
1400        .await?;
1401    tracing::info!(
1402        %project_id,
1403        host_user_id = %project.host_user_id,
1404        host_connection_id = %project.host_connection_id,
1405        "leave project"
1406    );
1407    project_left(&project, &session);
1408
1409    Ok(())
1410}
1411
1412async fn update_project(
1413    request: proto::UpdateProject,
1414    response: Response<proto::UpdateProject>,
1415    session: Session,
1416) -> Result<()> {
1417    let project_id = ProjectId::from_proto(request.project_id);
1418    let (room, guest_connection_ids) = &*session
1419        .db()
1420        .await
1421        .update_project(project_id, session.connection_id, &request.worktrees)
1422        .await?;
1423    broadcast(
1424        session.connection_id,
1425        guest_connection_ids.iter().copied(),
1426        |connection_id| {
1427            session
1428                .peer
1429                .forward_send(session.connection_id, connection_id, request.clone())
1430        },
1431    );
1432    room_updated(&room, &session.peer);
1433    response.send(proto::Ack {})?;
1434
1435    Ok(())
1436}
1437
1438async fn update_worktree(
1439    request: proto::UpdateWorktree,
1440    response: Response<proto::UpdateWorktree>,
1441    session: Session,
1442) -> Result<()> {
1443    let guest_connection_ids = session
1444        .db()
1445        .await
1446        .update_worktree(&request, session.connection_id)
1447        .await?;
1448
1449    broadcast(
1450        session.connection_id,
1451        guest_connection_ids.iter().copied(),
1452        |connection_id| {
1453            session
1454                .peer
1455                .forward_send(session.connection_id, connection_id, request.clone())
1456        },
1457    );
1458    response.send(proto::Ack {})?;
1459    Ok(())
1460}
1461
1462async fn update_diagnostic_summary(
1463    message: proto::UpdateDiagnosticSummary,
1464    session: Session,
1465) -> Result<()> {
1466    let guest_connection_ids = session
1467        .db()
1468        .await
1469        .update_diagnostic_summary(&message, session.connection_id)
1470        .await?;
1471
1472    broadcast(
1473        session.connection_id,
1474        guest_connection_ids.iter().copied(),
1475        |connection_id| {
1476            session
1477                .peer
1478                .forward_send(session.connection_id, connection_id, message.clone())
1479        },
1480    );
1481
1482    Ok(())
1483}
1484
1485async fn start_language_server(
1486    request: proto::StartLanguageServer,
1487    session: Session,
1488) -> Result<()> {
1489    let guest_connection_ids = session
1490        .db()
1491        .await
1492        .start_language_server(&request, session.connection_id)
1493        .await?;
1494
1495    broadcast(
1496        session.connection_id,
1497        guest_connection_ids.iter().copied(),
1498        |connection_id| {
1499            session
1500                .peer
1501                .forward_send(session.connection_id, connection_id, request.clone())
1502        },
1503    );
1504    Ok(())
1505}
1506
1507async fn update_language_server(
1508    request: proto::UpdateLanguageServer,
1509    session: Session,
1510) -> Result<()> {
1511    let project_id = ProjectId::from_proto(request.project_id);
1512    let project_connection_ids = session
1513        .db()
1514        .await
1515        .project_connection_ids(project_id, session.connection_id)
1516        .await?;
1517    broadcast(
1518        session.connection_id,
1519        project_connection_ids.iter().copied(),
1520        |connection_id| {
1521            session
1522                .peer
1523                .forward_send(session.connection_id, connection_id, request.clone())
1524        },
1525    );
1526    Ok(())
1527}
1528
1529async fn forward_project_request<T>(
1530    request: T,
1531    response: Response<T>,
1532    session: Session,
1533) -> Result<()>
1534where
1535    T: EntityMessage + RequestMessage,
1536{
1537    let project_id = ProjectId::from_proto(request.remote_entity_id());
1538    let host_connection_id = {
1539        let collaborators = session
1540            .db()
1541            .await
1542            .project_collaborators(project_id, session.connection_id)
1543            .await?;
1544        collaborators
1545            .iter()
1546            .find(|collaborator| collaborator.is_host)
1547            .ok_or_else(|| anyhow!("host not found"))?
1548            .connection_id
1549    };
1550
1551    let payload = session
1552        .peer
1553        .forward_request(session.connection_id, host_connection_id, request)
1554        .await?;
1555
1556    response.send(payload)?;
1557    Ok(())
1558}
1559
1560async fn save_buffer(
1561    request: proto::SaveBuffer,
1562    response: Response<proto::SaveBuffer>,
1563    session: Session,
1564) -> Result<()> {
1565    let project_id = ProjectId::from_proto(request.project_id);
1566    let host_connection_id = {
1567        let collaborators = session
1568            .db()
1569            .await
1570            .project_collaborators(project_id, session.connection_id)
1571            .await?;
1572        collaborators
1573            .iter()
1574            .find(|collaborator| collaborator.is_host)
1575            .ok_or_else(|| anyhow!("host not found"))?
1576            .connection_id
1577    };
1578    let response_payload = session
1579        .peer
1580        .forward_request(session.connection_id, host_connection_id, request.clone())
1581        .await?;
1582
1583    let mut collaborators = session
1584        .db()
1585        .await
1586        .project_collaborators(project_id, session.connection_id)
1587        .await?;
1588    collaborators.retain(|collaborator| collaborator.connection_id != session.connection_id);
1589    let project_connection_ids = collaborators
1590        .iter()
1591        .map(|collaborator| collaborator.connection_id);
1592    broadcast(host_connection_id, project_connection_ids, |conn_id| {
1593        session
1594            .peer
1595            .forward_send(host_connection_id, conn_id, response_payload.clone())
1596    });
1597    response.send(response_payload)?;
1598    Ok(())
1599}
1600
1601async fn create_buffer_for_peer(
1602    request: proto::CreateBufferForPeer,
1603    session: Session,
1604) -> Result<()> {
1605    let peer_id = request.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?;
1606    session
1607        .peer
1608        .forward_send(session.connection_id, peer_id.into(), request)?;
1609    Ok(())
1610}
1611
1612async fn update_buffer(
1613    request: proto::UpdateBuffer,
1614    response: Response<proto::UpdateBuffer>,
1615    session: Session,
1616) -> Result<()> {
1617    let project_id = ProjectId::from_proto(request.project_id);
1618    let project_connection_ids = session
1619        .db()
1620        .await
1621        .project_connection_ids(project_id, session.connection_id)
1622        .await?;
1623
1624    broadcast(
1625        session.connection_id,
1626        project_connection_ids.iter().copied(),
1627        |connection_id| {
1628            session
1629                .peer
1630                .forward_send(session.connection_id, connection_id, request.clone())
1631        },
1632    );
1633    response.send(proto::Ack {})?;
1634    Ok(())
1635}
1636
1637async fn update_buffer_file(request: proto::UpdateBufferFile, session: Session) -> Result<()> {
1638    let project_id = ProjectId::from_proto(request.project_id);
1639    let project_connection_ids = session
1640        .db()
1641        .await
1642        .project_connection_ids(project_id, session.connection_id)
1643        .await?;
1644
1645    broadcast(
1646        session.connection_id,
1647        project_connection_ids.iter().copied(),
1648        |connection_id| {
1649            session
1650                .peer
1651                .forward_send(session.connection_id, connection_id, request.clone())
1652        },
1653    );
1654    Ok(())
1655}
1656
1657async fn buffer_reloaded(request: proto::BufferReloaded, session: Session) -> Result<()> {
1658    let project_id = ProjectId::from_proto(request.project_id);
1659    let project_connection_ids = session
1660        .db()
1661        .await
1662        .project_connection_ids(project_id, session.connection_id)
1663        .await?;
1664    broadcast(
1665        session.connection_id,
1666        project_connection_ids.iter().copied(),
1667        |connection_id| {
1668            session
1669                .peer
1670                .forward_send(session.connection_id, connection_id, request.clone())
1671        },
1672    );
1673    Ok(())
1674}
1675
1676async fn buffer_saved(request: proto::BufferSaved, session: Session) -> Result<()> {
1677    let project_id = ProjectId::from_proto(request.project_id);
1678    let project_connection_ids = session
1679        .db()
1680        .await
1681        .project_connection_ids(project_id, session.connection_id)
1682        .await?;
1683    broadcast(
1684        session.connection_id,
1685        project_connection_ids.iter().copied(),
1686        |connection_id| {
1687            session
1688                .peer
1689                .forward_send(session.connection_id, connection_id, request.clone())
1690        },
1691    );
1692    Ok(())
1693}
1694
1695async fn follow(
1696    request: proto::Follow,
1697    response: Response<proto::Follow>,
1698    session: Session,
1699) -> Result<()> {
1700    let project_id = ProjectId::from_proto(request.project_id);
1701    let leader_id = request
1702        .leader_id
1703        .ok_or_else(|| anyhow!("invalid leader id"))?
1704        .into();
1705    let follower_id = session.connection_id;
1706    {
1707        let project_connection_ids = session
1708            .db()
1709            .await
1710            .project_connection_ids(project_id, session.connection_id)
1711            .await?;
1712
1713        if !project_connection_ids.contains(&leader_id) {
1714            Err(anyhow!("no such peer"))?;
1715        }
1716    }
1717
1718    let mut response_payload = session
1719        .peer
1720        .forward_request(session.connection_id, leader_id, request)
1721        .await?;
1722    response_payload
1723        .views
1724        .retain(|view| view.leader_id != Some(follower_id.into()));
1725    response.send(response_payload)?;
1726    Ok(())
1727}
1728
1729async fn unfollow(request: proto::Unfollow, session: Session) -> Result<()> {
1730    let project_id = ProjectId::from_proto(request.project_id);
1731    let leader_id = request
1732        .leader_id
1733        .ok_or_else(|| anyhow!("invalid leader id"))?
1734        .into();
1735    let project_connection_ids = session
1736        .db()
1737        .await
1738        .project_connection_ids(project_id, session.connection_id)
1739        .await?;
1740    if !project_connection_ids.contains(&leader_id) {
1741        Err(anyhow!("no such peer"))?;
1742    }
1743    session
1744        .peer
1745        .forward_send(session.connection_id, leader_id, request)?;
1746    Ok(())
1747}
1748
1749async fn update_followers(request: proto::UpdateFollowers, session: Session) -> Result<()> {
1750    let project_id = ProjectId::from_proto(request.project_id);
1751    let project_connection_ids = session
1752        .db
1753        .lock()
1754        .await
1755        .project_connection_ids(project_id, session.connection_id)
1756        .await?;
1757
1758    let leader_id = request.variant.as_ref().and_then(|variant| match variant {
1759        proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
1760        proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
1761        proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
1762    });
1763    for follower_peer_id in request.follower_ids.iter().copied() {
1764        let follower_connection_id = follower_peer_id.into();
1765        if project_connection_ids.contains(&follower_connection_id)
1766            && Some(follower_peer_id) != leader_id
1767        {
1768            session.peer.forward_send(
1769                session.connection_id,
1770                follower_connection_id,
1771                request.clone(),
1772            )?;
1773        }
1774    }
1775    Ok(())
1776}
1777
1778async fn get_users(
1779    request: proto::GetUsers,
1780    response: Response<proto::GetUsers>,
1781    session: Session,
1782) -> Result<()> {
1783    let user_ids = request
1784        .user_ids
1785        .into_iter()
1786        .map(UserId::from_proto)
1787        .collect();
1788    let users = session
1789        .db()
1790        .await
1791        .get_users_by_ids(user_ids)
1792        .await?
1793        .into_iter()
1794        .map(|user| proto::User {
1795            id: user.id.to_proto(),
1796            avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1797            github_login: user.github_login,
1798        })
1799        .collect();
1800    response.send(proto::UsersResponse { users })?;
1801    Ok(())
1802}
1803
1804async fn fuzzy_search_users(
1805    request: proto::FuzzySearchUsers,
1806    response: Response<proto::FuzzySearchUsers>,
1807    session: Session,
1808) -> Result<()> {
1809    let query = request.query;
1810    let users = match query.len() {
1811        0 => vec![],
1812        1 | 2 => session
1813            .db()
1814            .await
1815            .get_user_by_github_account(&query, None)
1816            .await?
1817            .into_iter()
1818            .collect(),
1819        _ => session.db().await.fuzzy_search_users(&query, 10).await?,
1820    };
1821    let users = users
1822        .into_iter()
1823        .filter(|user| user.id != session.user_id)
1824        .map(|user| proto::User {
1825            id: user.id.to_proto(),
1826            avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1827            github_login: user.github_login,
1828        })
1829        .collect();
1830    response.send(proto::UsersResponse { users })?;
1831    Ok(())
1832}
1833
1834async fn request_contact(
1835    request: proto::RequestContact,
1836    response: Response<proto::RequestContact>,
1837    session: Session,
1838) -> Result<()> {
1839    let requester_id = session.user_id;
1840    let responder_id = UserId::from_proto(request.responder_id);
1841    if requester_id == responder_id {
1842        return Err(anyhow!("cannot add yourself as a contact"))?;
1843    }
1844
1845    session
1846        .db()
1847        .await
1848        .send_contact_request(requester_id, responder_id)
1849        .await?;
1850
1851    // Update outgoing contact requests of requester
1852    let mut update = proto::UpdateContacts::default();
1853    update.outgoing_requests.push(responder_id.to_proto());
1854    for connection_id in session
1855        .connection_pool()
1856        .await
1857        .user_connection_ids(requester_id)
1858    {
1859        session.peer.send(connection_id, update.clone())?;
1860    }
1861
1862    // Update incoming contact requests of responder
1863    let mut update = proto::UpdateContacts::default();
1864    update
1865        .incoming_requests
1866        .push(proto::IncomingContactRequest {
1867            requester_id: requester_id.to_proto(),
1868            should_notify: true,
1869        });
1870    for connection_id in session
1871        .connection_pool()
1872        .await
1873        .user_connection_ids(responder_id)
1874    {
1875        session.peer.send(connection_id, update.clone())?;
1876    }
1877
1878    response.send(proto::Ack {})?;
1879    Ok(())
1880}
1881
1882async fn respond_to_contact_request(
1883    request: proto::RespondToContactRequest,
1884    response: Response<proto::RespondToContactRequest>,
1885    session: Session,
1886) -> Result<()> {
1887    let responder_id = session.user_id;
1888    let requester_id = UserId::from_proto(request.requester_id);
1889    let db = session.db().await;
1890    if request.response == proto::ContactRequestResponse::Dismiss as i32 {
1891        db.dismiss_contact_notification(responder_id, requester_id)
1892            .await?;
1893    } else {
1894        let accept = request.response == proto::ContactRequestResponse::Accept as i32;
1895
1896        db.respond_to_contact_request(responder_id, requester_id, accept)
1897            .await?;
1898        let requester_busy = db.is_user_busy(requester_id).await?;
1899        let responder_busy = db.is_user_busy(responder_id).await?;
1900
1901        let pool = session.connection_pool().await;
1902        // Update responder with new contact
1903        let mut update = proto::UpdateContacts::default();
1904        if accept {
1905            update
1906                .contacts
1907                .push(contact_for_user(requester_id, false, requester_busy, &pool));
1908        }
1909        update
1910            .remove_incoming_requests
1911            .push(requester_id.to_proto());
1912        for connection_id in pool.user_connection_ids(responder_id) {
1913            session.peer.send(connection_id, update.clone())?;
1914        }
1915
1916        // Update requester with new contact
1917        let mut update = proto::UpdateContacts::default();
1918        if accept {
1919            update
1920                .contacts
1921                .push(contact_for_user(responder_id, true, responder_busy, &pool));
1922        }
1923        update
1924            .remove_outgoing_requests
1925            .push(responder_id.to_proto());
1926        for connection_id in pool.user_connection_ids(requester_id) {
1927            session.peer.send(connection_id, update.clone())?;
1928        }
1929    }
1930
1931    response.send(proto::Ack {})?;
1932    Ok(())
1933}
1934
1935async fn remove_contact(
1936    request: proto::RemoveContact,
1937    response: Response<proto::RemoveContact>,
1938    session: Session,
1939) -> Result<()> {
1940    let requester_id = session.user_id;
1941    let responder_id = UserId::from_proto(request.user_id);
1942    let db = session.db().await;
1943    db.remove_contact(requester_id, responder_id).await?;
1944
1945    let pool = session.connection_pool().await;
1946    // Update outgoing contact requests of requester
1947    let mut update = proto::UpdateContacts::default();
1948    update
1949        .remove_outgoing_requests
1950        .push(responder_id.to_proto());
1951    for connection_id in pool.user_connection_ids(requester_id) {
1952        session.peer.send(connection_id, update.clone())?;
1953    }
1954
1955    // Update incoming contact requests of responder
1956    let mut update = proto::UpdateContacts::default();
1957    update
1958        .remove_incoming_requests
1959        .push(requester_id.to_proto());
1960    for connection_id in pool.user_connection_ids(responder_id) {
1961        session.peer.send(connection_id, update.clone())?;
1962    }
1963
1964    response.send(proto::Ack {})?;
1965    Ok(())
1966}
1967
1968async fn update_diff_base(request: proto::UpdateDiffBase, session: Session) -> Result<()> {
1969    let project_id = ProjectId::from_proto(request.project_id);
1970    let project_connection_ids = session
1971        .db()
1972        .await
1973        .project_connection_ids(project_id, session.connection_id)
1974        .await?;
1975    broadcast(
1976        session.connection_id,
1977        project_connection_ids.iter().copied(),
1978        |connection_id| {
1979            session
1980                .peer
1981                .forward_send(session.connection_id, connection_id, request.clone())
1982        },
1983    );
1984    Ok(())
1985}
1986
1987async fn get_private_user_info(
1988    _request: proto::GetPrivateUserInfo,
1989    response: Response<proto::GetPrivateUserInfo>,
1990    session: Session,
1991) -> Result<()> {
1992    let metrics_id = session
1993        .db()
1994        .await
1995        .get_user_metrics_id(session.user_id)
1996        .await?;
1997    let user = session
1998        .db()
1999        .await
2000        .get_user_by_id(session.user_id)
2001        .await?
2002        .ok_or_else(|| anyhow!("user not found"))?;
2003    response.send(proto::GetPrivateUserInfoResponse {
2004        metrics_id,
2005        staff: user.admin,
2006    })?;
2007    Ok(())
2008}
2009
2010fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
2011    match message {
2012        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
2013        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
2014        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
2015        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
2016        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
2017            code: frame.code.into(),
2018            reason: frame.reason,
2019        })),
2020    }
2021}
2022
2023fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
2024    match message {
2025        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
2026        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
2027        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
2028        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
2029        AxumMessage::Close(frame) => {
2030            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
2031                code: frame.code.into(),
2032                reason: frame.reason,
2033            }))
2034        }
2035    }
2036}
2037
2038fn build_initial_contacts_update(
2039    contacts: Vec<db::Contact>,
2040    pool: &ConnectionPool,
2041) -> proto::UpdateContacts {
2042    let mut update = proto::UpdateContacts::default();
2043
2044    for contact in contacts {
2045        match contact {
2046            db::Contact::Accepted {
2047                user_id,
2048                should_notify,
2049                busy,
2050            } => {
2051                update
2052                    .contacts
2053                    .push(contact_for_user(user_id, should_notify, busy, &pool));
2054            }
2055            db::Contact::Outgoing { user_id } => update.outgoing_requests.push(user_id.to_proto()),
2056            db::Contact::Incoming {
2057                user_id,
2058                should_notify,
2059            } => update
2060                .incoming_requests
2061                .push(proto::IncomingContactRequest {
2062                    requester_id: user_id.to_proto(),
2063                    should_notify,
2064                }),
2065        }
2066    }
2067
2068    update
2069}
2070
2071fn contact_for_user(
2072    user_id: UserId,
2073    should_notify: bool,
2074    busy: bool,
2075    pool: &ConnectionPool,
2076) -> proto::Contact {
2077    proto::Contact {
2078        user_id: user_id.to_proto(),
2079        online: pool.is_user_online(user_id),
2080        busy,
2081        should_notify,
2082    }
2083}
2084
2085fn room_updated(room: &proto::Room, peer: &Peer) {
2086    for participant in &room.participants {
2087        if let Some(peer_id) = participant
2088            .peer_id
2089            .ok_or_else(|| anyhow!("invalid participant peer id"))
2090            .trace_err()
2091        {
2092            peer.send(
2093                peer_id.into(),
2094                proto::RoomUpdated {
2095                    room: Some(room.clone()),
2096                },
2097            )
2098            .trace_err();
2099        }
2100    }
2101}
2102
2103async fn update_user_contacts(user_id: UserId, session: &Session) -> Result<()> {
2104    let db = session.db().await;
2105    let contacts = db.get_contacts(user_id).await?;
2106    let busy = db.is_user_busy(user_id).await?;
2107
2108    let pool = session.connection_pool().await;
2109    let updated_contact = contact_for_user(user_id, false, busy, &pool);
2110    for contact in contacts {
2111        if let db::Contact::Accepted {
2112            user_id: contact_user_id,
2113            ..
2114        } = contact
2115        {
2116            for contact_conn_id in pool.user_connection_ids(contact_user_id) {
2117                session
2118                    .peer
2119                    .send(
2120                        contact_conn_id,
2121                        proto::UpdateContacts {
2122                            contacts: vec![updated_contact.clone()],
2123                            remove_contacts: Default::default(),
2124                            incoming_requests: Default::default(),
2125                            remove_incoming_requests: Default::default(),
2126                            outgoing_requests: Default::default(),
2127                            remove_outgoing_requests: Default::default(),
2128                        },
2129                    )
2130                    .trace_err();
2131            }
2132        }
2133    }
2134    Ok(())
2135}
2136
2137async fn leave_room_for_session(session: &Session) -> Result<()> {
2138    let mut contacts_to_update = HashSet::default();
2139
2140    let room_id;
2141    let canceled_calls_to_user_ids;
2142    let live_kit_room;
2143    let delete_live_kit_room;
2144    if let Some(mut left_room) = session.db().await.leave_room(session.connection_id).await? {
2145        contacts_to_update.insert(session.user_id);
2146
2147        for project in left_room.left_projects.values() {
2148            project_left(project, session);
2149        }
2150
2151        room_updated(&left_room.room, &session.peer);
2152        room_id = RoomId::from_proto(left_room.room.id);
2153        canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids);
2154        live_kit_room = mem::take(&mut left_room.room.live_kit_room);
2155        delete_live_kit_room = left_room.room.participants.is_empty();
2156    } else {
2157        return Ok(());
2158    }
2159
2160    {
2161        let pool = session.connection_pool().await;
2162        for canceled_user_id in canceled_calls_to_user_ids {
2163            for connection_id in pool.user_connection_ids(canceled_user_id) {
2164                session
2165                    .peer
2166                    .send(
2167                        connection_id,
2168                        proto::CallCanceled {
2169                            room_id: room_id.to_proto(),
2170                        },
2171                    )
2172                    .trace_err();
2173            }
2174            contacts_to_update.insert(canceled_user_id);
2175        }
2176    }
2177
2178    for contact_user_id in contacts_to_update {
2179        update_user_contacts(contact_user_id, &session).await?;
2180    }
2181
2182    if let Some(live_kit) = session.live_kit_client.as_ref() {
2183        live_kit
2184            .remove_participant(live_kit_room.clone(), session.user_id.to_string())
2185            .await
2186            .trace_err();
2187
2188        if delete_live_kit_room {
2189            live_kit.delete_room(live_kit_room).await.trace_err();
2190        }
2191    }
2192
2193    Ok(())
2194}
2195
2196fn project_left(project: &db::LeftProject, session: &Session) {
2197    for connection_id in &project.connection_ids {
2198        if project.host_user_id == session.user_id {
2199            session
2200                .peer
2201                .send(
2202                    *connection_id,
2203                    proto::UnshareProject {
2204                        project_id: project.id.to_proto(),
2205                    },
2206                )
2207                .trace_err();
2208        } else {
2209            session
2210                .peer
2211                .send(
2212                    *connection_id,
2213                    proto::RemoveProjectCollaborator {
2214                        project_id: project.id.to_proto(),
2215                        peer_id: Some(session.connection_id.into()),
2216                    },
2217                )
2218                .trace_err();
2219        }
2220    }
2221
2222    session
2223        .peer
2224        .send(
2225            session.connection_id,
2226            proto::UnshareProject {
2227                project_id: project.id.to_proto(),
2228            },
2229        )
2230        .trace_err();
2231}
2232
2233pub trait ResultExt {
2234    type Ok;
2235
2236    fn trace_err(self) -> Option<Self::Ok>;
2237}
2238
2239impl<T, E> ResultExt for Result<T, E>
2240where
2241    E: std::fmt::Debug,
2242{
2243    type Ok = T;
2244
2245    fn trace_err(self) -> Option<T> {
2246        match self {
2247            Ok(value) => Some(value),
2248            Err(error) => {
2249                tracing::error!("{:?}", error);
2250                None
2251            }
2252        }
2253    }
2254}