rpc.rs

   1mod connection_pool;
   2
   3use crate::{
   4    auth,
   5    db::{self, Database, ProjectId, RoomId, ServerId, User, UserId},
   6    executor::Executor,
   7    AppState, Result,
   8};
   9use anyhow::anyhow;
  10use async_tungstenite::tungstenite::{
  11    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  12};
  13use axum::{
  14    body::Body,
  15    extract::{
  16        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  17        ConnectInfo, WebSocketUpgrade,
  18    },
  19    headers::{Header, HeaderName},
  20    http::StatusCode,
  21    middleware,
  22    response::IntoResponse,
  23    routing::get,
  24    Extension, Router, TypedHeader,
  25};
  26use collections::{HashMap, HashSet};
  27pub use connection_pool::ConnectionPool;
  28use futures::{
  29    channel::oneshot,
  30    future::{self, BoxFuture},
  31    stream::FuturesUnordered,
  32    FutureExt, SinkExt, StreamExt, TryStreamExt,
  33};
  34use lazy_static::lazy_static;
  35use prometheus::{register_int_gauge, IntGauge};
  36use rpc::{
  37    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  38    Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
  39};
  40use serde::{Serialize, Serializer};
  41use std::{
  42    any::TypeId,
  43    fmt,
  44    future::Future,
  45    marker::PhantomData,
  46    mem,
  47    net::SocketAddr,
  48    ops::{Deref, DerefMut},
  49    rc::Rc,
  50    sync::{
  51        atomic::{AtomicBool, Ordering::SeqCst},
  52        Arc,
  53    },
  54    time::Duration,
  55};
  56use tokio::sync::{watch, Semaphore};
  57use tower::ServiceBuilder;
  58use tracing::{info_span, instrument, Instrument};
  59
/// Reconnection time budget (30 seconds).
///
/// NOTE(review): not referenced in the visible part of this file — presumably consumed by the
/// connection-loss handling defined further down; confirm.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
/// How long `Server::start` sleeps before it begins cleaning up stale rooms and servers
/// (10 seconds).
pub const CLEANUP_TIMEOUT: Duration = Duration::from_secs(10);
  62
// Prometheus gauges exported by this module. Each gauge is registered lazily on first access;
// the `unwrap` panics if registration fails.
lazy_static! {
    // Gauge "connections": set by `handle_metrics` to the number of non-admin connections.
    static ref METRIC_CONNECTIONS: IntGauge =
        register_int_gauge!("connections", "number of connections").unwrap();
    // Gauge "shared_projects": set by `handle_metrics` from the database's
    // `project_count_excluding_admins` query.
    static ref METRIC_SHARED_PROJECTS: IntGauge = register_int_gauge!(
        "shared_projects",
        "number of open projects with one or more guests"
    )
    .unwrap();
}
  72
/// Type-erased message handler stored in `Server::handlers`.
///
/// It receives the incoming envelope as a boxed `AnyTypedEnvelope` together with the `Session`
/// of the connection the message arrived on, and returns a boxed future that drives the
/// handling to completion.
type MessageHandler =
    Box<dyn Send + Sync + Fn(Box<dyn AnyTypedEnvelope>, Session) -> BoxFuture<'static, ()>>;
  75
/// Handle given to a request handler so it can answer the request it is processing.
struct Response<R> {
    /// Peer used to deliver the response.
    peer: Arc<Peer>,
    /// Receipt of the request being answered.
    receipt: Receipt<R>,
    /// Flag shared with `Server::add_request_handler`; set once `send` has been called so the
    /// server can detect handlers that return `Ok` without responding.
    responded: Arc<AtomicBool>,
}
  81
  82impl<R: RequestMessage> Response<R> {
  83    fn send(self, payload: R::Response) -> Result<()> {
  84        self.responded.store(true, SeqCst);
  85        self.peer.respond(self.receipt, payload)?;
  86        Ok(())
  87    }
  88}
  89
/// Per-connection context handed to every message handler.
///
/// A `Session` is created in `Server::handle_connection` once the connection has been set up,
/// and is cloned for each incoming message.
#[derive(Clone)]
struct Session {
    /// Id of the user that owns this connection.
    user_id: UserId,
    /// Id the peer assigned to this connection.
    connection_id: ConnectionId,
    /// Database handle behind an async mutex; the mutex is created per connection and shared
    /// by all clones of this session.
    db: Arc<tokio::sync::Mutex<DbHandle>>,
    /// Peer used to send messages and responses.
    peer: Arc<Peer>,
    /// Server-wide connection pool (shared with `Server::connection_pool`).
    connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
    /// LiveKit API client, if one is configured in the application state.
    live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
    /// Executor used for timers and for spawning detached work.
    executor: Executor,
}
 100
 101impl Session {
 102    async fn db(&self) -> tokio::sync::MutexGuard<DbHandle> {
 103        #[cfg(test)]
 104        tokio::task::yield_now().await;
 105        let guard = self.db.lock().await;
 106        #[cfg(test)]
 107        tokio::task::yield_now().await;
 108        guard
 109    }
 110
 111    async fn connection_pool(&self) -> ConnectionPoolGuard<'_> {
 112        #[cfg(test)]
 113        tokio::task::yield_now().await;
 114        let guard = self.connection_pool.lock();
 115        ConnectionPoolGuard {
 116            guard,
 117            _not_send: PhantomData,
 118        }
 119    }
 120}
 121
 122impl fmt::Debug for Session {
 123    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 124        f.debug_struct("Session")
 125            .field("user_id", &self.user_id)
 126            .field("connection_id", &self.connection_id)
 127            .finish()
 128    }
 129}
 130
/// Newtype around the shared `Database`; dereferences to `Database`.
struct DbHandle(Arc<Database>);
 132
 133impl Deref for DbHandle {
 134    type Target = Database;
 135
 136    fn deref(&self) -> &Self::Target {
 137        self.0.as_ref()
 138    }
 139}
 140
/// The RPC server: owns the peer, the connection pool and the table of message handlers.
pub struct Server {
    /// Current server id. Behind a mutex because the test-only `reset` replaces it.
    id: parking_lot::Mutex<ServerId>,
    /// Peer that manages connections and message delivery.
    peer: Arc<Peer>,
    /// Pool of live connections, shared with every `Session`.
    pub(crate) connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
    /// Shared application state (database, configuration, LiveKit client).
    app_state: Arc<AppState>,
    /// Executor used for timers and detached tasks.
    executor: Executor,
    /// Message handlers keyed by the `TypeId` of the message type they handle.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Signalled by `teardown`; every connection loop subscribes to it and exits when it fires.
    teardown: watch::Sender<()>,
}
 150
/// Guard over the locked `ConnectionPool`; dereferences to the pool.
pub(crate) struct ConnectionPoolGuard<'a> {
    /// The underlying mutex guard.
    guard: parking_lot::MutexGuard<'a, ConnectionPool>,
    /// `Rc` is not `Send`, so this marker makes the whole guard `!Send`.
    /// NOTE(review): presumably intended to stop the guard from being held across an `.await`
    /// in a future that must be `Send` — confirm.
    _not_send: PhantomData<Rc<()>>,
}
 155
/// Serializable view of the server, produced by `Server::snapshot`.
///
/// Holds the connection-pool lock for as long as the snapshot is alive.
#[derive(Serialize)]
pub struct ServerSnapshot<'a> {
    /// The server's peer.
    peer: &'a Peer,
    /// Locked connection pool; serialized through `serialize_deref`, i.e. as the pool itself
    /// rather than as the guard.
    #[serde(serialize_with = "serialize_deref")]
    connection_pool: ConnectionPoolGuard<'a>,
}
 162
 163pub fn serialize_deref<S, T, U>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
 164where
 165    S: Serializer,
 166    T: Deref<Target = U>,
 167    U: Serialize,
 168{
 169    Serialize::serialize(value.deref(), serializer)
 170}
 171
 172impl Server {
    /// Creates a server with the given id, application state and executor, and registers the
    /// handler for every supported RPC message.
    ///
    /// # Panics
    /// Panics if two handlers are registered for the same message type (see `add_handler`).
    pub fn new(id: ServerId, app_state: Arc<AppState>, executor: Executor) -> Arc<Self> {
        let mut server = Self {
            id: parking_lot::Mutex::new(id),
            peer: Peer::new(id.0 as u32),
            app_state,
            executor,
            connection_pool: Default::default(),
            handlers: Default::default(),
            // Only the sender half is kept; receivers are created on demand via `subscribe`.
            teardown: watch::channel(()).0,
        };

        // `add_request_handler` is used for messages that expect a response,
        // `add_message_handler` for fire-and-forget messages.
        server
            .add_request_handler(ping)
            .add_request_handler(create_room)
            .add_request_handler(join_room)
            .add_request_handler(rejoin_room)
            .add_message_handler(leave_room)
            .add_request_handler(call)
            .add_request_handler(cancel_call)
            .add_message_handler(decline_call)
            .add_request_handler(update_participant_location)
            .add_request_handler(share_project)
            .add_message_handler(unshare_project)
            .add_request_handler(join_project)
            .add_message_handler(leave_project)
            .add_request_handler(update_project)
            .add_request_handler(update_worktree)
            .add_message_handler(start_language_server)
            .add_message_handler(update_language_server)
            .add_message_handler(update_diagnostic_summary)
            .add_request_handler(forward_project_request::<proto::GetHover>)
            .add_request_handler(forward_project_request::<proto::GetDefinition>)
            .add_request_handler(forward_project_request::<proto::GetTypeDefinition>)
            .add_request_handler(forward_project_request::<proto::GetReferences>)
            .add_request_handler(forward_project_request::<proto::SearchProject>)
            .add_request_handler(forward_project_request::<proto::GetDocumentHighlights>)
            .add_request_handler(forward_project_request::<proto::GetProjectSymbols>)
            .add_request_handler(forward_project_request::<proto::OpenBufferForSymbol>)
            .add_request_handler(forward_project_request::<proto::OpenBufferById>)
            .add_request_handler(forward_project_request::<proto::OpenBufferByPath>)
            .add_request_handler(forward_project_request::<proto::GetCompletions>)
            .add_request_handler(forward_project_request::<proto::ApplyCompletionAdditionalEdits>)
            .add_request_handler(forward_project_request::<proto::GetCodeActions>)
            .add_request_handler(forward_project_request::<proto::ApplyCodeAction>)
            .add_request_handler(forward_project_request::<proto::PrepareRename>)
            .add_request_handler(forward_project_request::<proto::PerformRename>)
            .add_request_handler(forward_project_request::<proto::ReloadBuffers>)
            .add_request_handler(forward_project_request::<proto::SynchronizeBuffers>)
            .add_request_handler(forward_project_request::<proto::FormatBuffers>)
            .add_request_handler(forward_project_request::<proto::CreateProjectEntry>)
            .add_request_handler(forward_project_request::<proto::RenameProjectEntry>)
            .add_request_handler(forward_project_request::<proto::CopyProjectEntry>)
            .add_request_handler(forward_project_request::<proto::DeleteProjectEntry>)
            .add_message_handler(create_buffer_for_peer)
            .add_request_handler(update_buffer)
            .add_message_handler(update_buffer_file)
            .add_message_handler(buffer_reloaded)
            .add_message_handler(buffer_saved)
            .add_request_handler(save_buffer)
            .add_request_handler(get_users)
            .add_request_handler(fuzzy_search_users)
            .add_request_handler(request_contact)
            .add_request_handler(remove_contact)
            .add_request_handler(respond_to_contact_request)
            .add_request_handler(follow)
            .add_message_handler(unfollow)
            .add_message_handler(update_followers)
            .add_message_handler(update_diff_base)
            .add_request_handler(get_private_user_info);

        Arc::new(server)
    }
 245
    /// Spawns a detached background task that cleans up state left behind by stale servers.
    ///
    /// After sleeping for `CLEANUP_TIMEOUT` the task:
    /// 1. asks the database for stale room ids for this environment and server,
    /// 2. refreshes each such room and broadcasts the refreshed room via `room_updated`,
    /// 3. sends `CallCanceled` to every connection of users whose calls were canceled,
    /// 4. pushes an updated contact entry to the accepted contacts of every affected user,
    /// 5. deletes the LiveKit room when the refreshed room has no participants left,
    /// 6. finally asks the database to delete stale servers.
    ///
    /// Every database and network failure inside the task is logged through `trace_err` and
    /// otherwise ignored, so this method itself always returns `Ok(())` right after spawning.
    pub async fn start(&self) -> Result<()> {
        let server_id = *self.id.lock();
        let app_state = self.app_state.clone();
        let peer = self.peer.clone();
        // The sleep future is created here, before the task is spawned, and awaited inside it.
        let timeout = self.executor.sleep(CLEANUP_TIMEOUT);
        let pool = self.connection_pool.clone();
        let live_kit_client = self.app_state.live_kit_client.clone();

        let span = info_span!("start server");
        self.executor.spawn_detached(
            async move {
                tracing::info!("waiting for cleanup timeout");
                timeout.await;
                tracing::info!("cleanup timeout expired, retrieving stale rooms");
                if let Some(room_ids) = app_state
                    .db
                    .stale_room_ids(&app_state.config.zed_environment, server_id)
                    .await
                    .trace_err()
                {
                    tracing::info!(stale_room_count = room_ids.len(), "retrieved stale rooms");
                    for room_id in room_ids {
                        // Per-room results; they keep these defaults when the refresh fails,
                        // which turns the notification steps below into no-ops.
                        let mut contacts_to_update = HashSet::default();
                        let mut canceled_calls_to_user_ids = Vec::new();
                        let mut live_kit_room = String::new();
                        let mut delete_live_kit_room = false;

                        if let Some(mut refreshed_room) = app_state
                            .db
                            .refresh_room(room_id, server_id)
                            .await
                            .trace_err()
                        {
                            tracing::info!(
                                room_id = room_id.0,
                                new_participant_count = refreshed_room.room.participants.len(),
                                "refreshed room"
                            );
                            room_updated(&refreshed_room.room, &peer);
                            contacts_to_update
                                .extend(refreshed_room.stale_participant_user_ids.iter().copied());
                            contacts_to_update
                                .extend(refreshed_room.canceled_calls_to_user_ids.iter().copied());
                            canceled_calls_to_user_ids =
                                mem::take(&mut refreshed_room.canceled_calls_to_user_ids);
                            live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room);
                            delete_live_kit_room = refreshed_room.room.participants.is_empty();
                        }

                        // The pool lock is confined to this block so that it is released
                        // before the next `.await`.
                        {
                            let pool = pool.lock();
                            for canceled_user_id in canceled_calls_to_user_ids {
                                for connection_id in pool.user_connection_ids(canceled_user_id) {
                                    peer.send(
                                        connection_id,
                                        proto::CallCanceled {
                                            room_id: room_id.to_proto(),
                                        },
                                    )
                                    .trace_err();
                                }
                            }
                        }

                        for user_id in contacts_to_update {
                            let busy = app_state.db.is_user_busy(user_id).await.trace_err();
                            let contacts = app_state.db.get_contacts(user_id).await.trace_err();
                            // Skip this user unless both queries succeeded.
                            if let Some((busy, contacts)) = busy.zip(contacts) {
                                // Locked only after both awaits above have completed.
                                let pool = pool.lock();
                                let updated_contact = contact_for_user(user_id, false, busy, &pool);
                                for contact in contacts {
                                    // Only accepted contacts are notified.
                                    if let db::Contact::Accepted {
                                        user_id: contact_user_id,
                                        ..
                                    } = contact
                                    {
                                        for contact_conn_id in
                                            pool.user_connection_ids(contact_user_id)
                                        {
                                            peer.send(
                                                contact_conn_id,
                                                proto::UpdateContacts {
                                                    contacts: vec![updated_contact.clone()],
                                                    remove_contacts: Default::default(),
                                                    incoming_requests: Default::default(),
                                                    remove_incoming_requests: Default::default(),
                                                    outgoing_requests: Default::default(),
                                                    remove_outgoing_requests: Default::default(),
                                                },
                                            )
                                            .trace_err();
                                        }
                                    }
                                }
                            }
                        }

                        if let Some(live_kit) = live_kit_client.as_ref() {
                            if delete_live_kit_room {
                                live_kit.delete_room(live_kit_room).await.trace_err();
                            }
                        }
                    }
                }

                // Runs regardless of whether the stale-room lookup above succeeded.
                app_state
                    .db
                    .delete_stale_servers(&app_state.config.zed_environment, server_id)
                    .await
                    .trace_err();
            }
            .instrument(span),
        );
        Ok(())
    }
 361
 362    pub fn teardown(&self) {
 363        self.peer.teardown();
 364        self.connection_pool.lock().reset();
 365        let _ = self.teardown.send(());
 366    }
 367
 368    #[cfg(test)]
 369    pub fn reset(&self, id: ServerId) {
 370        self.teardown();
 371        *self.id.lock() = id;
 372        self.peer.reset(id.0 as u32);
 373    }
 374
 375    #[cfg(test)]
 376    pub fn id(&self) -> ServerId {
 377        *self.id.lock()
 378    }
 379
 380    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 381    where
 382        F: 'static + Send + Sync + Fn(TypedEnvelope<M>, Session) -> Fut,
 383        Fut: 'static + Send + Future<Output = Result<()>>,
 384        M: EnvelopedMessage,
 385    {
 386        let prev_handler = self.handlers.insert(
 387            TypeId::of::<M>(),
 388            Box::new(move |envelope, session| {
 389                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 390                let span = info_span!(
 391                    "handle message",
 392                    payload_type = envelope.payload_type_name()
 393                );
 394                span.in_scope(|| {
 395                    tracing::info!(
 396                        payload_type = envelope.payload_type_name(),
 397                        "message received"
 398                    );
 399                });
 400                let future = (handler)(*envelope, session);
 401                async move {
 402                    if let Err(error) = future.await {
 403                        tracing::error!(%error, "error handling message");
 404                    }
 405                }
 406                .instrument(span)
 407                .boxed()
 408            }),
 409        );
 410        if prev_handler.is_some() {
 411            panic!("registered a handler for the same message twice");
 412        }
 413        self
 414    }
 415
 416    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 417    where
 418        F: 'static + Send + Sync + Fn(M, Session) -> Fut,
 419        Fut: 'static + Send + Future<Output = Result<()>>,
 420        M: EnvelopedMessage,
 421    {
 422        self.add_handler(move |envelope, session| handler(envelope.payload, session));
 423        self
 424    }
 425
 426    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 427    where
 428        F: 'static + Send + Sync + Fn(M, Response<M>, Session) -> Fut,
 429        Fut: Send + Future<Output = Result<()>>,
 430        M: RequestMessage,
 431    {
 432        let handler = Arc::new(handler);
 433        self.add_handler(move |envelope, session| {
 434            let receipt = envelope.receipt();
 435            let handler = handler.clone();
 436            async move {
 437                let peer = session.peer.clone();
 438                let responded = Arc::new(AtomicBool::default());
 439                let response = Response {
 440                    peer: peer.clone(),
 441                    responded: responded.clone(),
 442                    receipt,
 443                };
 444                match (handler)(envelope.payload, response, session).await {
 445                    Ok(()) => {
 446                        if responded.load(std::sync::atomic::Ordering::SeqCst) {
 447                            Ok(())
 448                        } else {
 449                            Err(anyhow!("handler did not send a response"))?
 450                        }
 451                    }
 452                    Err(error) => {
 453                        peer.respond_with_error(
 454                            receipt,
 455                            proto::Error {
 456                                message: error.to_string(),
 457                            },
 458                        )?;
 459                        Err(error)
 460                    }
 461                }
 462            }
 463        })
 464    }
 465
    /// Returns a future that serves a single client connection until it closes, its I/O
    /// finishes, or the server is torn down.
    ///
    /// Setup, in order: registers the connection with the peer, sends `Hello`, reports the
    /// connection id through `send_connection_id` (if provided), sends `ShowContacts` the
    /// first time a user connects, loads contacts and the invite code, adds the connection to
    /// the pool, sends the initial contacts update and invite info, forwards a pending
    /// incoming call, and notifies the user's contacts via `update_user_contacts`.
    ///
    /// It then dispatches incoming messages to the registered handlers. When the loop ends
    /// because the connection closed or its I/O finished, `connection_lost` is invoked for the
    /// session. When the teardown signal fires, the future returns immediately instead.
    ///
    /// # Errors
    /// Any failure during setup is returned. Errors from the I/O task and from
    /// `connection_lost` are only logged.
    ///
    /// NOTE(review): a setup step that fails with `?` *after* `pool.add_connection` returns
    /// early without reaching `connection_lost`, so the pool entry looks like it is left
    /// behind in that case — confirm whether cleanup happens elsewhere.
    pub fn handle_connection(
        self: &Arc<Self>,
        connection: Connection,
        address: String,
        user: User,
        mut send_connection_id: Option<oneshot::Sender<ConnectionId>>,
        executor: Executor,
    ) -> impl Future<Output = Result<()>> {
        let this = self.clone();
        let user_id = user.id;
        let login = user.github_login;
        let span = info_span!("handle connection", %user_id, %login, %address);
        // Subscribe before the future starts running so the teardown receiver exists as soon
        // as `handle_connection` returns.
        let mut teardown = self.teardown.subscribe();
        async move {
            let (connection_id, handle_io, mut incoming_rx) = this
                .peer
                .add_connection(connection, {
                    let executor = executor.clone();
                    move |duration| executor.sleep(duration)
                });

            tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
            this.peer.send(connection_id, proto::Hello { peer_id: Some(connection_id.into()) })?;
            tracing::info!(%user_id, %login, %connection_id, %address, "sent hello message");

            // The receiving side may already be gone; that is not an error here.
            if let Some(send_connection_id) = send_connection_id.take() {
                let _ = send_connection_id.send(connection_id);
            }

            if !user.connected_once {
                this.peer.send(connection_id, proto::ShowContacts {})?;
                this.app_state.db.set_user_connected_once(user_id, true).await?;
            }

            // Both lookups run concurrently; the first error aborts the setup.
            let (contacts, invite_code) = future::try_join(
                this.app_state.db.get_contacts(user_id),
                this.app_state.db.get_invite_code_for_user(user_id)
            ).await?;

            // The pool lock is confined to this block; nothing is awaited while it is held.
            {
                let mut pool = this.connection_pool.lock();
                pool.add_connection(connection_id, user_id, user.admin);
                this.peer.send(connection_id, build_initial_contacts_update(contacts, &pool))?;

                if let Some((code, count)) = invite_code {
                    this.peer.send(connection_id, proto::UpdateInviteInfo {
                        url: format!("{}{}", this.app_state.config.invite_link_prefix, code),
                        count: count as u32,
                    })?;
                }
            }

            if let Some(incoming_call) = this.app_state.db.incoming_call_for_user(user_id).await? {
                this.peer.send(connection_id, incoming_call)?;
            }

            let session = Session {
                user_id,
                connection_id,
                db: Arc::new(tokio::sync::Mutex::new(DbHandle(this.app_state.db.clone()))),
                peer: this.peer.clone(),
                connection_pool: this.connection_pool.clone(),
                live_kit_client: this.app_state.live_kit_client.clone(),
                executor: executor.clone(),
            };
            update_user_contacts(user_id, &session).await?;

            let handle_io = handle_io.fuse();
            futures::pin_mut!(handle_io);

            // Handlers for foreground messages are pushed into the following `FuturesUnordered`.
            // This prevents deadlocks when e.g., client A performs a request to client B and
            // client B performs a request to client A. If both clients stop processing further
            // messages until their respective request completes, they won't have a chance to
            // respond to the other client's request and cause a deadlock.
            //
            // This arrangement ensures we will attempt to process earlier messages first, but fall
            // back to processing messages arrived later in the spirit of making progress.
            let mut foreground_message_handlers = FuturesUnordered::new();
            // Caps the number of handlers in flight for this connection at 256.
            let concurrent_handlers = Arc::new(Semaphore::new(256));
            loop {
                // A permit is acquired *before* the next message is read, so no further
                // message is pulled while all permits are held by running handlers.
                let next_message = async {
                    let permit = concurrent_handlers.clone().acquire_owned().await.unwrap();
                    let message = incoming_rx.next().await;
                    (permit, message)
                }.fuse();
                futures::pin_mut!(next_message);
                // `select_biased!` polls the branches in the order written: teardown first,
                // then I/O completion, then foreground handlers, then new messages.
                futures::select_biased! {
                    // Teardown returns right away and skips the sign-out path below.
                    _ = teardown.changed().fuse() => return Ok(()),
                    result = handle_io => {
                        if let Err(error) = result {
                            tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
                        }
                        break;
                    }
                    _ = foreground_message_handlers.next() => {}
                    next_message = next_message => {
                        let (permit, message) = next_message;
                        if let Some(message) = message {
                            let type_name = message.payload_type_name();
                            let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
                            let span_enter = span.enter();
                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                let is_background = message.is_background();
                                let handle_message = (handler)(message, session.clone());
                                drop(span_enter);

                                // The permit is released only once the handler has finished.
                                let handle_message = async move {
                                    handle_message.await;
                                    drop(permit);
                                }.instrument(span);
                                if is_background {
                                    executor.spawn_detached(handle_message);
                                } else {
                                    foreground_message_handlers.push(handle_message);
                                }
                            } else {
                                tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
                            }
                        } else {
                            tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
                            break;
                        }
                    }
                }
            }

            // Dropping the set cancels foreground handlers that have not completed yet.
            drop(foreground_message_handlers);
            tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
            if let Err(error) = connection_lost(session, teardown, executor).await {
                tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
            }

            Ok(())
        }.instrument(span)
    }
 602
 603    pub async fn invite_code_redeemed(
 604        self: &Arc<Self>,
 605        inviter_id: UserId,
 606        invitee_id: UserId,
 607    ) -> Result<()> {
 608        if let Some(user) = self.app_state.db.get_user_by_id(inviter_id).await? {
 609            if let Some(code) = &user.invite_code {
 610                let pool = self.connection_pool.lock();
 611                let invitee_contact = contact_for_user(invitee_id, true, false, &pool);
 612                for connection_id in pool.user_connection_ids(inviter_id) {
 613                    self.peer.send(
 614                        connection_id,
 615                        proto::UpdateContacts {
 616                            contacts: vec![invitee_contact.clone()],
 617                            ..Default::default()
 618                        },
 619                    )?;
 620                    self.peer.send(
 621                        connection_id,
 622                        proto::UpdateInviteInfo {
 623                            url: format!("{}{}", self.app_state.config.invite_link_prefix, &code),
 624                            count: user.invite_count as u32,
 625                        },
 626                    )?;
 627                }
 628            }
 629        }
 630        Ok(())
 631    }
 632
 633    pub async fn invite_count_updated(self: &Arc<Self>, user_id: UserId) -> Result<()> {
 634        if let Some(user) = self.app_state.db.get_user_by_id(user_id).await? {
 635            if let Some(invite_code) = &user.invite_code {
 636                let pool = self.connection_pool.lock();
 637                for connection_id in pool.user_connection_ids(user_id) {
 638                    self.peer.send(
 639                        connection_id,
 640                        proto::UpdateInviteInfo {
 641                            url: format!(
 642                                "{}{}",
 643                                self.app_state.config.invite_link_prefix, invite_code
 644                            ),
 645                            count: user.invite_count as u32,
 646                        },
 647                    )?;
 648                }
 649            }
 650        }
 651        Ok(())
 652    }
 653
 654    pub async fn snapshot<'a>(self: &'a Arc<Self>) -> ServerSnapshot<'a> {
 655        ServerSnapshot {
 656            connection_pool: ConnectionPoolGuard {
 657                guard: self.connection_pool.lock(),
 658                _not_send: PhantomData,
 659            },
 660            peer: &self.peer,
 661        }
 662    }
 663}
 664
 665impl<'a> Deref for ConnectionPoolGuard<'a> {
 666    type Target = ConnectionPool;
 667
 668    fn deref(&self) -> &Self::Target {
 669        &*self.guard
 670    }
 671}
 672
 673impl<'a> DerefMut for ConnectionPoolGuard<'a> {
 674    fn deref_mut(&mut self) -> &mut Self::Target {
 675        &mut *self.guard
 676    }
 677}
 678
impl<'a> Drop for ConnectionPoolGuard<'a> {
    /// In test builds, calls `check_invariants` every time a guard is released.
    /// In non-test builds this is a no-op.
    ///
    /// NOTE(review): `check_invariants` is presumably a `ConnectionPool` method reached
    /// through this guard's `Deref` impl — confirm in `connection_pool.rs`.
    fn drop(&mut self) {
        #[cfg(test)]
        self.check_invariants();
    }
}
 685
 686fn broadcast<F>(
 687    sender_id: Option<ConnectionId>,
 688    receiver_ids: impl IntoIterator<Item = ConnectionId>,
 689    mut f: F,
 690) where
 691    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 692{
 693    for receiver_id in receiver_ids {
 694        if Some(receiver_id) != sender_id {
 695            if let Err(error) = f(receiver_id) {
 696                tracing::error!("failed to send to {:?} {}", receiver_id, error);
 697            }
 698        }
 699    }
 700}
 701
// Name of the HTTP header that carries the client's RPC protocol version; returned by
// `ProtocolVersion::name`.
lazy_static! {
    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
}
 705
/// Typed value of the `x-zed-protocol-version` header: the protocol version as a `u32`.
pub struct ProtocolVersion(u32);
 707
 708impl Header for ProtocolVersion {
 709    fn name() -> &'static HeaderName {
 710        &ZED_PROTOCOL_VERSION
 711    }
 712
 713    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
 714    where
 715        Self: Sized,
 716        I: Iterator<Item = &'i axum::http::HeaderValue>,
 717    {
 718        let version = values
 719            .next()
 720            .ok_or_else(axum::headers::Error::invalid)?
 721            .to_str()
 722            .map_err(|_| axum::headers::Error::invalid())?
 723            .parse()
 724            .map_err(|_| axum::headers::Error::invalid())?;
 725        Ok(Self(version))
 726    }
 727
 728    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
 729        values.extend([self.0.to_string().parse().unwrap()]);
 730    }
 731}
 732
 733pub fn routes(server: Arc<Server>) -> Router<Body> {
 734    Router::new()
 735        .route("/rpc", get(handle_websocket_request))
 736        .layer(
 737            ServiceBuilder::new()
 738                .layer(Extension(server.app_state.clone()))
 739                .layer(middleware::from_fn(auth::validate_header)),
 740        )
 741        .route("/metrics", get(handle_metrics))
 742        .layer(Extension(server))
 743}
 744
 745pub async fn handle_websocket_request(
 746    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
 747    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
 748    Extension(server): Extension<Arc<Server>>,
 749    Extension(user): Extension<User>,
 750    ws: WebSocketUpgrade,
 751) -> axum::response::Response {
 752    if protocol_version != rpc::PROTOCOL_VERSION {
 753        return (
 754            StatusCode::UPGRADE_REQUIRED,
 755            "client must be upgraded".to_string(),
 756        )
 757            .into_response();
 758    }
 759    let socket_address = socket_address.to_string();
 760    ws.on_upgrade(move |socket| {
 761        use util::ResultExt;
 762        let socket = socket
 763            .map_ok(to_tungstenite_message)
 764            .err_into()
 765            .with(|message| async move { Ok(to_axum_message(message)) });
 766        let connection = Connection::new(Box::pin(socket));
 767        async move {
 768            server
 769                .handle_connection(connection, socket_address, user, None, Executor::Production)
 770                .await
 771                .log_err();
 772        }
 773    })
 774}
 775
 776pub async fn handle_metrics(Extension(server): Extension<Arc<Server>>) -> Result<String> {
 777    let connections = server
 778        .connection_pool
 779        .lock()
 780        .connections()
 781        .filter(|connection| !connection.admin)
 782        .count();
 783
 784    METRIC_CONNECTIONS.set(connections as _);
 785
 786    let shared_projects = server.app_state.db.project_count_excluding_admins().await?;
 787    METRIC_SHARED_PROJECTS.set(shared_projects as _);
 788
 789    let encoder = prometheus::TextEncoder::new();
 790    let metric_families = prometheus::gather();
 791    let encoded_metrics = encoder
 792        .encode_to_string(&metric_families)
 793        .map_err(|err| anyhow!("{}", err))?;
 794    Ok(encoded_metrics)
 795}
 796
 797#[instrument(err, skip(executor))]
 798async fn connection_lost(
 799    session: Session,
 800    mut teardown: watch::Receiver<()>,
 801    executor: Executor,
 802) -> Result<()> {
 803    session.peer.disconnect(session.connection_id);
 804    session
 805        .connection_pool()
 806        .await
 807        .remove_connection(session.connection_id)?;
 808
 809    session
 810        .db()
 811        .await
 812        .connection_lost(session.connection_id)
 813        .await
 814        .trace_err();
 815
 816    futures::select_biased! {
 817        _ = executor.sleep(RECONNECT_TIMEOUT).fuse() => {
 818            leave_room_for_session(&session).await.trace_err();
 819
 820            if !session
 821                .connection_pool()
 822                .await
 823                .is_user_online(session.user_id)
 824            {
 825                let db = session.db().await;
 826                if let Some(room) = db.decline_call(None, session.user_id).await.trace_err().flatten() {
 827                    room_updated(&room, &session.peer);
 828                }
 829            }
 830            update_user_contacts(session.user_id, &session).await?;
 831        }
 832        _ = teardown.changed().fuse() => {}
 833    }
 834
 835    Ok(())
 836}
 837
 838async fn ping(_: proto::Ping, response: Response<proto::Ping>, _session: Session) -> Result<()> {
 839    response.send(proto::Ack {})?;
 840    Ok(())
 841}
 842
 843async fn create_room(
 844    _request: proto::CreateRoom,
 845    response: Response<proto::CreateRoom>,
 846    session: Session,
 847) -> Result<()> {
 848    let live_kit_room = nanoid::nanoid!(30);
 849    let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
 850        if let Some(_) = live_kit
 851            .create_room(live_kit_room.clone())
 852            .await
 853            .trace_err()
 854        {
 855            if let Some(token) = live_kit
 856                .room_token(&live_kit_room, &session.user_id.to_string())
 857                .trace_err()
 858            {
 859                Some(proto::LiveKitConnectionInfo {
 860                    server_url: live_kit.url().into(),
 861                    token,
 862                })
 863            } else {
 864                None
 865            }
 866        } else {
 867            None
 868        }
 869    } else {
 870        None
 871    };
 872
 873    {
 874        let room = session
 875            .db()
 876            .await
 877            .create_room(session.user_id, session.connection_id, &live_kit_room)
 878            .await?;
 879
 880        response.send(proto::CreateRoomResponse {
 881            room: Some(room.clone()),
 882            live_kit_connection_info,
 883        })?;
 884    }
 885
 886    update_user_contacts(session.user_id, &session).await?;
 887    Ok(())
 888}
 889
 890async fn join_room(
 891    request: proto::JoinRoom,
 892    response: Response<proto::JoinRoom>,
 893    session: Session,
 894) -> Result<()> {
 895    let room_id = RoomId::from_proto(request.id);
 896    let room = {
 897        let room = session
 898            .db()
 899            .await
 900            .join_room(room_id, session.user_id, session.connection_id)
 901            .await?;
 902        room_updated(&room, &session.peer);
 903        room.clone()
 904    };
 905
 906    for connection_id in session
 907        .connection_pool()
 908        .await
 909        .user_connection_ids(session.user_id)
 910    {
 911        session
 912            .peer
 913            .send(
 914                connection_id,
 915                proto::CallCanceled {
 916                    room_id: room_id.to_proto(),
 917                },
 918            )
 919            .trace_err();
 920    }
 921
 922    let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
 923        if let Some(token) = live_kit
 924            .room_token(&room.live_kit_room, &session.user_id.to_string())
 925            .trace_err()
 926        {
 927            Some(proto::LiveKitConnectionInfo {
 928                server_url: live_kit.url().into(),
 929                token,
 930            })
 931        } else {
 932            None
 933        }
 934    } else {
 935        None
 936    };
 937
 938    response.send(proto::JoinRoomResponse {
 939        room: Some(room),
 940        live_kit_connection_info,
 941    })?;
 942
 943    update_user_contacts(session.user_id, &session).await?;
 944    Ok(())
 945}
 946
 947async fn rejoin_room(
 948    request: proto::RejoinRoom,
 949    response: Response<proto::RejoinRoom>,
 950    session: Session,
 951) -> Result<()> {
 952    {
 953        let mut rejoined_room = session
 954            .db()
 955            .await
 956            .rejoin_room(request, session.user_id, session.connection_id)
 957            .await?;
 958
 959        response.send(proto::RejoinRoomResponse {
 960            room: Some(rejoined_room.room.clone()),
 961            reshared_projects: rejoined_room
 962                .reshared_projects
 963                .iter()
 964                .map(|project| proto::ResharedProject {
 965                    id: project.id.to_proto(),
 966                    collaborators: project
 967                        .collaborators
 968                        .iter()
 969                        .map(|collaborator| collaborator.to_proto())
 970                        .collect(),
 971                })
 972                .collect(),
 973            rejoined_projects: rejoined_room
 974                .rejoined_projects
 975                .iter()
 976                .map(|rejoined_project| proto::RejoinedProject {
 977                    id: rejoined_project.id.to_proto(),
 978                    worktrees: rejoined_project
 979                        .worktrees
 980                        .iter()
 981                        .map(|worktree| proto::WorktreeMetadata {
 982                            id: worktree.id,
 983                            root_name: worktree.root_name.clone(),
 984                            visible: worktree.visible,
 985                            abs_path: worktree.abs_path.clone(),
 986                        })
 987                        .collect(),
 988                    collaborators: rejoined_project
 989                        .collaborators
 990                        .iter()
 991                        .map(|collaborator| collaborator.to_proto())
 992                        .collect(),
 993                    language_servers: rejoined_project.language_servers.clone(),
 994                })
 995                .collect(),
 996        })?;
 997        room_updated(&rejoined_room.room, &session.peer);
 998
 999        for project in &rejoined_room.reshared_projects {
1000            for collaborator in &project.collaborators {
1001                session
1002                    .peer
1003                    .send(
1004                        collaborator.connection_id,
1005                        proto::UpdateProjectCollaborator {
1006                            project_id: project.id.to_proto(),
1007                            old_peer_id: Some(project.old_connection_id.into()),
1008                            new_peer_id: Some(session.connection_id.into()),
1009                        },
1010                    )
1011                    .trace_err();
1012            }
1013
1014            broadcast(
1015                Some(session.connection_id),
1016                project
1017                    .collaborators
1018                    .iter()
1019                    .map(|collaborator| collaborator.connection_id),
1020                |connection_id| {
1021                    session.peer.forward_send(
1022                        session.connection_id,
1023                        connection_id,
1024                        proto::UpdateProject {
1025                            project_id: project.id.to_proto(),
1026                            worktrees: project.worktrees.clone(),
1027                        },
1028                    )
1029                },
1030            );
1031        }
1032
1033        for project in &rejoined_room.rejoined_projects {
1034            for collaborator in &project.collaborators {
1035                session
1036                    .peer
1037                    .send(
1038                        collaborator.connection_id,
1039                        proto::UpdateProjectCollaborator {
1040                            project_id: project.id.to_proto(),
1041                            old_peer_id: Some(project.old_connection_id.into()),
1042                            new_peer_id: Some(session.connection_id.into()),
1043                        },
1044                    )
1045                    .trace_err();
1046            }
1047        }
1048
1049        for project in &mut rejoined_room.rejoined_projects {
1050            for worktree in mem::take(&mut project.worktrees) {
1051                #[cfg(any(test, feature = "test-support"))]
1052                const MAX_CHUNK_SIZE: usize = 2;
1053                #[cfg(not(any(test, feature = "test-support")))]
1054                const MAX_CHUNK_SIZE: usize = 256;
1055
1056                // Stream this worktree's entries.
1057                let message = proto::UpdateWorktree {
1058                    project_id: project.id.to_proto(),
1059                    worktree_id: worktree.id,
1060                    abs_path: worktree.abs_path.clone(),
1061                    root_name: worktree.root_name,
1062                    updated_entries: worktree.updated_entries,
1063                    removed_entries: worktree.removed_entries,
1064                    scan_id: worktree.scan_id,
1065                    is_last_update: worktree.completed_scan_id == worktree.scan_id,
1066                };
1067                for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1068                    session.peer.send(session.connection_id, update.clone())?;
1069                }
1070
1071                // Stream this worktree's diagnostics.
1072                for summary in worktree.diagnostic_summaries {
1073                    session.peer.send(
1074                        session.connection_id,
1075                        proto::UpdateDiagnosticSummary {
1076                            project_id: project.id.to_proto(),
1077                            worktree_id: worktree.id,
1078                            summary: Some(summary),
1079                        },
1080                    )?;
1081                }
1082            }
1083
1084            for language_server in &project.language_servers {
1085                session.peer.send(
1086                    session.connection_id,
1087                    proto::UpdateLanguageServer {
1088                        project_id: project.id.to_proto(),
1089                        language_server_id: language_server.id,
1090                        variant: Some(
1091                            proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1092                                proto::LspDiskBasedDiagnosticsUpdated {},
1093                            ),
1094                        ),
1095                    },
1096                )?;
1097            }
1098        }
1099    }
1100
1101    update_user_contacts(session.user_id, &session).await?;
1102    Ok(())
1103}
1104
1105async fn leave_room(_message: proto::LeaveRoom, session: Session) -> Result<()> {
1106    leave_room_for_session(&session).await
1107}
1108
1109async fn call(
1110    request: proto::Call,
1111    response: Response<proto::Call>,
1112    session: Session,
1113) -> Result<()> {
1114    let room_id = RoomId::from_proto(request.room_id);
1115    let calling_user_id = session.user_id;
1116    let calling_connection_id = session.connection_id;
1117    let called_user_id = UserId::from_proto(request.called_user_id);
1118    let initial_project_id = request.initial_project_id.map(ProjectId::from_proto);
1119    if !session
1120        .db()
1121        .await
1122        .has_contact(calling_user_id, called_user_id)
1123        .await?
1124    {
1125        return Err(anyhow!("cannot call a user who isn't a contact"))?;
1126    }
1127
1128    let incoming_call = {
1129        let (room, incoming_call) = &mut *session
1130            .db()
1131            .await
1132            .call(
1133                room_id,
1134                calling_user_id,
1135                calling_connection_id,
1136                called_user_id,
1137                initial_project_id,
1138            )
1139            .await?;
1140        room_updated(&room, &session.peer);
1141        mem::take(incoming_call)
1142    };
1143    update_user_contacts(called_user_id, &session).await?;
1144
1145    let mut calls = session
1146        .connection_pool()
1147        .await
1148        .user_connection_ids(called_user_id)
1149        .map(|connection_id| session.peer.request(connection_id, incoming_call.clone()))
1150        .collect::<FuturesUnordered<_>>();
1151
1152    while let Some(call_response) = calls.next().await {
1153        match call_response.as_ref() {
1154            Ok(_) => {
1155                response.send(proto::Ack {})?;
1156                return Ok(());
1157            }
1158            Err(_) => {
1159                call_response.trace_err();
1160            }
1161        }
1162    }
1163
1164    {
1165        let room = session
1166            .db()
1167            .await
1168            .call_failed(room_id, called_user_id)
1169            .await?;
1170        room_updated(&room, &session.peer);
1171    }
1172    update_user_contacts(called_user_id, &session).await?;
1173
1174    Err(anyhow!("failed to ring user"))?
1175}
1176
1177async fn cancel_call(
1178    request: proto::CancelCall,
1179    response: Response<proto::CancelCall>,
1180    session: Session,
1181) -> Result<()> {
1182    let called_user_id = UserId::from_proto(request.called_user_id);
1183    let room_id = RoomId::from_proto(request.room_id);
1184    {
1185        let room = session
1186            .db()
1187            .await
1188            .cancel_call(room_id, session.connection_id, called_user_id)
1189            .await?;
1190        room_updated(&room, &session.peer);
1191    }
1192
1193    for connection_id in session
1194        .connection_pool()
1195        .await
1196        .user_connection_ids(called_user_id)
1197    {
1198        session
1199            .peer
1200            .send(
1201                connection_id,
1202                proto::CallCanceled {
1203                    room_id: room_id.to_proto(),
1204                },
1205            )
1206            .trace_err();
1207    }
1208    response.send(proto::Ack {})?;
1209
1210    update_user_contacts(called_user_id, &session).await?;
1211    Ok(())
1212}
1213
1214async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<()> {
1215    let room_id = RoomId::from_proto(message.room_id);
1216    {
1217        let room = session
1218            .db()
1219            .await
1220            .decline_call(Some(room_id), session.user_id)
1221            .await?
1222            .ok_or_else(|| anyhow!("failed to decline call"))?;
1223        room_updated(&room, &session.peer);
1224    }
1225
1226    for connection_id in session
1227        .connection_pool()
1228        .await
1229        .user_connection_ids(session.user_id)
1230    {
1231        session
1232            .peer
1233            .send(
1234                connection_id,
1235                proto::CallCanceled {
1236                    room_id: room_id.to_proto(),
1237                },
1238            )
1239            .trace_err();
1240    }
1241    update_user_contacts(session.user_id, &session).await?;
1242    Ok(())
1243}
1244
1245async fn update_participant_location(
1246    request: proto::UpdateParticipantLocation,
1247    response: Response<proto::UpdateParticipantLocation>,
1248    session: Session,
1249) -> Result<()> {
1250    let room_id = RoomId::from_proto(request.room_id);
1251    let location = request
1252        .location
1253        .ok_or_else(|| anyhow!("invalid location"))?;
1254    let room = session
1255        .db()
1256        .await
1257        .update_room_participant_location(room_id, session.connection_id, location)
1258        .await?;
1259    room_updated(&room, &session.peer);
1260    response.send(proto::Ack {})?;
1261    Ok(())
1262}
1263
1264async fn share_project(
1265    request: proto::ShareProject,
1266    response: Response<proto::ShareProject>,
1267    session: Session,
1268) -> Result<()> {
1269    let (project_id, room) = &*session
1270        .db()
1271        .await
1272        .share_project(
1273            RoomId::from_proto(request.room_id),
1274            session.connection_id,
1275            &request.worktrees,
1276        )
1277        .await?;
1278    response.send(proto::ShareProjectResponse {
1279        project_id: project_id.to_proto(),
1280    })?;
1281    room_updated(&room, &session.peer);
1282
1283    Ok(())
1284}
1285
1286async fn unshare_project(message: proto::UnshareProject, session: Session) -> Result<()> {
1287    let project_id = ProjectId::from_proto(message.project_id);
1288
1289    let (room, guest_connection_ids) = &*session
1290        .db()
1291        .await
1292        .unshare_project(project_id, session.connection_id)
1293        .await?;
1294
1295    broadcast(
1296        Some(session.connection_id),
1297        guest_connection_ids.iter().copied(),
1298        |conn_id| session.peer.send(conn_id, message.clone()),
1299    );
1300    room_updated(&room, &session.peer);
1301
1302    Ok(())
1303}
1304
1305async fn join_project(
1306    request: proto::JoinProject,
1307    response: Response<proto::JoinProject>,
1308    session: Session,
1309) -> Result<()> {
1310    let project_id = ProjectId::from_proto(request.project_id);
1311    let guest_user_id = session.user_id;
1312
1313    tracing::info!(%project_id, "join project");
1314
1315    let (project, replica_id) = &mut *session
1316        .db()
1317        .await
1318        .join_project(project_id, session.connection_id)
1319        .await?;
1320
1321    let collaborators = project
1322        .collaborators
1323        .iter()
1324        .filter(|collaborator| collaborator.connection_id != session.connection_id)
1325        .map(|collaborator| collaborator.to_proto())
1326        .collect::<Vec<_>>();
1327
1328    let worktrees = project
1329        .worktrees
1330        .iter()
1331        .map(|(id, worktree)| proto::WorktreeMetadata {
1332            id: *id,
1333            root_name: worktree.root_name.clone(),
1334            visible: worktree.visible,
1335            abs_path: worktree.abs_path.clone(),
1336        })
1337        .collect::<Vec<_>>();
1338
1339    for collaborator in &collaborators {
1340        session
1341            .peer
1342            .send(
1343                collaborator.peer_id.unwrap().into(),
1344                proto::AddProjectCollaborator {
1345                    project_id: project_id.to_proto(),
1346                    collaborator: Some(proto::Collaborator {
1347                        peer_id: Some(session.connection_id.into()),
1348                        replica_id: replica_id.0 as u32,
1349                        user_id: guest_user_id.to_proto(),
1350                    }),
1351                },
1352            )
1353            .trace_err();
1354    }
1355
1356    // First, we send the metadata associated with each worktree.
1357    response.send(proto::JoinProjectResponse {
1358        worktrees: worktrees.clone(),
1359        replica_id: replica_id.0 as u32,
1360        collaborators: collaborators.clone(),
1361        language_servers: project.language_servers.clone(),
1362    })?;
1363
1364    for (worktree_id, worktree) in mem::take(&mut project.worktrees) {
1365        #[cfg(any(test, feature = "test-support"))]
1366        const MAX_CHUNK_SIZE: usize = 2;
1367        #[cfg(not(any(test, feature = "test-support")))]
1368        const MAX_CHUNK_SIZE: usize = 256;
1369
1370        // Stream this worktree's entries.
1371        let message = proto::UpdateWorktree {
1372            project_id: project_id.to_proto(),
1373            worktree_id,
1374            abs_path: worktree.abs_path.clone(),
1375            root_name: worktree.root_name,
1376            updated_entries: worktree.entries,
1377            removed_entries: Default::default(),
1378            scan_id: worktree.scan_id,
1379            is_last_update: worktree.scan_id == worktree.completed_scan_id,
1380        };
1381        for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1382            session.peer.send(session.connection_id, update.clone())?;
1383        }
1384
1385        // Stream this worktree's diagnostics.
1386        for summary in worktree.diagnostic_summaries {
1387            session.peer.send(
1388                session.connection_id,
1389                proto::UpdateDiagnosticSummary {
1390                    project_id: project_id.to_proto(),
1391                    worktree_id: worktree.id,
1392                    summary: Some(summary),
1393                },
1394            )?;
1395        }
1396    }
1397
1398    for language_server in &project.language_servers {
1399        session.peer.send(
1400            session.connection_id,
1401            proto::UpdateLanguageServer {
1402                project_id: project_id.to_proto(),
1403                language_server_id: language_server.id,
1404                variant: Some(
1405                    proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1406                        proto::LspDiskBasedDiagnosticsUpdated {},
1407                    ),
1408                ),
1409            },
1410        )?;
1411    }
1412
1413    Ok(())
1414}
1415
1416async fn leave_project(request: proto::LeaveProject, session: Session) -> Result<()> {
1417    let sender_id = session.connection_id;
1418    let project_id = ProjectId::from_proto(request.project_id);
1419
1420    let (room, project) = &*session
1421        .db()
1422        .await
1423        .leave_project(project_id, sender_id)
1424        .await?;
1425    tracing::info!(
1426        %project_id,
1427        host_user_id = %project.host_user_id,
1428        host_connection_id = %project.host_connection_id,
1429        "leave project"
1430    );
1431
1432    project_left(&project, &session);
1433    room_updated(&room, &session.peer);
1434
1435    Ok(())
1436}
1437
1438async fn update_project(
1439    request: proto::UpdateProject,
1440    response: Response<proto::UpdateProject>,
1441    session: Session,
1442) -> Result<()> {
1443    let project_id = ProjectId::from_proto(request.project_id);
1444    let (room, guest_connection_ids) = &*session
1445        .db()
1446        .await
1447        .update_project(project_id, session.connection_id, &request.worktrees)
1448        .await?;
1449    broadcast(
1450        Some(session.connection_id),
1451        guest_connection_ids.iter().copied(),
1452        |connection_id| {
1453            session
1454                .peer
1455                .forward_send(session.connection_id, connection_id, request.clone())
1456        },
1457    );
1458    room_updated(&room, &session.peer);
1459    response.send(proto::Ack {})?;
1460
1461    Ok(())
1462}
1463
1464async fn update_worktree(
1465    request: proto::UpdateWorktree,
1466    response: Response<proto::UpdateWorktree>,
1467    session: Session,
1468) -> Result<()> {
1469    let guest_connection_ids = session
1470        .db()
1471        .await
1472        .update_worktree(&request, session.connection_id)
1473        .await?;
1474
1475    broadcast(
1476        Some(session.connection_id),
1477        guest_connection_ids.iter().copied(),
1478        |connection_id| {
1479            session
1480                .peer
1481                .forward_send(session.connection_id, connection_id, request.clone())
1482        },
1483    );
1484    response.send(proto::Ack {})?;
1485    Ok(())
1486}
1487
1488async fn update_diagnostic_summary(
1489    message: proto::UpdateDiagnosticSummary,
1490    session: Session,
1491) -> Result<()> {
1492    let guest_connection_ids = session
1493        .db()
1494        .await
1495        .update_diagnostic_summary(&message, session.connection_id)
1496        .await?;
1497
1498    broadcast(
1499        Some(session.connection_id),
1500        guest_connection_ids.iter().copied(),
1501        |connection_id| {
1502            session
1503                .peer
1504                .forward_send(session.connection_id, connection_id, message.clone())
1505        },
1506    );
1507
1508    Ok(())
1509}
1510
1511async fn start_language_server(
1512    request: proto::StartLanguageServer,
1513    session: Session,
1514) -> Result<()> {
1515    let guest_connection_ids = session
1516        .db()
1517        .await
1518        .start_language_server(&request, session.connection_id)
1519        .await?;
1520
1521    broadcast(
1522        Some(session.connection_id),
1523        guest_connection_ids.iter().copied(),
1524        |connection_id| {
1525            session
1526                .peer
1527                .forward_send(session.connection_id, connection_id, request.clone())
1528        },
1529    );
1530    Ok(())
1531}
1532
1533async fn update_language_server(
1534    request: proto::UpdateLanguageServer,
1535    session: Session,
1536) -> Result<()> {
1537    session.executor.record_backtrace();
1538    let project_id = ProjectId::from_proto(request.project_id);
1539    let project_connection_ids = session
1540        .db()
1541        .await
1542        .project_connection_ids(project_id, session.connection_id)
1543        .await?;
1544    broadcast(
1545        Some(session.connection_id),
1546        project_connection_ids.iter().copied(),
1547        |connection_id| {
1548            session
1549                .peer
1550                .forward_send(session.connection_id, connection_id, request.clone())
1551        },
1552    );
1553    Ok(())
1554}
1555
1556async fn forward_project_request<T>(
1557    request: T,
1558    response: Response<T>,
1559    session: Session,
1560) -> Result<()>
1561where
1562    T: EntityMessage + RequestMessage,
1563{
1564    session.executor.record_backtrace();
1565    let project_id = ProjectId::from_proto(request.remote_entity_id());
1566    let host_connection_id = {
1567        let collaborators = session
1568            .db()
1569            .await
1570            .project_collaborators(project_id, session.connection_id)
1571            .await?;
1572        collaborators
1573            .iter()
1574            .find(|collaborator| collaborator.is_host)
1575            .ok_or_else(|| anyhow!("host not found"))?
1576            .connection_id
1577    };
1578
1579    let payload = session
1580        .peer
1581        .forward_request(session.connection_id, host_connection_id, request)
1582        .await?;
1583
1584    response.send(payload)?;
1585    Ok(())
1586}
1587
1588async fn save_buffer(
1589    request: proto::SaveBuffer,
1590    response: Response<proto::SaveBuffer>,
1591    session: Session,
1592) -> Result<()> {
1593    let project_id = ProjectId::from_proto(request.project_id);
1594    let host_connection_id = {
1595        let collaborators = session
1596            .db()
1597            .await
1598            .project_collaborators(project_id, session.connection_id)
1599            .await?;
1600        collaborators
1601            .iter()
1602            .find(|collaborator| collaborator.is_host)
1603            .ok_or_else(|| anyhow!("host not found"))?
1604            .connection_id
1605    };
1606    let response_payload = session
1607        .peer
1608        .forward_request(session.connection_id, host_connection_id, request.clone())
1609        .await?;
1610
1611    let mut collaborators = session
1612        .db()
1613        .await
1614        .project_collaborators(project_id, session.connection_id)
1615        .await?;
1616    collaborators.retain(|collaborator| collaborator.connection_id != session.connection_id);
1617    let project_connection_ids = collaborators
1618        .iter()
1619        .map(|collaborator| collaborator.connection_id);
1620    broadcast(
1621        Some(host_connection_id),
1622        project_connection_ids,
1623        |conn_id| {
1624            session
1625                .peer
1626                .forward_send(host_connection_id, conn_id, response_payload.clone())
1627        },
1628    );
1629    response.send(response_payload)?;
1630    Ok(())
1631}
1632
1633async fn create_buffer_for_peer(
1634    request: proto::CreateBufferForPeer,
1635    session: Session,
1636) -> Result<()> {
1637    session.executor.record_backtrace();
1638    let peer_id = request.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?;
1639    session
1640        .peer
1641        .forward_send(session.connection_id, peer_id.into(), request)?;
1642    Ok(())
1643}
1644
1645async fn update_buffer(
1646    request: proto::UpdateBuffer,
1647    response: Response<proto::UpdateBuffer>,
1648    session: Session,
1649) -> Result<()> {
1650    session.executor.record_backtrace();
1651    let project_id = ProjectId::from_proto(request.project_id);
1652    let project_connection_ids = session
1653        .db()
1654        .await
1655        .project_connection_ids(project_id, session.connection_id)
1656        .await?;
1657
1658    session.executor.record_backtrace();
1659
1660    broadcast(
1661        Some(session.connection_id),
1662        project_connection_ids.iter().copied(),
1663        |connection_id| {
1664            session
1665                .peer
1666                .forward_send(session.connection_id, connection_id, request.clone())
1667        },
1668    );
1669    response.send(proto::Ack {})?;
1670    Ok(())
1671}
1672
1673async fn update_buffer_file(request: proto::UpdateBufferFile, session: Session) -> Result<()> {
1674    let project_id = ProjectId::from_proto(request.project_id);
1675    let project_connection_ids = session
1676        .db()
1677        .await
1678        .project_connection_ids(project_id, session.connection_id)
1679        .await?;
1680
1681    broadcast(
1682        Some(session.connection_id),
1683        project_connection_ids.iter().copied(),
1684        |connection_id| {
1685            session
1686                .peer
1687                .forward_send(session.connection_id, connection_id, request.clone())
1688        },
1689    );
1690    Ok(())
1691}
1692
1693async fn buffer_reloaded(request: proto::BufferReloaded, session: Session) -> Result<()> {
1694    let project_id = ProjectId::from_proto(request.project_id);
1695    let project_connection_ids = session
1696        .db()
1697        .await
1698        .project_connection_ids(project_id, session.connection_id)
1699        .await?;
1700    broadcast(
1701        Some(session.connection_id),
1702        project_connection_ids.iter().copied(),
1703        |connection_id| {
1704            session
1705                .peer
1706                .forward_send(session.connection_id, connection_id, request.clone())
1707        },
1708    );
1709    Ok(())
1710}
1711
1712async fn buffer_saved(request: proto::BufferSaved, session: Session) -> Result<()> {
1713    let project_id = ProjectId::from_proto(request.project_id);
1714    let project_connection_ids = session
1715        .db()
1716        .await
1717        .project_connection_ids(project_id, session.connection_id)
1718        .await?;
1719    broadcast(
1720        Some(session.connection_id),
1721        project_connection_ids.iter().copied(),
1722        |connection_id| {
1723            session
1724                .peer
1725                .forward_send(session.connection_id, connection_id, request.clone())
1726        },
1727    );
1728    Ok(())
1729}
1730
1731async fn follow(
1732    request: proto::Follow,
1733    response: Response<proto::Follow>,
1734    session: Session,
1735) -> Result<()> {
1736    let project_id = ProjectId::from_proto(request.project_id);
1737    let leader_id = request
1738        .leader_id
1739        .ok_or_else(|| anyhow!("invalid leader id"))?
1740        .into();
1741    let follower_id = session.connection_id;
1742
1743    {
1744        let project_connection_ids = session
1745            .db()
1746            .await
1747            .project_connection_ids(project_id, session.connection_id)
1748            .await?;
1749
1750        if !project_connection_ids.contains(&leader_id) {
1751            Err(anyhow!("no such peer"))?;
1752        }
1753    }
1754
1755    let mut response_payload = session
1756        .peer
1757        .forward_request(session.connection_id, leader_id, request)
1758        .await?;
1759    response_payload
1760        .views
1761        .retain(|view| view.leader_id != Some(follower_id.into()));
1762    response.send(response_payload)?;
1763
1764    let room = session
1765        .db()
1766        .await
1767        .follow(project_id, leader_id, follower_id)
1768        .await?;
1769    room_updated(&room, &session.peer);
1770
1771    Ok(())
1772}
1773
1774async fn unfollow(request: proto::Unfollow, session: Session) -> Result<()> {
1775    let project_id = ProjectId::from_proto(request.project_id);
1776    let leader_id = request
1777        .leader_id
1778        .ok_or_else(|| anyhow!("invalid leader id"))?
1779        .into();
1780    let follower_id = session.connection_id;
1781
1782    if !session
1783        .db()
1784        .await
1785        .project_connection_ids(project_id, session.connection_id)
1786        .await?
1787        .contains(&leader_id)
1788    {
1789        Err(anyhow!("no such peer"))?;
1790    }
1791
1792    session
1793        .peer
1794        .forward_send(session.connection_id, leader_id, request)?;
1795
1796    let room = session
1797        .db()
1798        .await
1799        .unfollow(project_id, leader_id, follower_id)
1800        .await?;
1801    room_updated(&room, &session.peer);
1802
1803    Ok(())
1804}
1805
1806async fn update_followers(request: proto::UpdateFollowers, session: Session) -> Result<()> {
1807    let project_id = ProjectId::from_proto(request.project_id);
1808    let project_connection_ids = session
1809        .db
1810        .lock()
1811        .await
1812        .project_connection_ids(project_id, session.connection_id)
1813        .await?;
1814
1815    let leader_id = request.variant.as_ref().and_then(|variant| match variant {
1816        proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
1817        proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
1818        proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
1819    });
1820    for follower_peer_id in request.follower_ids.iter().copied() {
1821        let follower_connection_id = follower_peer_id.into();
1822        if project_connection_ids.contains(&follower_connection_id)
1823            && Some(follower_peer_id) != leader_id
1824        {
1825            session.peer.forward_send(
1826                session.connection_id,
1827                follower_connection_id,
1828                request.clone(),
1829            )?;
1830        }
1831    }
1832    Ok(())
1833}
1834
1835async fn get_users(
1836    request: proto::GetUsers,
1837    response: Response<proto::GetUsers>,
1838    session: Session,
1839) -> Result<()> {
1840    let user_ids = request
1841        .user_ids
1842        .into_iter()
1843        .map(UserId::from_proto)
1844        .collect();
1845    let users = session
1846        .db()
1847        .await
1848        .get_users_by_ids(user_ids)
1849        .await?
1850        .into_iter()
1851        .map(|user| proto::User {
1852            id: user.id.to_proto(),
1853            avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1854            github_login: user.github_login,
1855        })
1856        .collect();
1857    response.send(proto::UsersResponse { users })?;
1858    Ok(())
1859}
1860
1861async fn fuzzy_search_users(
1862    request: proto::FuzzySearchUsers,
1863    response: Response<proto::FuzzySearchUsers>,
1864    session: Session,
1865) -> Result<()> {
1866    let query = request.query;
1867    let users = match query.len() {
1868        0 => vec![],
1869        1 | 2 => session
1870            .db()
1871            .await
1872            .get_user_by_github_account(&query, None)
1873            .await?
1874            .into_iter()
1875            .collect(),
1876        _ => session.db().await.fuzzy_search_users(&query, 10).await?,
1877    };
1878    let users = users
1879        .into_iter()
1880        .filter(|user| user.id != session.user_id)
1881        .map(|user| proto::User {
1882            id: user.id.to_proto(),
1883            avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1884            github_login: user.github_login,
1885        })
1886        .collect();
1887    response.send(proto::UsersResponse { users })?;
1888    Ok(())
1889}
1890
1891async fn request_contact(
1892    request: proto::RequestContact,
1893    response: Response<proto::RequestContact>,
1894    session: Session,
1895) -> Result<()> {
1896    let requester_id = session.user_id;
1897    let responder_id = UserId::from_proto(request.responder_id);
1898    if requester_id == responder_id {
1899        return Err(anyhow!("cannot add yourself as a contact"))?;
1900    }
1901
1902    session
1903        .db()
1904        .await
1905        .send_contact_request(requester_id, responder_id)
1906        .await?;
1907
1908    // Update outgoing contact requests of requester
1909    let mut update = proto::UpdateContacts::default();
1910    update.outgoing_requests.push(responder_id.to_proto());
1911    for connection_id in session
1912        .connection_pool()
1913        .await
1914        .user_connection_ids(requester_id)
1915    {
1916        session.peer.send(connection_id, update.clone())?;
1917    }
1918
1919    // Update incoming contact requests of responder
1920    let mut update = proto::UpdateContacts::default();
1921    update
1922        .incoming_requests
1923        .push(proto::IncomingContactRequest {
1924            requester_id: requester_id.to_proto(),
1925            should_notify: true,
1926        });
1927    for connection_id in session
1928        .connection_pool()
1929        .await
1930        .user_connection_ids(responder_id)
1931    {
1932        session.peer.send(connection_id, update.clone())?;
1933    }
1934
1935    response.send(proto::Ack {})?;
1936    Ok(())
1937}
1938
1939async fn respond_to_contact_request(
1940    request: proto::RespondToContactRequest,
1941    response: Response<proto::RespondToContactRequest>,
1942    session: Session,
1943) -> Result<()> {
1944    let responder_id = session.user_id;
1945    let requester_id = UserId::from_proto(request.requester_id);
1946    let db = session.db().await;
1947    if request.response == proto::ContactRequestResponse::Dismiss as i32 {
1948        db.dismiss_contact_notification(responder_id, requester_id)
1949            .await?;
1950    } else {
1951        let accept = request.response == proto::ContactRequestResponse::Accept as i32;
1952
1953        db.respond_to_contact_request(responder_id, requester_id, accept)
1954            .await?;
1955        let requester_busy = db.is_user_busy(requester_id).await?;
1956        let responder_busy = db.is_user_busy(responder_id).await?;
1957
1958        let pool = session.connection_pool().await;
1959        // Update responder with new contact
1960        let mut update = proto::UpdateContacts::default();
1961        if accept {
1962            update
1963                .contacts
1964                .push(contact_for_user(requester_id, false, requester_busy, &pool));
1965        }
1966        update
1967            .remove_incoming_requests
1968            .push(requester_id.to_proto());
1969        for connection_id in pool.user_connection_ids(responder_id) {
1970            session.peer.send(connection_id, update.clone())?;
1971        }
1972
1973        // Update requester with new contact
1974        let mut update = proto::UpdateContacts::default();
1975        if accept {
1976            update
1977                .contacts
1978                .push(contact_for_user(responder_id, true, responder_busy, &pool));
1979        }
1980        update
1981            .remove_outgoing_requests
1982            .push(responder_id.to_proto());
1983        for connection_id in pool.user_connection_ids(requester_id) {
1984            session.peer.send(connection_id, update.clone())?;
1985        }
1986    }
1987
1988    response.send(proto::Ack {})?;
1989    Ok(())
1990}
1991
1992async fn remove_contact(
1993    request: proto::RemoveContact,
1994    response: Response<proto::RemoveContact>,
1995    session: Session,
1996) -> Result<()> {
1997    let requester_id = session.user_id;
1998    let responder_id = UserId::from_proto(request.user_id);
1999    let db = session.db().await;
2000    let contact_accepted = db.remove_contact(requester_id, responder_id).await?;
2001
2002    let pool = session.connection_pool().await;
2003    // Update outgoing contact requests of requester
2004    let mut update = proto::UpdateContacts::default();
2005    if contact_accepted {
2006        update.remove_contacts.push(responder_id.to_proto());
2007    } else {
2008        update
2009            .remove_outgoing_requests
2010            .push(responder_id.to_proto());
2011    }
2012    for connection_id in pool.user_connection_ids(requester_id) {
2013        session.peer.send(connection_id, update.clone())?;
2014    }
2015
2016    // Update incoming contact requests of responder
2017    let mut update = proto::UpdateContacts::default();
2018    if contact_accepted {
2019        update.remove_contacts.push(requester_id.to_proto());
2020    } else {
2021        update
2022            .remove_incoming_requests
2023            .push(requester_id.to_proto());
2024    }
2025    for connection_id in pool.user_connection_ids(responder_id) {
2026        session.peer.send(connection_id, update.clone())?;
2027    }
2028
2029    response.send(proto::Ack {})?;
2030    Ok(())
2031}
2032
2033async fn update_diff_base(request: proto::UpdateDiffBase, session: Session) -> Result<()> {
2034    let project_id = ProjectId::from_proto(request.project_id);
2035    let project_connection_ids = session
2036        .db()
2037        .await
2038        .project_connection_ids(project_id, session.connection_id)
2039        .await?;
2040    broadcast(
2041        Some(session.connection_id),
2042        project_connection_ids.iter().copied(),
2043        |connection_id| {
2044            session
2045                .peer
2046                .forward_send(session.connection_id, connection_id, request.clone())
2047        },
2048    );
2049    Ok(())
2050}
2051
2052async fn get_private_user_info(
2053    _request: proto::GetPrivateUserInfo,
2054    response: Response<proto::GetPrivateUserInfo>,
2055    session: Session,
2056) -> Result<()> {
2057    let metrics_id = session
2058        .db()
2059        .await
2060        .get_user_metrics_id(session.user_id)
2061        .await?;
2062    let user = session
2063        .db()
2064        .await
2065        .get_user_by_id(session.user_id)
2066        .await?
2067        .ok_or_else(|| anyhow!("user not found"))?;
2068    response.send(proto::GetPrivateUserInfoResponse {
2069        metrics_id,
2070        staff: user.admin,
2071    })?;
2072    Ok(())
2073}
2074
2075fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
2076    match message {
2077        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
2078        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
2079        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
2080        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
2081        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
2082            code: frame.code.into(),
2083            reason: frame.reason,
2084        })),
2085    }
2086}
2087
2088fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
2089    match message {
2090        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
2091        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
2092        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
2093        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
2094        AxumMessage::Close(frame) => {
2095            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
2096                code: frame.code.into(),
2097                reason: frame.reason,
2098            }))
2099        }
2100    }
2101}
2102
2103fn build_initial_contacts_update(
2104    contacts: Vec<db::Contact>,
2105    pool: &ConnectionPool,
2106) -> proto::UpdateContacts {
2107    let mut update = proto::UpdateContacts::default();
2108
2109    for contact in contacts {
2110        match contact {
2111            db::Contact::Accepted {
2112                user_id,
2113                should_notify,
2114                busy,
2115            } => {
2116                update
2117                    .contacts
2118                    .push(contact_for_user(user_id, should_notify, busy, &pool));
2119            }
2120            db::Contact::Outgoing { user_id } => update.outgoing_requests.push(user_id.to_proto()),
2121            db::Contact::Incoming {
2122                user_id,
2123                should_notify,
2124            } => update
2125                .incoming_requests
2126                .push(proto::IncomingContactRequest {
2127                    requester_id: user_id.to_proto(),
2128                    should_notify,
2129                }),
2130        }
2131    }
2132
2133    update
2134}
2135
2136fn contact_for_user(
2137    user_id: UserId,
2138    should_notify: bool,
2139    busy: bool,
2140    pool: &ConnectionPool,
2141) -> proto::Contact {
2142    proto::Contact {
2143        user_id: user_id.to_proto(),
2144        online: pool.is_user_online(user_id),
2145        busy,
2146        should_notify,
2147    }
2148}
2149
2150fn room_updated(room: &proto::Room, peer: &Peer) {
2151    broadcast(
2152        None,
2153        room.participants
2154            .iter()
2155            .filter_map(|participant| Some(participant.peer_id?.into())),
2156        |peer_id| {
2157            peer.send(
2158                peer_id.into(),
2159                proto::RoomUpdated {
2160                    room: Some(room.clone()),
2161                },
2162            )
2163        },
2164    );
2165}
2166
2167async fn update_user_contacts(user_id: UserId, session: &Session) -> Result<()> {
2168    let db = session.db().await;
2169    let contacts = db.get_contacts(user_id).await?;
2170    let busy = db.is_user_busy(user_id).await?;
2171
2172    let pool = session.connection_pool().await;
2173    let updated_contact = contact_for_user(user_id, false, busy, &pool);
2174    for contact in contacts {
2175        if let db::Contact::Accepted {
2176            user_id: contact_user_id,
2177            ..
2178        } = contact
2179        {
2180            for contact_conn_id in pool.user_connection_ids(contact_user_id) {
2181                session
2182                    .peer
2183                    .send(
2184                        contact_conn_id,
2185                        proto::UpdateContacts {
2186                            contacts: vec![updated_contact.clone()],
2187                            remove_contacts: Default::default(),
2188                            incoming_requests: Default::default(),
2189                            remove_incoming_requests: Default::default(),
2190                            outgoing_requests: Default::default(),
2191                            remove_outgoing_requests: Default::default(),
2192                        },
2193                    )
2194                    .trace_err();
2195            }
2196        }
2197    }
2198    Ok(())
2199}
2200
2201async fn leave_room_for_session(session: &Session) -> Result<()> {
2202    let mut contacts_to_update = HashSet::default();
2203
2204    let room_id;
2205    let canceled_calls_to_user_ids;
2206    let live_kit_room;
2207    let delete_live_kit_room;
2208    if let Some(mut left_room) = session.db().await.leave_room(session.connection_id).await? {
2209        contacts_to_update.insert(session.user_id);
2210
2211        for project in left_room.left_projects.values() {
2212            project_left(project, session);
2213        }
2214
2215        room_updated(&left_room.room, &session.peer);
2216        room_id = RoomId::from_proto(left_room.room.id);
2217        canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids);
2218        live_kit_room = mem::take(&mut left_room.room.live_kit_room);
2219        delete_live_kit_room = left_room.room.participants.is_empty();
2220    } else {
2221        return Ok(());
2222    }
2223
2224    {
2225        let pool = session.connection_pool().await;
2226        for canceled_user_id in canceled_calls_to_user_ids {
2227            for connection_id in pool.user_connection_ids(canceled_user_id) {
2228                session
2229                    .peer
2230                    .send(
2231                        connection_id,
2232                        proto::CallCanceled {
2233                            room_id: room_id.to_proto(),
2234                        },
2235                    )
2236                    .trace_err();
2237            }
2238            contacts_to_update.insert(canceled_user_id);
2239        }
2240    }
2241
2242    for contact_user_id in contacts_to_update {
2243        update_user_contacts(contact_user_id, &session).await?;
2244    }
2245
2246    if let Some(live_kit) = session.live_kit_client.as_ref() {
2247        live_kit
2248            .remove_participant(live_kit_room.clone(), session.user_id.to_string())
2249            .await
2250            .trace_err();
2251
2252        if delete_live_kit_room {
2253            live_kit.delete_room(live_kit_room).await.trace_err();
2254        }
2255    }
2256
2257    Ok(())
2258}
2259
2260fn project_left(project: &db::LeftProject, session: &Session) {
2261    for connection_id in &project.connection_ids {
2262        if project.host_user_id == session.user_id {
2263            session
2264                .peer
2265                .send(
2266                    *connection_id,
2267                    proto::UnshareProject {
2268                        project_id: project.id.to_proto(),
2269                    },
2270                )
2271                .trace_err();
2272        } else {
2273            session
2274                .peer
2275                .send(
2276                    *connection_id,
2277                    proto::RemoveProjectCollaborator {
2278                        project_id: project.id.to_proto(),
2279                        peer_id: Some(session.connection_id.into()),
2280                    },
2281                )
2282                .trace_err();
2283        }
2284    }
2285}
2286
2287pub trait ResultExt {
2288    type Ok;
2289
2290    fn trace_err(self) -> Option<Self::Ok>;
2291}
2292
2293impl<T, E> ResultExt for Result<T, E>
2294where
2295    E: std::fmt::Debug,
2296{
2297    type Ok = T;
2298
2299    fn trace_err(self) -> Option<T> {
2300        match self {
2301            Ok(value) => Some(value),
2302            Err(error) => {
2303                tracing::error!("{:?}", error);
2304                None
2305            }
2306        }
2307    }
2308}