rpc.rs

   1mod connection_pool;
   2
   3use crate::{
   4    auth,
   5    db::{self, Database, ProjectId, RoomId, ServerId, User, UserId},
   6    executor::Executor,
   7    AppState, Result,
   8};
   9use anyhow::anyhow;
  10use async_tungstenite::tungstenite::{
  11    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  12};
  13use axum::{
  14    body::Body,
  15    extract::{
  16        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  17        ConnectInfo, WebSocketUpgrade,
  18    },
  19    headers::{Header, HeaderName},
  20    http::StatusCode,
  21    middleware,
  22    response::IntoResponse,
  23    routing::get,
  24    Extension, Router, TypedHeader,
  25};
  26use collections::{HashMap, HashSet};
  27pub use connection_pool::ConnectionPool;
  28use futures::{
  29    channel::oneshot,
  30    future::{self, BoxFuture},
  31    stream::FuturesUnordered,
  32    FutureExt, SinkExt, StreamExt, TryStreamExt,
  33};
  34use lazy_static::lazy_static;
  35use prometheus::{register_int_gauge, IntGauge};
  36use rpc::{
  37    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  38    Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
  39};
  40use serde::{Serialize, Serializer};
  41use std::{
  42    any::TypeId,
  43    fmt,
  44    future::Future,
  45    marker::PhantomData,
  46    mem,
  47    net::SocketAddr,
  48    ops::{Deref, DerefMut},
  49    rc::Rc,
  50    sync::{
  51        atomic::{AtomicBool, Ordering::SeqCst},
  52        Arc,
  53    },
  54    time::Duration,
  55};
  56use tokio::sync::watch;
  57use tower::ServiceBuilder;
  58use tracing::{info_span, instrument, Instrument};
  59
  60pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(5);
  61pub const CLEANUP_TIMEOUT: Duration = Duration::from_secs(10);
  62
  63lazy_static! {
  64    static ref METRIC_CONNECTIONS: IntGauge =
  65        register_int_gauge!("connections", "number of connections").unwrap();
  66    static ref METRIC_SHARED_PROJECTS: IntGauge = register_int_gauge!(
  67        "shared_projects",
  68        "number of open projects with one or more guests"
  69    )
  70    .unwrap();
  71}
  72
  73type MessageHandler =
  74    Box<dyn Send + Sync + Fn(Box<dyn AnyTypedEnvelope>, Session) -> BoxFuture<'static, ()>>;
  75
  76struct Response<R> {
  77    peer: Arc<Peer>,
  78    receipt: Receipt<R>,
  79    responded: Arc<AtomicBool>,
  80}
  81
  82impl<R: RequestMessage> Response<R> {
  83    fn send(self, payload: R::Response) -> Result<()> {
  84        self.responded.store(true, SeqCst);
  85        self.peer.respond(self.receipt, payload)?;
  86        Ok(())
  87    }
  88}
  89
  90#[derive(Clone)]
  91struct Session {
  92    user_id: UserId,
  93    connection_id: ConnectionId,
  94    db: Arc<tokio::sync::Mutex<DbHandle>>,
  95    peer: Arc<Peer>,
  96    connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
  97    live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
  98    executor: Executor,
  99}
 100
 101impl Session {
 102    async fn db(&self) -> tokio::sync::MutexGuard<DbHandle> {
 103        #[cfg(test)]
 104        tokio::task::yield_now().await;
 105        let guard = self.db.lock().await;
 106        #[cfg(test)]
 107        tokio::task::yield_now().await;
 108        guard
 109    }
 110
 111    async fn connection_pool(&self) -> ConnectionPoolGuard<'_> {
 112        #[cfg(test)]
 113        tokio::task::yield_now().await;
 114        let guard = self.connection_pool.lock();
 115        ConnectionPoolGuard {
 116            guard,
 117            _not_send: PhantomData,
 118        }
 119    }
 120}
 121
 122impl fmt::Debug for Session {
 123    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 124        f.debug_struct("Session")
 125            .field("user_id", &self.user_id)
 126            .field("connection_id", &self.connection_id)
 127            .finish()
 128    }
 129}
 130
 131struct DbHandle(Arc<Database>);
 132
 133impl Deref for DbHandle {
 134    type Target = Database;
 135
 136    fn deref(&self) -> &Self::Target {
 137        self.0.as_ref()
 138    }
 139}
 140
 141pub struct Server {
 142    id: parking_lot::Mutex<ServerId>,
 143    peer: Arc<Peer>,
 144    pub(crate) connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
 145    app_state: Arc<AppState>,
 146    executor: Executor,
 147    handlers: HashMap<TypeId, MessageHandler>,
 148    teardown: watch::Sender<()>,
 149}
 150
 151pub(crate) struct ConnectionPoolGuard<'a> {
 152    guard: parking_lot::MutexGuard<'a, ConnectionPool>,
 153    _not_send: PhantomData<Rc<()>>,
 154}
 155
 156#[derive(Serialize)]
 157pub struct ServerSnapshot<'a> {
 158    peer: &'a Peer,
 159    #[serde(serialize_with = "serialize_deref")]
 160    connection_pool: ConnectionPoolGuard<'a>,
 161}
 162
 163pub fn serialize_deref<S, T, U>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
 164where
 165    S: Serializer,
 166    T: Deref<Target = U>,
 167    U: Serialize,
 168{
 169    Serialize::serialize(value.deref(), serializer)
 170}
 171
 172impl Server {
 173    pub fn new(id: ServerId, app_state: Arc<AppState>, executor: Executor) -> Arc<Self> {
 174        let mut server = Self {
 175            id: parking_lot::Mutex::new(id),
 176            peer: Peer::new(id.0 as u32),
 177            app_state,
 178            executor,
 179            connection_pool: Default::default(),
 180            handlers: Default::default(),
 181            teardown: watch::channel(()).0,
 182        };
 183
 184        server
 185            .add_request_handler(ping)
 186            .add_request_handler(create_room)
 187            .add_request_handler(join_room)
 188            .add_request_handler(rejoin_room)
 189            .add_message_handler(leave_room)
 190            .add_request_handler(call)
 191            .add_request_handler(cancel_call)
 192            .add_message_handler(decline_call)
 193            .add_request_handler(update_participant_location)
 194            .add_request_handler(share_project)
 195            .add_message_handler(unshare_project)
 196            .add_request_handler(join_project)
 197            .add_message_handler(leave_project)
 198            .add_request_handler(update_project)
 199            .add_request_handler(update_worktree)
 200            .add_message_handler(start_language_server)
 201            .add_message_handler(update_language_server)
 202            .add_message_handler(update_diagnostic_summary)
 203            .add_request_handler(forward_project_request::<proto::GetHover>)
 204            .add_request_handler(forward_project_request::<proto::GetDefinition>)
 205            .add_request_handler(forward_project_request::<proto::GetTypeDefinition>)
 206            .add_request_handler(forward_project_request::<proto::GetReferences>)
 207            .add_request_handler(forward_project_request::<proto::SearchProject>)
 208            .add_request_handler(forward_project_request::<proto::GetDocumentHighlights>)
 209            .add_request_handler(forward_project_request::<proto::GetProjectSymbols>)
 210            .add_request_handler(forward_project_request::<proto::OpenBufferForSymbol>)
 211            .add_request_handler(forward_project_request::<proto::OpenBufferById>)
 212            .add_request_handler(forward_project_request::<proto::OpenBufferByPath>)
 213            .add_request_handler(forward_project_request::<proto::GetCompletions>)
 214            .add_request_handler(forward_project_request::<proto::ApplyCompletionAdditionalEdits>)
 215            .add_request_handler(forward_project_request::<proto::GetCodeActions>)
 216            .add_request_handler(forward_project_request::<proto::ApplyCodeAction>)
 217            .add_request_handler(forward_project_request::<proto::PrepareRename>)
 218            .add_request_handler(forward_project_request::<proto::PerformRename>)
 219            .add_request_handler(forward_project_request::<proto::ReloadBuffers>)
 220            .add_request_handler(forward_project_request::<proto::SynchronizeBuffers>)
 221            .add_request_handler(forward_project_request::<proto::FormatBuffers>)
 222            .add_request_handler(forward_project_request::<proto::CreateProjectEntry>)
 223            .add_request_handler(forward_project_request::<proto::RenameProjectEntry>)
 224            .add_request_handler(forward_project_request::<proto::CopyProjectEntry>)
 225            .add_request_handler(forward_project_request::<proto::DeleteProjectEntry>)
 226            .add_message_handler(create_buffer_for_peer)
 227            .add_request_handler(update_buffer)
 228            .add_message_handler(update_buffer_file)
 229            .add_message_handler(buffer_reloaded)
 230            .add_message_handler(buffer_saved)
 231            .add_request_handler(save_buffer)
 232            .add_request_handler(get_users)
 233            .add_request_handler(fuzzy_search_users)
 234            .add_request_handler(request_contact)
 235            .add_request_handler(remove_contact)
 236            .add_request_handler(respond_to_contact_request)
 237            .add_request_handler(follow)
 238            .add_message_handler(unfollow)
 239            .add_message_handler(update_followers)
 240            .add_message_handler(update_diff_base)
 241            .add_request_handler(get_private_user_info);
 242
 243        Arc::new(server)
 244    }
 245
 246    pub async fn start(&self) -> Result<()> {
 247        let server_id = *self.id.lock();
 248        let app_state = self.app_state.clone();
 249        let peer = self.peer.clone();
 250        let timeout = self.executor.sleep(CLEANUP_TIMEOUT);
 251        let pool = self.connection_pool.clone();
 252        let live_kit_client = self.app_state.live_kit_client.clone();
 253
 254        let span = info_span!("start server");
 255        self.executor.spawn_detached(
 256            async move {
 257                tracing::info!("waiting for cleanup timeout");
 258                timeout.await;
 259                tracing::info!("cleanup timeout expired, retrieving stale rooms");
 260                if let Some(room_ids) = app_state
 261                    .db
 262                    .stale_room_ids(&app_state.config.zed_environment, server_id)
 263                    .await
 264                    .trace_err()
 265                {
 266                    tracing::info!(stale_room_count = room_ids.len(), "retrieved stale rooms");
 267                    for room_id in room_ids {
 268                        let mut contacts_to_update = HashSet::default();
 269                        let mut canceled_calls_to_user_ids = Vec::new();
 270                        let mut live_kit_room = String::new();
 271                        let mut delete_live_kit_room = false;
 272
 273                        if let Some(mut refreshed_room) = app_state
 274                            .db
 275                            .refresh_room(room_id, server_id)
 276                            .await
 277                            .trace_err()
 278                        {
 279                            tracing::info!(
 280                                room_id = room_id.0,
 281                                new_participant_count = refreshed_room.room.participants.len(),
 282                                "refreshed room"
 283                            );
 284                            room_updated(&refreshed_room.room, &peer);
 285                            contacts_to_update
 286                                .extend(refreshed_room.stale_participant_user_ids.iter().copied());
 287                            contacts_to_update
 288                                .extend(refreshed_room.canceled_calls_to_user_ids.iter().copied());
 289                            canceled_calls_to_user_ids =
 290                                mem::take(&mut refreshed_room.canceled_calls_to_user_ids);
 291                            live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room);
 292                            delete_live_kit_room = refreshed_room.room.participants.is_empty();
 293                        }
 294
 295                        {
 296                            let pool = pool.lock();
 297                            for canceled_user_id in canceled_calls_to_user_ids {
 298                                for connection_id in pool.user_connection_ids(canceled_user_id) {
 299                                    peer.send(
 300                                        connection_id,
 301                                        proto::CallCanceled {
 302                                            room_id: room_id.to_proto(),
 303                                        },
 304                                    )
 305                                    .trace_err();
 306                                }
 307                            }
 308                        }
 309
 310                        for user_id in contacts_to_update {
 311                            let busy = app_state.db.is_user_busy(user_id).await.trace_err();
 312                            let contacts = app_state.db.get_contacts(user_id).await.trace_err();
 313                            if let Some((busy, contacts)) = busy.zip(contacts) {
 314                                let pool = pool.lock();
 315                                let updated_contact = contact_for_user(user_id, false, busy, &pool);
 316                                for contact in contacts {
 317                                    if let db::Contact::Accepted {
 318                                        user_id: contact_user_id,
 319                                        ..
 320                                    } = contact
 321                                    {
 322                                        for contact_conn_id in
 323                                            pool.user_connection_ids(contact_user_id)
 324                                        {
 325                                            peer.send(
 326                                                contact_conn_id,
 327                                                proto::UpdateContacts {
 328                                                    contacts: vec![updated_contact.clone()],
 329                                                    remove_contacts: Default::default(),
 330                                                    incoming_requests: Default::default(),
 331                                                    remove_incoming_requests: Default::default(),
 332                                                    outgoing_requests: Default::default(),
 333                                                    remove_outgoing_requests: Default::default(),
 334                                                },
 335                                            )
 336                                            .trace_err();
 337                                        }
 338                                    }
 339                                }
 340                            }
 341                        }
 342
 343                        if let Some(live_kit) = live_kit_client.as_ref() {
 344                            if delete_live_kit_room {
 345                                live_kit.delete_room(live_kit_room).await.trace_err();
 346                            }
 347                        }
 348                    }
 349                }
 350
 351                app_state
 352                    .db
 353                    .delete_stale_servers(&app_state.config.zed_environment, server_id)
 354                    .await
 355                    .trace_err();
 356            }
 357            .instrument(span),
 358        );
 359        Ok(())
 360    }
 361
 362    pub fn teardown(&self) {
 363        self.peer.teardown();
 364        self.connection_pool.lock().reset();
 365        let _ = self.teardown.send(());
 366    }
 367
 368    #[cfg(test)]
 369    pub fn reset(&self, id: ServerId) {
 370        self.teardown();
 371        *self.id.lock() = id;
 372        self.peer.reset(id.0 as u32);
 373    }
 374
 375    #[cfg(test)]
 376    pub fn id(&self) -> ServerId {
 377        *self.id.lock()
 378    }
 379
 380    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 381    where
 382        F: 'static + Send + Sync + Fn(TypedEnvelope<M>, Session) -> Fut,
 383        Fut: 'static + Send + Future<Output = Result<()>>,
 384        M: EnvelopedMessage,
 385    {
 386        let prev_handler = self.handlers.insert(
 387            TypeId::of::<M>(),
 388            Box::new(move |envelope, session| {
 389                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 390                let span = info_span!(
 391                    "handle message",
 392                    payload_type = envelope.payload_type_name()
 393                );
 394                span.in_scope(|| {
 395                    tracing::info!(
 396                        payload_type = envelope.payload_type_name(),
 397                        "message received"
 398                    );
 399                });
 400                let future = (handler)(*envelope, session);
 401                async move {
 402                    if let Err(error) = future.await {
 403                        tracing::error!(%error, "error handling message");
 404                    }
 405                }
 406                .instrument(span)
 407                .boxed()
 408            }),
 409        );
 410        if prev_handler.is_some() {
 411            panic!("registered a handler for the same message twice");
 412        }
 413        self
 414    }
 415
 416    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 417    where
 418        F: 'static + Send + Sync + Fn(M, Session) -> Fut,
 419        Fut: 'static + Send + Future<Output = Result<()>>,
 420        M: EnvelopedMessage,
 421    {
 422        self.add_handler(move |envelope, session| handler(envelope.payload, session));
 423        self
 424    }
 425
 426    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 427    where
 428        F: 'static + Send + Sync + Fn(M, Response<M>, Session) -> Fut,
 429        Fut: Send + Future<Output = Result<()>>,
 430        M: RequestMessage,
 431    {
 432        let handler = Arc::new(handler);
 433        self.add_handler(move |envelope, session| {
 434            let receipt = envelope.receipt();
 435            let handler = handler.clone();
 436            async move {
 437                let peer = session.peer.clone();
 438                let responded = Arc::new(AtomicBool::default());
 439                let response = Response {
 440                    peer: peer.clone(),
 441                    responded: responded.clone(),
 442                    receipt,
 443                };
 444                match (handler)(envelope.payload, response, session).await {
 445                    Ok(()) => {
 446                        if responded.load(std::sync::atomic::Ordering::SeqCst) {
 447                            Ok(())
 448                        } else {
 449                            Err(anyhow!("handler did not send a response"))?
 450                        }
 451                    }
 452                    Err(error) => {
 453                        peer.respond_with_error(
 454                            receipt,
 455                            proto::Error {
 456                                message: error.to_string(),
 457                            },
 458                        )?;
 459                        Err(error)
 460                    }
 461                }
 462            }
 463        })
 464    }
 465
 466    pub fn handle_connection(
 467        self: &Arc<Self>,
 468        connection: Connection,
 469        address: String,
 470        user: User,
 471        mut send_connection_id: Option<oneshot::Sender<ConnectionId>>,
 472        executor: Executor,
 473    ) -> impl Future<Output = Result<()>> {
 474        let this = self.clone();
 475        let user_id = user.id;
 476        let login = user.github_login;
 477        let span = info_span!("handle connection", %user_id, %login, %address);
 478        let mut teardown = self.teardown.subscribe();
 479        async move {
 480            let (connection_id, handle_io, mut incoming_rx) = this
 481                .peer
 482                .add_connection(connection, {
 483                    let executor = executor.clone();
 484                    move |duration| executor.sleep(duration)
 485                });
 486
 487            tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
 488            this.peer.send(connection_id, proto::Hello { peer_id: Some(connection_id.into()) })?;
 489            tracing::info!(%user_id, %login, %connection_id, %address, "sent hello message");
 490
 491            if let Some(send_connection_id) = send_connection_id.take() {
 492                let _ = send_connection_id.send(connection_id);
 493            }
 494
 495            if !user.connected_once {
 496                this.peer.send(connection_id, proto::ShowContacts {})?;
 497                this.app_state.db.set_user_connected_once(user_id, true).await?;
 498            }
 499
 500            let (contacts, invite_code) = future::try_join(
 501                this.app_state.db.get_contacts(user_id),
 502                this.app_state.db.get_invite_code_for_user(user_id)
 503            ).await?;
 504
 505            {
 506                let mut pool = this.connection_pool.lock();
 507                pool.add_connection(connection_id, user_id, user.admin);
 508                this.peer.send(connection_id, build_initial_contacts_update(contacts, &pool))?;
 509
 510                if let Some((code, count)) = invite_code {
 511                    this.peer.send(connection_id, proto::UpdateInviteInfo {
 512                        url: format!("{}{}", this.app_state.config.invite_link_prefix, code),
 513                        count: count as u32,
 514                    })?;
 515                }
 516            }
 517
 518            if let Some(incoming_call) = this.app_state.db.incoming_call_for_user(user_id).await? {
 519                this.peer.send(connection_id, incoming_call)?;
 520            }
 521
 522            let session = Session {
 523                user_id,
 524                connection_id,
 525                db: Arc::new(tokio::sync::Mutex::new(DbHandle(this.app_state.db.clone()))),
 526                peer: this.peer.clone(),
 527                connection_pool: this.connection_pool.clone(),
 528                live_kit_client: this.app_state.live_kit_client.clone(),
 529                executor: executor.clone(),
 530            };
 531            update_user_contacts(user_id, &session).await?;
 532
 533            let handle_io = handle_io.fuse();
 534            futures::pin_mut!(handle_io);
 535
 536            // Handlers for foreground messages are pushed into the following `FuturesUnordered`.
 537            // This prevents deadlocks when e.g., client A performs a request to client B and
 538            // client B performs a request to client A. If both clients stop processing further
 539            // messages until their respective request completes, they won't have a chance to
 540            // respond to the other client's request and cause a deadlock.
 541            //
 542            // This arrangement ensures we will attempt to process earlier messages first, but fall
 543            // back to processing messages arrived later in the spirit of making progress.
 544            let mut foreground_message_handlers = FuturesUnordered::new();
 545            loop {
 546                let next_message = incoming_rx.next().fuse();
 547                futures::pin_mut!(next_message);
 548                futures::select_biased! {
 549                    _ = teardown.changed().fuse() => return Ok(()),
 550                    result = handle_io => {
 551                        if let Err(error) = result {
 552                            tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
 553                        }
 554                        break;
 555                    }
 556                    _ = foreground_message_handlers.next() => {}
 557                    message = next_message => {
 558                        if let Some(message) = message {
 559                            let type_name = message.payload_type_name();
 560                            let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
 561                            let span_enter = span.enter();
 562                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 563                                let is_background = message.is_background();
 564                                let handle_message = (handler)(message, session.clone());
 565                                drop(span_enter);
 566
 567                                let handle_message = handle_message.instrument(span);
 568                                if is_background {
 569                                    executor.spawn_detached(handle_message);
 570                                } else {
 571                                    foreground_message_handlers.push(handle_message);
 572                                }
 573                            } else {
 574                                tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
 575                            }
 576                        } else {
 577                            tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
 578                            break;
 579                        }
 580                    }
 581                }
 582            }
 583
 584            drop(foreground_message_handlers);
 585            tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
 586            if let Err(error) = connection_lost(session, teardown, executor).await {
 587                tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
 588            }
 589
 590            Ok(())
 591        }.instrument(span)
 592    }
 593
 594    pub async fn invite_code_redeemed(
 595        self: &Arc<Self>,
 596        inviter_id: UserId,
 597        invitee_id: UserId,
 598    ) -> Result<()> {
 599        if let Some(user) = self.app_state.db.get_user_by_id(inviter_id).await? {
 600            if let Some(code) = &user.invite_code {
 601                let pool = self.connection_pool.lock();
 602                let invitee_contact = contact_for_user(invitee_id, true, false, &pool);
 603                for connection_id in pool.user_connection_ids(inviter_id) {
 604                    self.peer.send(
 605                        connection_id,
 606                        proto::UpdateContacts {
 607                            contacts: vec![invitee_contact.clone()],
 608                            ..Default::default()
 609                        },
 610                    )?;
 611                    self.peer.send(
 612                        connection_id,
 613                        proto::UpdateInviteInfo {
 614                            url: format!("{}{}", self.app_state.config.invite_link_prefix, &code),
 615                            count: user.invite_count as u32,
 616                        },
 617                    )?;
 618                }
 619            }
 620        }
 621        Ok(())
 622    }
 623
 624    pub async fn invite_count_updated(self: &Arc<Self>, user_id: UserId) -> Result<()> {
 625        if let Some(user) = self.app_state.db.get_user_by_id(user_id).await? {
 626            if let Some(invite_code) = &user.invite_code {
 627                let pool = self.connection_pool.lock();
 628                for connection_id in pool.user_connection_ids(user_id) {
 629                    self.peer.send(
 630                        connection_id,
 631                        proto::UpdateInviteInfo {
 632                            url: format!(
 633                                "{}{}",
 634                                self.app_state.config.invite_link_prefix, invite_code
 635                            ),
 636                            count: user.invite_count as u32,
 637                        },
 638                    )?;
 639                }
 640            }
 641        }
 642        Ok(())
 643    }
 644
 645    pub async fn snapshot<'a>(self: &'a Arc<Self>) -> ServerSnapshot<'a> {
 646        ServerSnapshot {
 647            connection_pool: ConnectionPoolGuard {
 648                guard: self.connection_pool.lock(),
 649                _not_send: PhantomData,
 650            },
 651            peer: &self.peer,
 652        }
 653    }
 654}
 655
 656impl<'a> Deref for ConnectionPoolGuard<'a> {
 657    type Target = ConnectionPool;
 658
 659    fn deref(&self) -> &Self::Target {
 660        &*self.guard
 661    }
 662}
 663
 664impl<'a> DerefMut for ConnectionPoolGuard<'a> {
 665    fn deref_mut(&mut self) -> &mut Self::Target {
 666        &mut *self.guard
 667    }
 668}
 669
 670impl<'a> Drop for ConnectionPoolGuard<'a> {
 671    fn drop(&mut self) {
 672        #[cfg(test)]
 673        self.check_invariants();
 674    }
 675}
 676
 677fn broadcast<F>(
 678    sender_id: Option<ConnectionId>,
 679    receiver_ids: impl IntoIterator<Item = ConnectionId>,
 680    mut f: F,
 681) where
 682    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 683{
 684    for receiver_id in receiver_ids {
 685        if Some(receiver_id) != sender_id {
 686            if let Err(error) = f(receiver_id) {
 687                tracing::error!("failed to send to {:?} {}", receiver_id, error);
 688            }
 689        }
 690    }
 691}
 692
 693lazy_static! {
 694    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
 695}
 696
 697pub struct ProtocolVersion(u32);
 698
 699impl Header for ProtocolVersion {
 700    fn name() -> &'static HeaderName {
 701        &ZED_PROTOCOL_VERSION
 702    }
 703
 704    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
 705    where
 706        Self: Sized,
 707        I: Iterator<Item = &'i axum::http::HeaderValue>,
 708    {
 709        let version = values
 710            .next()
 711            .ok_or_else(axum::headers::Error::invalid)?
 712            .to_str()
 713            .map_err(|_| axum::headers::Error::invalid())?
 714            .parse()
 715            .map_err(|_| axum::headers::Error::invalid())?;
 716        Ok(Self(version))
 717    }
 718
 719    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
 720        values.extend([self.0.to_string().parse().unwrap()]);
 721    }
 722}
 723
 724pub fn routes(server: Arc<Server>) -> Router<Body> {
 725    Router::new()
 726        .route("/rpc", get(handle_websocket_request))
 727        .layer(
 728            ServiceBuilder::new()
 729                .layer(Extension(server.app_state.clone()))
 730                .layer(middleware::from_fn(auth::validate_header)),
 731        )
 732        .route("/metrics", get(handle_metrics))
 733        .layer(Extension(server))
 734}
 735
 736pub async fn handle_websocket_request(
 737    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
 738    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
 739    Extension(server): Extension<Arc<Server>>,
 740    Extension(user): Extension<User>,
 741    ws: WebSocketUpgrade,
 742) -> axum::response::Response {
 743    if protocol_version != rpc::PROTOCOL_VERSION {
 744        return (
 745            StatusCode::UPGRADE_REQUIRED,
 746            "client must be upgraded".to_string(),
 747        )
 748            .into_response();
 749    }
 750    let socket_address = socket_address.to_string();
 751    ws.on_upgrade(move |socket| {
 752        use util::ResultExt;
 753        let socket = socket
 754            .map_ok(to_tungstenite_message)
 755            .err_into()
 756            .with(|message| async move { Ok(to_axum_message(message)) });
 757        let connection = Connection::new(Box::pin(socket));
 758        async move {
 759            server
 760                .handle_connection(connection, socket_address, user, None, Executor::Production)
 761                .await
 762                .log_err();
 763        }
 764    })
 765}
 766
 767pub async fn handle_metrics(Extension(server): Extension<Arc<Server>>) -> Result<String> {
 768    let connections = server
 769        .connection_pool
 770        .lock()
 771        .connections()
 772        .filter(|connection| !connection.admin)
 773        .count();
 774
 775    METRIC_CONNECTIONS.set(connections as _);
 776
 777    let shared_projects = server.app_state.db.project_count_excluding_admins().await?;
 778    METRIC_SHARED_PROJECTS.set(shared_projects as _);
 779
 780    let encoder = prometheus::TextEncoder::new();
 781    let metric_families = prometheus::gather();
 782    let encoded_metrics = encoder
 783        .encode_to_string(&metric_families)
 784        .map_err(|err| anyhow!("{}", err))?;
 785    Ok(encoded_metrics)
 786}
 787
 788#[instrument(err, skip(executor))]
 789async fn connection_lost(
 790    session: Session,
 791    mut teardown: watch::Receiver<()>,
 792    executor: Executor,
 793) -> Result<()> {
 794    session.peer.disconnect(session.connection_id);
 795    session
 796        .connection_pool()
 797        .await
 798        .remove_connection(session.connection_id)?;
 799
 800    session
 801        .db()
 802        .await
 803        .connection_lost(session.connection_id)
 804        .await
 805        .trace_err();
 806
 807    futures::select_biased! {
 808        _ = executor.sleep(RECONNECT_TIMEOUT).fuse() => {
 809            leave_room_for_session(&session).await.trace_err();
 810
 811            if !session
 812                .connection_pool()
 813                .await
 814                .is_user_online(session.user_id)
 815            {
 816                let db = session.db().await;
 817                if let Some(room) = db.decline_call(None, session.user_id).await.trace_err().flatten() {
 818                    room_updated(&room, &session.peer);
 819                }
 820            }
 821            update_user_contacts(session.user_id, &session).await?;
 822        }
 823        _ = teardown.changed().fuse() => {}
 824    }
 825
 826    Ok(())
 827}
 828
 829async fn ping(_: proto::Ping, response: Response<proto::Ping>, _session: Session) -> Result<()> {
 830    response.send(proto::Ack {})?;
 831    Ok(())
 832}
 833
 834async fn create_room(
 835    _request: proto::CreateRoom,
 836    response: Response<proto::CreateRoom>,
 837    session: Session,
 838) -> Result<()> {
 839    let live_kit_room = nanoid::nanoid!(30);
 840    let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
 841        if let Some(_) = live_kit
 842            .create_room(live_kit_room.clone())
 843            .await
 844            .trace_err()
 845        {
 846            if let Some(token) = live_kit
 847                .room_token(&live_kit_room, &session.user_id.to_string())
 848                .trace_err()
 849            {
 850                Some(proto::LiveKitConnectionInfo {
 851                    server_url: live_kit.url().into(),
 852                    token,
 853                })
 854            } else {
 855                None
 856            }
 857        } else {
 858            None
 859        }
 860    } else {
 861        None
 862    };
 863
 864    {
 865        let room = session
 866            .db()
 867            .await
 868            .create_room(session.user_id, session.connection_id, &live_kit_room)
 869            .await?;
 870
 871        response.send(proto::CreateRoomResponse {
 872            room: Some(room.clone()),
 873            live_kit_connection_info,
 874        })?;
 875    }
 876
 877    update_user_contacts(session.user_id, &session).await?;
 878    Ok(())
 879}
 880
 881async fn join_room(
 882    request: proto::JoinRoom,
 883    response: Response<proto::JoinRoom>,
 884    session: Session,
 885) -> Result<()> {
 886    let room_id = RoomId::from_proto(request.id);
 887    let room = {
 888        let room = session
 889            .db()
 890            .await
 891            .join_room(room_id, session.user_id, session.connection_id)
 892            .await?;
 893        room_updated(&room, &session.peer);
 894        room.clone()
 895    };
 896
 897    for connection_id in session
 898        .connection_pool()
 899        .await
 900        .user_connection_ids(session.user_id)
 901    {
 902        session
 903            .peer
 904            .send(
 905                connection_id,
 906                proto::CallCanceled {
 907                    room_id: room_id.to_proto(),
 908                },
 909            )
 910            .trace_err();
 911    }
 912
 913    let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
 914        if let Some(token) = live_kit
 915            .room_token(&room.live_kit_room, &session.user_id.to_string())
 916            .trace_err()
 917        {
 918            Some(proto::LiveKitConnectionInfo {
 919                server_url: live_kit.url().into(),
 920                token,
 921            })
 922        } else {
 923            None
 924        }
 925    } else {
 926        None
 927    };
 928
 929    response.send(proto::JoinRoomResponse {
 930        room: Some(room),
 931        live_kit_connection_info,
 932    })?;
 933
 934    update_user_contacts(session.user_id, &session).await?;
 935    Ok(())
 936}
 937
 938async fn rejoin_room(
 939    request: proto::RejoinRoom,
 940    response: Response<proto::RejoinRoom>,
 941    session: Session,
 942) -> Result<()> {
 943    {
 944        let mut rejoined_room = session
 945            .db()
 946            .await
 947            .rejoin_room(request, session.user_id, session.connection_id)
 948            .await?;
 949
 950        response.send(proto::RejoinRoomResponse {
 951            room: Some(rejoined_room.room.clone()),
 952            reshared_projects: rejoined_room
 953                .reshared_projects
 954                .iter()
 955                .map(|project| proto::ResharedProject {
 956                    id: project.id.to_proto(),
 957                    collaborators: project
 958                        .collaborators
 959                        .iter()
 960                        .map(|collaborator| collaborator.to_proto())
 961                        .collect(),
 962                })
 963                .collect(),
 964            rejoined_projects: rejoined_room
 965                .rejoined_projects
 966                .iter()
 967                .map(|rejoined_project| proto::RejoinedProject {
 968                    id: rejoined_project.id.to_proto(),
 969                    worktrees: rejoined_project
 970                        .worktrees
 971                        .iter()
 972                        .map(|worktree| proto::WorktreeMetadata {
 973                            id: worktree.id,
 974                            root_name: worktree.root_name.clone(),
 975                            visible: worktree.visible,
 976                            abs_path: worktree.abs_path.clone(),
 977                        })
 978                        .collect(),
 979                    collaborators: rejoined_project
 980                        .collaborators
 981                        .iter()
 982                        .map(|collaborator| collaborator.to_proto())
 983                        .collect(),
 984                    language_servers: rejoined_project.language_servers.clone(),
 985                })
 986                .collect(),
 987        })?;
 988        room_updated(&rejoined_room.room, &session.peer);
 989
 990        for project in &rejoined_room.reshared_projects {
 991            for collaborator in &project.collaborators {
 992                session
 993                    .peer
 994                    .send(
 995                        collaborator.connection_id,
 996                        proto::UpdateProjectCollaborator {
 997                            project_id: project.id.to_proto(),
 998                            old_peer_id: Some(project.old_connection_id.into()),
 999                            new_peer_id: Some(session.connection_id.into()),
1000                        },
1001                    )
1002                    .trace_err();
1003            }
1004
1005            broadcast(
1006                Some(session.connection_id),
1007                project
1008                    .collaborators
1009                    .iter()
1010                    .map(|collaborator| collaborator.connection_id),
1011                |connection_id| {
1012                    session.peer.forward_send(
1013                        session.connection_id,
1014                        connection_id,
1015                        proto::UpdateProject {
1016                            project_id: project.id.to_proto(),
1017                            worktrees: project.worktrees.clone(),
1018                        },
1019                    )
1020                },
1021            );
1022        }
1023
1024        for project in &rejoined_room.rejoined_projects {
1025            for collaborator in &project.collaborators {
1026                session
1027                    .peer
1028                    .send(
1029                        collaborator.connection_id,
1030                        proto::UpdateProjectCollaborator {
1031                            project_id: project.id.to_proto(),
1032                            old_peer_id: Some(project.old_connection_id.into()),
1033                            new_peer_id: Some(session.connection_id.into()),
1034                        },
1035                    )
1036                    .trace_err();
1037            }
1038        }
1039
1040        for project in &mut rejoined_room.rejoined_projects {
1041            for worktree in mem::take(&mut project.worktrees) {
1042                #[cfg(any(test, feature = "test-support"))]
1043                const MAX_CHUNK_SIZE: usize = 2;
1044                #[cfg(not(any(test, feature = "test-support")))]
1045                const MAX_CHUNK_SIZE: usize = 256;
1046
1047                // Stream this worktree's entries.
1048                let message = proto::UpdateWorktree {
1049                    project_id: project.id.to_proto(),
1050                    worktree_id: worktree.id,
1051                    abs_path: worktree.abs_path.clone(),
1052                    root_name: worktree.root_name,
1053                    updated_entries: worktree.updated_entries,
1054                    removed_entries: worktree.removed_entries,
1055                    scan_id: worktree.scan_id,
1056                    is_last_update: worktree.completed_scan_id == worktree.scan_id,
1057                };
1058                for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1059                    session.peer.send(session.connection_id, update.clone())?;
1060                }
1061
1062                // Stream this worktree's diagnostics.
1063                for summary in worktree.diagnostic_summaries {
1064                    session.peer.send(
1065                        session.connection_id,
1066                        proto::UpdateDiagnosticSummary {
1067                            project_id: project.id.to_proto(),
1068                            worktree_id: worktree.id,
1069                            summary: Some(summary),
1070                        },
1071                    )?;
1072                }
1073            }
1074
1075            for language_server in &project.language_servers {
1076                session.peer.send(
1077                    session.connection_id,
1078                    proto::UpdateLanguageServer {
1079                        project_id: project.id.to_proto(),
1080                        language_server_id: language_server.id,
1081                        variant: Some(
1082                            proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1083                                proto::LspDiskBasedDiagnosticsUpdated {},
1084                            ),
1085                        ),
1086                    },
1087                )?;
1088            }
1089        }
1090    }
1091
1092    update_user_contacts(session.user_id, &session).await?;
1093    Ok(())
1094}
1095
1096async fn leave_room(_message: proto::LeaveRoom, session: Session) -> Result<()> {
1097    leave_room_for_session(&session).await
1098}
1099
1100async fn call(
1101    request: proto::Call,
1102    response: Response<proto::Call>,
1103    session: Session,
1104) -> Result<()> {
1105    let room_id = RoomId::from_proto(request.room_id);
1106    let calling_user_id = session.user_id;
1107    let calling_connection_id = session.connection_id;
1108    let called_user_id = UserId::from_proto(request.called_user_id);
1109    let initial_project_id = request.initial_project_id.map(ProjectId::from_proto);
1110    if !session
1111        .db()
1112        .await
1113        .has_contact(calling_user_id, called_user_id)
1114        .await?
1115    {
1116        return Err(anyhow!("cannot call a user who isn't a contact"))?;
1117    }
1118
1119    let incoming_call = {
1120        let (room, incoming_call) = &mut *session
1121            .db()
1122            .await
1123            .call(
1124                room_id,
1125                calling_user_id,
1126                calling_connection_id,
1127                called_user_id,
1128                initial_project_id,
1129            )
1130            .await?;
1131        room_updated(&room, &session.peer);
1132        mem::take(incoming_call)
1133    };
1134    update_user_contacts(called_user_id, &session).await?;
1135
1136    let mut calls = session
1137        .connection_pool()
1138        .await
1139        .user_connection_ids(called_user_id)
1140        .map(|connection_id| session.peer.request(connection_id, incoming_call.clone()))
1141        .collect::<FuturesUnordered<_>>();
1142
1143    while let Some(call_response) = calls.next().await {
1144        match call_response.as_ref() {
1145            Ok(_) => {
1146                response.send(proto::Ack {})?;
1147                return Ok(());
1148            }
1149            Err(_) => {
1150                call_response.trace_err();
1151            }
1152        }
1153    }
1154
1155    {
1156        let room = session
1157            .db()
1158            .await
1159            .call_failed(room_id, called_user_id)
1160            .await?;
1161        room_updated(&room, &session.peer);
1162    }
1163    update_user_contacts(called_user_id, &session).await?;
1164
1165    Err(anyhow!("failed to ring user"))?
1166}
1167
1168async fn cancel_call(
1169    request: proto::CancelCall,
1170    response: Response<proto::CancelCall>,
1171    session: Session,
1172) -> Result<()> {
1173    let called_user_id = UserId::from_proto(request.called_user_id);
1174    let room_id = RoomId::from_proto(request.room_id);
1175    {
1176        let room = session
1177            .db()
1178            .await
1179            .cancel_call(room_id, session.connection_id, called_user_id)
1180            .await?;
1181        room_updated(&room, &session.peer);
1182    }
1183
1184    for connection_id in session
1185        .connection_pool()
1186        .await
1187        .user_connection_ids(called_user_id)
1188    {
1189        session
1190            .peer
1191            .send(
1192                connection_id,
1193                proto::CallCanceled {
1194                    room_id: room_id.to_proto(),
1195                },
1196            )
1197            .trace_err();
1198    }
1199    response.send(proto::Ack {})?;
1200
1201    update_user_contacts(called_user_id, &session).await?;
1202    Ok(())
1203}
1204
1205async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<()> {
1206    let room_id = RoomId::from_proto(message.room_id);
1207    {
1208        let room = session
1209            .db()
1210            .await
1211            .decline_call(Some(room_id), session.user_id)
1212            .await?
1213            .ok_or_else(|| anyhow!("failed to decline call"))?;
1214        room_updated(&room, &session.peer);
1215    }
1216
1217    for connection_id in session
1218        .connection_pool()
1219        .await
1220        .user_connection_ids(session.user_id)
1221    {
1222        session
1223            .peer
1224            .send(
1225                connection_id,
1226                proto::CallCanceled {
1227                    room_id: room_id.to_proto(),
1228                },
1229            )
1230            .trace_err();
1231    }
1232    update_user_contacts(session.user_id, &session).await?;
1233    Ok(())
1234}
1235
1236async fn update_participant_location(
1237    request: proto::UpdateParticipantLocation,
1238    response: Response<proto::UpdateParticipantLocation>,
1239    session: Session,
1240) -> Result<()> {
1241    let room_id = RoomId::from_proto(request.room_id);
1242    let location = request
1243        .location
1244        .ok_or_else(|| anyhow!("invalid location"))?;
1245    let room = session
1246        .db()
1247        .await
1248        .update_room_participant_location(room_id, session.connection_id, location)
1249        .await?;
1250    room_updated(&room, &session.peer);
1251    response.send(proto::Ack {})?;
1252    Ok(())
1253}
1254
1255async fn share_project(
1256    request: proto::ShareProject,
1257    response: Response<proto::ShareProject>,
1258    session: Session,
1259) -> Result<()> {
1260    let (project_id, room) = &*session
1261        .db()
1262        .await
1263        .share_project(
1264            RoomId::from_proto(request.room_id),
1265            session.connection_id,
1266            &request.worktrees,
1267        )
1268        .await?;
1269    response.send(proto::ShareProjectResponse {
1270        project_id: project_id.to_proto(),
1271    })?;
1272    room_updated(&room, &session.peer);
1273
1274    Ok(())
1275}
1276
1277async fn unshare_project(message: proto::UnshareProject, session: Session) -> Result<()> {
1278    let project_id = ProjectId::from_proto(message.project_id);
1279
1280    let (room, guest_connection_ids) = &*session
1281        .db()
1282        .await
1283        .unshare_project(project_id, session.connection_id)
1284        .await?;
1285
1286    broadcast(
1287        Some(session.connection_id),
1288        guest_connection_ids.iter().copied(),
1289        |conn_id| session.peer.send(conn_id, message.clone()),
1290    );
1291    room_updated(&room, &session.peer);
1292
1293    Ok(())
1294}
1295
1296async fn join_project(
1297    request: proto::JoinProject,
1298    response: Response<proto::JoinProject>,
1299    session: Session,
1300) -> Result<()> {
1301    let project_id = ProjectId::from_proto(request.project_id);
1302    let guest_user_id = session.user_id;
1303
1304    tracing::info!(%project_id, "join project");
1305
1306    let (project, replica_id) = &mut *session
1307        .db()
1308        .await
1309        .join_project(project_id, session.connection_id)
1310        .await?;
1311
1312    let collaborators = project
1313        .collaborators
1314        .iter()
1315        .filter(|collaborator| collaborator.connection_id != session.connection_id)
1316        .map(|collaborator| collaborator.to_proto())
1317        .collect::<Vec<_>>();
1318
1319    let worktrees = project
1320        .worktrees
1321        .iter()
1322        .map(|(id, worktree)| proto::WorktreeMetadata {
1323            id: *id,
1324            root_name: worktree.root_name.clone(),
1325            visible: worktree.visible,
1326            abs_path: worktree.abs_path.clone(),
1327        })
1328        .collect::<Vec<_>>();
1329
1330    for collaborator in &collaborators {
1331        session
1332            .peer
1333            .send(
1334                collaborator.peer_id.unwrap().into(),
1335                proto::AddProjectCollaborator {
1336                    project_id: project_id.to_proto(),
1337                    collaborator: Some(proto::Collaborator {
1338                        peer_id: Some(session.connection_id.into()),
1339                        replica_id: replica_id.0 as u32,
1340                        user_id: guest_user_id.to_proto(),
1341                    }),
1342                },
1343            )
1344            .trace_err();
1345    }
1346
1347    // First, we send the metadata associated with each worktree.
1348    response.send(proto::JoinProjectResponse {
1349        worktrees: worktrees.clone(),
1350        replica_id: replica_id.0 as u32,
1351        collaborators: collaborators.clone(),
1352        language_servers: project.language_servers.clone(),
1353    })?;
1354
1355    for (worktree_id, worktree) in mem::take(&mut project.worktrees) {
1356        #[cfg(any(test, feature = "test-support"))]
1357        const MAX_CHUNK_SIZE: usize = 2;
1358        #[cfg(not(any(test, feature = "test-support")))]
1359        const MAX_CHUNK_SIZE: usize = 256;
1360
1361        // Stream this worktree's entries.
1362        let message = proto::UpdateWorktree {
1363            project_id: project_id.to_proto(),
1364            worktree_id,
1365            abs_path: worktree.abs_path.clone(),
1366            root_name: worktree.root_name,
1367            updated_entries: worktree.entries,
1368            removed_entries: Default::default(),
1369            scan_id: worktree.scan_id,
1370            is_last_update: worktree.scan_id == worktree.completed_scan_id,
1371        };
1372        for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1373            session.peer.send(session.connection_id, update.clone())?;
1374        }
1375
1376        // Stream this worktree's diagnostics.
1377        for summary in worktree.diagnostic_summaries {
1378            session.peer.send(
1379                session.connection_id,
1380                proto::UpdateDiagnosticSummary {
1381                    project_id: project_id.to_proto(),
1382                    worktree_id: worktree.id,
1383                    summary: Some(summary),
1384                },
1385            )?;
1386        }
1387    }
1388
1389    for language_server in &project.language_servers {
1390        session.peer.send(
1391            session.connection_id,
1392            proto::UpdateLanguageServer {
1393                project_id: project_id.to_proto(),
1394                language_server_id: language_server.id,
1395                variant: Some(
1396                    proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1397                        proto::LspDiskBasedDiagnosticsUpdated {},
1398                    ),
1399                ),
1400            },
1401        )?;
1402    }
1403
1404    Ok(())
1405}
1406
1407async fn leave_project(request: proto::LeaveProject, session: Session) -> Result<()> {
1408    let sender_id = session.connection_id;
1409    let project_id = ProjectId::from_proto(request.project_id);
1410
1411    let project = session
1412        .db()
1413        .await
1414        .leave_project(project_id, sender_id)
1415        .await?;
1416    tracing::info!(
1417        %project_id,
1418        host_user_id = %project.host_user_id,
1419        host_connection_id = %project.host_connection_id,
1420        "leave project"
1421    );
1422    project_left(&project, &session);
1423
1424    Ok(())
1425}
1426
1427async fn update_project(
1428    request: proto::UpdateProject,
1429    response: Response<proto::UpdateProject>,
1430    session: Session,
1431) -> Result<()> {
1432    let project_id = ProjectId::from_proto(request.project_id);
1433    let (room, guest_connection_ids) = &*session
1434        .db()
1435        .await
1436        .update_project(project_id, session.connection_id, &request.worktrees)
1437        .await?;
1438    broadcast(
1439        Some(session.connection_id),
1440        guest_connection_ids.iter().copied(),
1441        |connection_id| {
1442            session
1443                .peer
1444                .forward_send(session.connection_id, connection_id, request.clone())
1445        },
1446    );
1447    room_updated(&room, &session.peer);
1448    response.send(proto::Ack {})?;
1449
1450    Ok(())
1451}
1452
1453async fn update_worktree(
1454    request: proto::UpdateWorktree,
1455    response: Response<proto::UpdateWorktree>,
1456    session: Session,
1457) -> Result<()> {
1458    let guest_connection_ids = session
1459        .db()
1460        .await
1461        .update_worktree(&request, session.connection_id)
1462        .await?;
1463
1464    broadcast(
1465        Some(session.connection_id),
1466        guest_connection_ids.iter().copied(),
1467        |connection_id| {
1468            session
1469                .peer
1470                .forward_send(session.connection_id, connection_id, request.clone())
1471        },
1472    );
1473    response.send(proto::Ack {})?;
1474    Ok(())
1475}
1476
1477async fn update_diagnostic_summary(
1478    message: proto::UpdateDiagnosticSummary,
1479    session: Session,
1480) -> Result<()> {
1481    let guest_connection_ids = session
1482        .db()
1483        .await
1484        .update_diagnostic_summary(&message, session.connection_id)
1485        .await?;
1486
1487    broadcast(
1488        Some(session.connection_id),
1489        guest_connection_ids.iter().copied(),
1490        |connection_id| {
1491            session
1492                .peer
1493                .forward_send(session.connection_id, connection_id, message.clone())
1494        },
1495    );
1496
1497    Ok(())
1498}
1499
1500async fn start_language_server(
1501    request: proto::StartLanguageServer,
1502    session: Session,
1503) -> Result<()> {
1504    let guest_connection_ids = session
1505        .db()
1506        .await
1507        .start_language_server(&request, session.connection_id)
1508        .await?;
1509
1510    broadcast(
1511        Some(session.connection_id),
1512        guest_connection_ids.iter().copied(),
1513        |connection_id| {
1514            session
1515                .peer
1516                .forward_send(session.connection_id, connection_id, request.clone())
1517        },
1518    );
1519    Ok(())
1520}
1521
1522async fn update_language_server(
1523    request: proto::UpdateLanguageServer,
1524    session: Session,
1525) -> Result<()> {
1526    session.executor.record_backtrace();
1527    let project_id = ProjectId::from_proto(request.project_id);
1528    let project_connection_ids = session
1529        .db()
1530        .await
1531        .project_connection_ids(project_id, session.connection_id)
1532        .await?;
1533    broadcast(
1534        Some(session.connection_id),
1535        project_connection_ids.iter().copied(),
1536        |connection_id| {
1537            session
1538                .peer
1539                .forward_send(session.connection_id, connection_id, request.clone())
1540        },
1541    );
1542    Ok(())
1543}
1544
1545async fn forward_project_request<T>(
1546    request: T,
1547    response: Response<T>,
1548    session: Session,
1549) -> Result<()>
1550where
1551    T: EntityMessage + RequestMessage,
1552{
1553    session.executor.record_backtrace();
1554    let project_id = ProjectId::from_proto(request.remote_entity_id());
1555    let host_connection_id = {
1556        let collaborators = session
1557            .db()
1558            .await
1559            .project_collaborators(project_id, session.connection_id)
1560            .await?;
1561        collaborators
1562            .iter()
1563            .find(|collaborator| collaborator.is_host)
1564            .ok_or_else(|| anyhow!("host not found"))?
1565            .connection_id
1566    };
1567
1568    let payload = session
1569        .peer
1570        .forward_request(session.connection_id, host_connection_id, request)
1571        .await?;
1572
1573    response.send(payload)?;
1574    Ok(())
1575}
1576
1577async fn save_buffer(
1578    request: proto::SaveBuffer,
1579    response: Response<proto::SaveBuffer>,
1580    session: Session,
1581) -> Result<()> {
1582    let project_id = ProjectId::from_proto(request.project_id);
1583    let host_connection_id = {
1584        let collaborators = session
1585            .db()
1586            .await
1587            .project_collaborators(project_id, session.connection_id)
1588            .await?;
1589        collaborators
1590            .iter()
1591            .find(|collaborator| collaborator.is_host)
1592            .ok_or_else(|| anyhow!("host not found"))?
1593            .connection_id
1594    };
1595    let response_payload = session
1596        .peer
1597        .forward_request(session.connection_id, host_connection_id, request.clone())
1598        .await?;
1599
1600    let mut collaborators = session
1601        .db()
1602        .await
1603        .project_collaborators(project_id, session.connection_id)
1604        .await?;
1605    collaborators.retain(|collaborator| collaborator.connection_id != session.connection_id);
1606    let project_connection_ids = collaborators
1607        .iter()
1608        .map(|collaborator| collaborator.connection_id);
1609    broadcast(
1610        Some(host_connection_id),
1611        project_connection_ids,
1612        |conn_id| {
1613            session
1614                .peer
1615                .forward_send(host_connection_id, conn_id, response_payload.clone())
1616        },
1617    );
1618    response.send(response_payload)?;
1619    Ok(())
1620}
1621
1622async fn create_buffer_for_peer(
1623    request: proto::CreateBufferForPeer,
1624    session: Session,
1625) -> Result<()> {
1626    session.executor.record_backtrace();
1627    let peer_id = request.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?;
1628    session
1629        .peer
1630        .forward_send(session.connection_id, peer_id.into(), request)?;
1631    Ok(())
1632}
1633
1634async fn update_buffer(
1635    request: proto::UpdateBuffer,
1636    response: Response<proto::UpdateBuffer>,
1637    session: Session,
1638) -> Result<()> {
1639    session.executor.record_backtrace();
1640    let project_id = ProjectId::from_proto(request.project_id);
1641    let project_connection_ids = session
1642        .db()
1643        .await
1644        .project_connection_ids(project_id, session.connection_id)
1645        .await?;
1646
1647    session.executor.record_backtrace();
1648
1649    broadcast(
1650        Some(session.connection_id),
1651        project_connection_ids.iter().copied(),
1652        |connection_id| {
1653            session
1654                .peer
1655                .forward_send(session.connection_id, connection_id, request.clone())
1656        },
1657    );
1658    response.send(proto::Ack {})?;
1659    Ok(())
1660}
1661
1662async fn update_buffer_file(request: proto::UpdateBufferFile, session: Session) -> Result<()> {
1663    let project_id = ProjectId::from_proto(request.project_id);
1664    let project_connection_ids = session
1665        .db()
1666        .await
1667        .project_connection_ids(project_id, session.connection_id)
1668        .await?;
1669
1670    broadcast(
1671        Some(session.connection_id),
1672        project_connection_ids.iter().copied(),
1673        |connection_id| {
1674            session
1675                .peer
1676                .forward_send(session.connection_id, connection_id, request.clone())
1677        },
1678    );
1679    Ok(())
1680}
1681
1682async fn buffer_reloaded(request: proto::BufferReloaded, session: Session) -> Result<()> {
1683    let project_id = ProjectId::from_proto(request.project_id);
1684    let project_connection_ids = session
1685        .db()
1686        .await
1687        .project_connection_ids(project_id, session.connection_id)
1688        .await?;
1689    broadcast(
1690        Some(session.connection_id),
1691        project_connection_ids.iter().copied(),
1692        |connection_id| {
1693            session
1694                .peer
1695                .forward_send(session.connection_id, connection_id, request.clone())
1696        },
1697    );
1698    Ok(())
1699}
1700
1701async fn buffer_saved(request: proto::BufferSaved, session: Session) -> Result<()> {
1702    let project_id = ProjectId::from_proto(request.project_id);
1703    let project_connection_ids = session
1704        .db()
1705        .await
1706        .project_connection_ids(project_id, session.connection_id)
1707        .await?;
1708    broadcast(
1709        Some(session.connection_id),
1710        project_connection_ids.iter().copied(),
1711        |connection_id| {
1712            session
1713                .peer
1714                .forward_send(session.connection_id, connection_id, request.clone())
1715        },
1716    );
1717    Ok(())
1718}
1719
1720async fn follow(
1721    request: proto::Follow,
1722    response: Response<proto::Follow>,
1723    session: Session,
1724) -> Result<()> {
1725    let project_id = ProjectId::from_proto(request.project_id);
1726    let leader_id = request
1727        .leader_id
1728        .ok_or_else(|| anyhow!("invalid leader id"))?
1729        .into();
1730    let follower_id = session.connection_id;
1731
1732    {
1733        let project_connection_ids = session
1734            .db()
1735            .await
1736            .project_connection_ids(project_id, session.connection_id)
1737            .await?;
1738
1739        if !project_connection_ids.contains(&leader_id) {
1740            Err(anyhow!("no such peer"))?;
1741        }
1742    }
1743
1744    let mut response_payload = session
1745        .peer
1746        .forward_request(session.connection_id, leader_id, request)
1747        .await?;
1748    response_payload
1749        .views
1750        .retain(|view| view.leader_id != Some(follower_id.into()));
1751    response.send(response_payload)?;
1752
1753    let room = session
1754        .db()
1755        .await
1756        .follow(project_id, leader_id, follower_id)
1757        .await?;
1758    room_updated(&room, &session.peer);
1759
1760    Ok(())
1761}
1762
1763async fn unfollow(request: proto::Unfollow, session: Session) -> Result<()> {
1764    let project_id = ProjectId::from_proto(request.project_id);
1765    let leader_id = request
1766        .leader_id
1767        .ok_or_else(|| anyhow!("invalid leader id"))?
1768        .into();
1769    let follower_id = session.connection_id;
1770
1771    if !session
1772        .db()
1773        .await
1774        .project_connection_ids(project_id, session.connection_id)
1775        .await?
1776        .contains(&leader_id)
1777    {
1778        Err(anyhow!("no such peer"))?;
1779    }
1780
1781    session
1782        .peer
1783        .forward_send(session.connection_id, leader_id, request)?;
1784
1785    let room = session
1786        .db()
1787        .await
1788        .unfollow(project_id, leader_id, follower_id)
1789        .await?;
1790    room_updated(&room, &session.peer);
1791
1792    Ok(())
1793}
1794
1795async fn update_followers(request: proto::UpdateFollowers, session: Session) -> Result<()> {
1796    let project_id = ProjectId::from_proto(request.project_id);
1797    let project_connection_ids = session
1798        .db
1799        .lock()
1800        .await
1801        .project_connection_ids(project_id, session.connection_id)
1802        .await?;
1803
1804    let leader_id = request.variant.as_ref().and_then(|variant| match variant {
1805        proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
1806        proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
1807        proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
1808    });
1809    for follower_peer_id in request.follower_ids.iter().copied() {
1810        let follower_connection_id = follower_peer_id.into();
1811        if project_connection_ids.contains(&follower_connection_id)
1812            && Some(follower_peer_id) != leader_id
1813        {
1814            session.peer.forward_send(
1815                session.connection_id,
1816                follower_connection_id,
1817                request.clone(),
1818            )?;
1819        }
1820    }
1821    Ok(())
1822}
1823
1824async fn get_users(
1825    request: proto::GetUsers,
1826    response: Response<proto::GetUsers>,
1827    session: Session,
1828) -> Result<()> {
1829    let user_ids = request
1830        .user_ids
1831        .into_iter()
1832        .map(UserId::from_proto)
1833        .collect();
1834    let users = session
1835        .db()
1836        .await
1837        .get_users_by_ids(user_ids)
1838        .await?
1839        .into_iter()
1840        .map(|user| proto::User {
1841            id: user.id.to_proto(),
1842            avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1843            github_login: user.github_login,
1844        })
1845        .collect();
1846    response.send(proto::UsersResponse { users })?;
1847    Ok(())
1848}
1849
1850async fn fuzzy_search_users(
1851    request: proto::FuzzySearchUsers,
1852    response: Response<proto::FuzzySearchUsers>,
1853    session: Session,
1854) -> Result<()> {
1855    let query = request.query;
1856    let users = match query.len() {
1857        0 => vec![],
1858        1 | 2 => session
1859            .db()
1860            .await
1861            .get_user_by_github_account(&query, None)
1862            .await?
1863            .into_iter()
1864            .collect(),
1865        _ => session.db().await.fuzzy_search_users(&query, 10).await?,
1866    };
1867    let users = users
1868        .into_iter()
1869        .filter(|user| user.id != session.user_id)
1870        .map(|user| proto::User {
1871            id: user.id.to_proto(),
1872            avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1873            github_login: user.github_login,
1874        })
1875        .collect();
1876    response.send(proto::UsersResponse { users })?;
1877    Ok(())
1878}
1879
1880async fn request_contact(
1881    request: proto::RequestContact,
1882    response: Response<proto::RequestContact>,
1883    session: Session,
1884) -> Result<()> {
1885    let requester_id = session.user_id;
1886    let responder_id = UserId::from_proto(request.responder_id);
1887    if requester_id == responder_id {
1888        return Err(anyhow!("cannot add yourself as a contact"))?;
1889    }
1890
1891    session
1892        .db()
1893        .await
1894        .send_contact_request(requester_id, responder_id)
1895        .await?;
1896
1897    // Update outgoing contact requests of requester
1898    let mut update = proto::UpdateContacts::default();
1899    update.outgoing_requests.push(responder_id.to_proto());
1900    for connection_id in session
1901        .connection_pool()
1902        .await
1903        .user_connection_ids(requester_id)
1904    {
1905        session.peer.send(connection_id, update.clone())?;
1906    }
1907
1908    // Update incoming contact requests of responder
1909    let mut update = proto::UpdateContacts::default();
1910    update
1911        .incoming_requests
1912        .push(proto::IncomingContactRequest {
1913            requester_id: requester_id.to_proto(),
1914            should_notify: true,
1915        });
1916    for connection_id in session
1917        .connection_pool()
1918        .await
1919        .user_connection_ids(responder_id)
1920    {
1921        session.peer.send(connection_id, update.clone())?;
1922    }
1923
1924    response.send(proto::Ack {})?;
1925    Ok(())
1926}
1927
1928async fn respond_to_contact_request(
1929    request: proto::RespondToContactRequest,
1930    response: Response<proto::RespondToContactRequest>,
1931    session: Session,
1932) -> Result<()> {
1933    let responder_id = session.user_id;
1934    let requester_id = UserId::from_proto(request.requester_id);
1935    let db = session.db().await;
1936    if request.response == proto::ContactRequestResponse::Dismiss as i32 {
1937        db.dismiss_contact_notification(responder_id, requester_id)
1938            .await?;
1939    } else {
1940        let accept = request.response == proto::ContactRequestResponse::Accept as i32;
1941
1942        db.respond_to_contact_request(responder_id, requester_id, accept)
1943            .await?;
1944        let requester_busy = db.is_user_busy(requester_id).await?;
1945        let responder_busy = db.is_user_busy(responder_id).await?;
1946
1947        let pool = session.connection_pool().await;
1948        // Update responder with new contact
1949        let mut update = proto::UpdateContacts::default();
1950        if accept {
1951            update
1952                .contacts
1953                .push(contact_for_user(requester_id, false, requester_busy, &pool));
1954        }
1955        update
1956            .remove_incoming_requests
1957            .push(requester_id.to_proto());
1958        for connection_id in pool.user_connection_ids(responder_id) {
1959            session.peer.send(connection_id, update.clone())?;
1960        }
1961
1962        // Update requester with new contact
1963        let mut update = proto::UpdateContacts::default();
1964        if accept {
1965            update
1966                .contacts
1967                .push(contact_for_user(responder_id, true, responder_busy, &pool));
1968        }
1969        update
1970            .remove_outgoing_requests
1971            .push(responder_id.to_proto());
1972        for connection_id in pool.user_connection_ids(requester_id) {
1973            session.peer.send(connection_id, update.clone())?;
1974        }
1975    }
1976
1977    response.send(proto::Ack {})?;
1978    Ok(())
1979}
1980
1981async fn remove_contact(
1982    request: proto::RemoveContact,
1983    response: Response<proto::RemoveContact>,
1984    session: Session,
1985) -> Result<()> {
1986    let requester_id = session.user_id;
1987    let responder_id = UserId::from_proto(request.user_id);
1988    let db = session.db().await;
1989    let contact_accepted = db.remove_contact(requester_id, responder_id).await?;
1990
1991    let pool = session.connection_pool().await;
1992    // Update outgoing contact requests of requester
1993    let mut update = proto::UpdateContacts::default();
1994    if contact_accepted {
1995        update.remove_contacts.push(responder_id.to_proto());
1996    } else {
1997        update
1998            .remove_outgoing_requests
1999            .push(responder_id.to_proto());
2000    }
2001    for connection_id in pool.user_connection_ids(requester_id) {
2002        session.peer.send(connection_id, update.clone())?;
2003    }
2004
2005    // Update incoming contact requests of responder
2006    let mut update = proto::UpdateContacts::default();
2007    if contact_accepted {
2008        update.remove_contacts.push(requester_id.to_proto());
2009    } else {
2010        update
2011            .remove_incoming_requests
2012            .push(requester_id.to_proto());
2013    }
2014    for connection_id in pool.user_connection_ids(responder_id) {
2015        session.peer.send(connection_id, update.clone())?;
2016    }
2017
2018    response.send(proto::Ack {})?;
2019    Ok(())
2020}
2021
2022async fn update_diff_base(request: proto::UpdateDiffBase, session: Session) -> Result<()> {
2023    let project_id = ProjectId::from_proto(request.project_id);
2024    let project_connection_ids = session
2025        .db()
2026        .await
2027        .project_connection_ids(project_id, session.connection_id)
2028        .await?;
2029    broadcast(
2030        Some(session.connection_id),
2031        project_connection_ids.iter().copied(),
2032        |connection_id| {
2033            session
2034                .peer
2035                .forward_send(session.connection_id, connection_id, request.clone())
2036        },
2037    );
2038    Ok(())
2039}
2040
2041async fn get_private_user_info(
2042    _request: proto::GetPrivateUserInfo,
2043    response: Response<proto::GetPrivateUserInfo>,
2044    session: Session,
2045) -> Result<()> {
2046    let metrics_id = session
2047        .db()
2048        .await
2049        .get_user_metrics_id(session.user_id)
2050        .await?;
2051    let user = session
2052        .db()
2053        .await
2054        .get_user_by_id(session.user_id)
2055        .await?
2056        .ok_or_else(|| anyhow!("user not found"))?;
2057    response.send(proto::GetPrivateUserInfoResponse {
2058        metrics_id,
2059        staff: user.admin,
2060    })?;
2061    Ok(())
2062}
2063
2064fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
2065    match message {
2066        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
2067        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
2068        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
2069        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
2070        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
2071            code: frame.code.into(),
2072            reason: frame.reason,
2073        })),
2074    }
2075}
2076
2077fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
2078    match message {
2079        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
2080        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
2081        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
2082        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
2083        AxumMessage::Close(frame) => {
2084            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
2085                code: frame.code.into(),
2086                reason: frame.reason,
2087            }))
2088        }
2089    }
2090}
2091
2092fn build_initial_contacts_update(
2093    contacts: Vec<db::Contact>,
2094    pool: &ConnectionPool,
2095) -> proto::UpdateContacts {
2096    let mut update = proto::UpdateContacts::default();
2097
2098    for contact in contacts {
2099        match contact {
2100            db::Contact::Accepted {
2101                user_id,
2102                should_notify,
2103                busy,
2104            } => {
2105                update
2106                    .contacts
2107                    .push(contact_for_user(user_id, should_notify, busy, &pool));
2108            }
2109            db::Contact::Outgoing { user_id } => update.outgoing_requests.push(user_id.to_proto()),
2110            db::Contact::Incoming {
2111                user_id,
2112                should_notify,
2113            } => update
2114                .incoming_requests
2115                .push(proto::IncomingContactRequest {
2116                    requester_id: user_id.to_proto(),
2117                    should_notify,
2118                }),
2119        }
2120    }
2121
2122    update
2123}
2124
2125fn contact_for_user(
2126    user_id: UserId,
2127    should_notify: bool,
2128    busy: bool,
2129    pool: &ConnectionPool,
2130) -> proto::Contact {
2131    proto::Contact {
2132        user_id: user_id.to_proto(),
2133        online: pool.is_user_online(user_id),
2134        busy,
2135        should_notify,
2136    }
2137}
2138
2139fn room_updated(room: &proto::Room, peer: &Peer) {
2140    broadcast(
2141        None,
2142        room.participants
2143            .iter()
2144            .filter_map(|participant| Some(participant.peer_id?.into())),
2145        |peer_id| {
2146            peer.send(
2147                peer_id.into(),
2148                proto::RoomUpdated {
2149                    room: Some(room.clone()),
2150                },
2151            )
2152        },
2153    );
2154}
2155
2156async fn update_user_contacts(user_id: UserId, session: &Session) -> Result<()> {
2157    let db = session.db().await;
2158    let contacts = db.get_contacts(user_id).await?;
2159    let busy = db.is_user_busy(user_id).await?;
2160
2161    let pool = session.connection_pool().await;
2162    let updated_contact = contact_for_user(user_id, false, busy, &pool);
2163    for contact in contacts {
2164        if let db::Contact::Accepted {
2165            user_id: contact_user_id,
2166            ..
2167        } = contact
2168        {
2169            for contact_conn_id in pool.user_connection_ids(contact_user_id) {
2170                session
2171                    .peer
2172                    .send(
2173                        contact_conn_id,
2174                        proto::UpdateContacts {
2175                            contacts: vec![updated_contact.clone()],
2176                            remove_contacts: Default::default(),
2177                            incoming_requests: Default::default(),
2178                            remove_incoming_requests: Default::default(),
2179                            outgoing_requests: Default::default(),
2180                            remove_outgoing_requests: Default::default(),
2181                        },
2182                    )
2183                    .trace_err();
2184            }
2185        }
2186    }
2187    Ok(())
2188}
2189
2190async fn leave_room_for_session(session: &Session) -> Result<()> {
2191    let mut contacts_to_update = HashSet::default();
2192
2193    let room_id;
2194    let canceled_calls_to_user_ids;
2195    let live_kit_room;
2196    let delete_live_kit_room;
2197    if let Some(mut left_room) = session.db().await.leave_room(session.connection_id).await? {
2198        contacts_to_update.insert(session.user_id);
2199
2200        for project in left_room.left_projects.values() {
2201            project_left(project, session);
2202        }
2203
2204        room_updated(&left_room.room, &session.peer);
2205        room_id = RoomId::from_proto(left_room.room.id);
2206        canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids);
2207        live_kit_room = mem::take(&mut left_room.room.live_kit_room);
2208        delete_live_kit_room = left_room.room.participants.is_empty();
2209    } else {
2210        return Ok(());
2211    }
2212
2213    {
2214        let pool = session.connection_pool().await;
2215        for canceled_user_id in canceled_calls_to_user_ids {
2216            for connection_id in pool.user_connection_ids(canceled_user_id) {
2217                session
2218                    .peer
2219                    .send(
2220                        connection_id,
2221                        proto::CallCanceled {
2222                            room_id: room_id.to_proto(),
2223                        },
2224                    )
2225                    .trace_err();
2226            }
2227            contacts_to_update.insert(canceled_user_id);
2228        }
2229    }
2230
2231    for contact_user_id in contacts_to_update {
2232        update_user_contacts(contact_user_id, &session).await?;
2233    }
2234
2235    if let Some(live_kit) = session.live_kit_client.as_ref() {
2236        live_kit
2237            .remove_participant(live_kit_room.clone(), session.user_id.to_string())
2238            .await
2239            .trace_err();
2240
2241        if delete_live_kit_room {
2242            live_kit.delete_room(live_kit_room).await.trace_err();
2243        }
2244    }
2245
2246    Ok(())
2247}
2248
2249fn project_left(project: &db::LeftProject, session: &Session) {
2250    for connection_id in &project.connection_ids {
2251        if project.host_user_id == session.user_id {
2252            session
2253                .peer
2254                .send(
2255                    *connection_id,
2256                    proto::UnshareProject {
2257                        project_id: project.id.to_proto(),
2258                    },
2259                )
2260                .trace_err();
2261        } else {
2262            session
2263                .peer
2264                .send(
2265                    *connection_id,
2266                    proto::RemoveProjectCollaborator {
2267                        project_id: project.id.to_proto(),
2268                        peer_id: Some(session.connection_id.into()),
2269                    },
2270                )
2271                .trace_err();
2272        }
2273    }
2274}
2275
2276pub trait ResultExt {
2277    type Ok;
2278
2279    fn trace_err(self) -> Option<Self::Ok>;
2280}
2281
2282impl<T, E> ResultExt for Result<T, E>
2283where
2284    E: std::fmt::Debug,
2285{
2286    type Ok = T;
2287
2288    fn trace_err(self) -> Option<T> {
2289        match self {
2290            Ok(value) => Some(value),
2291            Err(error) => {
2292                tracing::error!("{:?}", error);
2293                None
2294            }
2295        }
2296    }
2297}