rpc.rs

   1mod connection_pool;
   2
   3use crate::{
   4    auth,
   5    db::{self, Database, ProjectId, RoomId, ServerId, User, UserId},
   6    executor::Executor,
   7    AppState, Result,
   8};
   9use anyhow::anyhow;
  10use async_tungstenite::tungstenite::{
  11    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  12};
  13use axum::{
  14    body::Body,
  15    extract::{
  16        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  17        ConnectInfo, WebSocketUpgrade,
  18    },
  19    headers::{Header, HeaderName},
  20    http::StatusCode,
  21    middleware,
  22    response::IntoResponse,
  23    routing::get,
  24    Extension, Router, TypedHeader,
  25};
  26use collections::{HashMap, HashSet};
  27pub use connection_pool::ConnectionPool;
  28use futures::{
  29    channel::oneshot,
  30    future::{self, BoxFuture},
  31    stream::FuturesUnordered,
  32    FutureExt, SinkExt, StreamExt, TryStreamExt,
  33};
  34use lazy_static::lazy_static;
  35use prometheus::{register_int_gauge, IntGauge};
  36use rpc::{
  37    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  38    Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
  39};
  40use serde::{Serialize, Serializer};
  41use std::{
  42    any::TypeId,
  43    fmt,
  44    future::Future,
  45    marker::PhantomData,
  46    mem,
  47    net::SocketAddr,
  48    ops::{Deref, DerefMut},
  49    rc::Rc,
  50    sync::{
  51        atomic::{AtomicBool, Ordering::SeqCst},
  52        Arc,
  53    },
  54    time::{Duration, Instant},
  55};
  56use tokio::sync::{watch, Semaphore};
  57use tower::ServiceBuilder;
  58use tracing::{info_span, instrument, Instrument};
  59
/// Timeout associated with client reconnection. It is not referenced in the visible part of
/// this file. NOTE(review): presumably the grace period a dropped connection gets before its
/// state is cleaned up — confirm against its users.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
/// Delay that `Server::start` waits before it begins refreshing stale rooms and deleting
/// stale servers.
pub const CLEANUP_TIMEOUT: Duration = Duration::from_secs(10);
  62
// Lazily-registered Prometheus gauges. Registration happens on first access, and a failed
// registration panics because of the `unwrap()`.
lazy_static! {
    // Gauge registered under the name `connections`.
    static ref METRIC_CONNECTIONS: IntGauge =
        register_int_gauge!("connections", "number of connections").unwrap();
    // Gauge registered under the name `shared_projects`.
    static ref METRIC_SHARED_PROJECTS: IntGauge = register_int_gauge!(
        "shared_projects",
        "number of open projects with one or more guests"
    )
    .unwrap();
}
  72
/// Type-erased message handler as stored in `Server::handlers`.
///
/// It receives the boxed envelope together with the `Session` of the connection the message
/// arrived on, and returns a boxed `'static` future that resolves to `()` once handling is done.
type MessageHandler =
    Box<dyn Send + Sync + Fn(Box<dyn AnyTypedEnvelope>, Session) -> BoxFuture<'static, ()>>;
  75
/// Handle given to a request handler so that it can reply to the request it is processing.
struct Response<R> {
    /// Peer used to deliver the reply.
    peer: Arc<Peer>,
    /// Receipt of the request being answered.
    receipt: Receipt<R>,
    /// Set to `true` by `send`; `Server::add_request_handler` reads it after the handler
    /// returns to detect handlers that never replied.
    responded: Arc<AtomicBool>,
}
  81
  82impl<R: RequestMessage> Response<R> {
  83    fn send(self, payload: R::Response) -> Result<()> {
  84        self.responded.store(true, SeqCst);
  85        self.peer.respond(self.receipt, payload)?;
  86        Ok(())
  87    }
  88}
  89
/// Per-connection context handed to every message handler.
///
/// One `Session` is built in `Server::handle_connection` for each connection and cloned for
/// every message that is dispatched.
#[derive(Clone)]
struct Session {
    /// User that owns the connection.
    user_id: UserId,
    /// Identifier of the connection within the peer.
    connection_id: ConnectionId,
    /// Database handle behind an async mutex; access it through `Session::db`.
    db: Arc<tokio::sync::Mutex<DbHandle>>,
    /// Peer used to send messages and responses.
    peer: Arc<Peer>,
    /// Shared pool of live connections; access it through `Session::connection_pool`.
    connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
    /// LiveKit API client, when one is configured in the application state.
    live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
    /// Executor the connection was started with.
    executor: Executor,
}
 100
 101impl Session {
 102    async fn db(&self) -> tokio::sync::MutexGuard<DbHandle> {
 103        #[cfg(test)]
 104        tokio::task::yield_now().await;
 105        let guard = self.db.lock().await;
 106        #[cfg(test)]
 107        tokio::task::yield_now().await;
 108        guard
 109    }
 110
 111    async fn connection_pool(&self) -> ConnectionPoolGuard<'_> {
 112        #[cfg(test)]
 113        tokio::task::yield_now().await;
 114        let guard = self.connection_pool.lock();
 115        ConnectionPoolGuard {
 116            guard,
 117            _not_send: PhantomData,
 118        }
 119    }
 120}
 121
 122impl fmt::Debug for Session {
 123    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 124        f.debug_struct("Session")
 125            .field("user_id", &self.user_id)
 126            .field("connection_id", &self.connection_id)
 127            .finish()
 128    }
 129}
 130
/// Newtype around a shared `Database`; it dereferences to the database (see the `Deref` impl).
struct DbHandle(Arc<Database>);
 132
 133impl Deref for DbHandle {
 134    type Target = Database;
 135
 136    fn deref(&self) -> &Self::Target {
 137        self.0.as_ref()
 138    }
 139}
 140
/// The RPC server: owns the peer, the pool of live connections and the table of message
/// handlers.
pub struct Server {
    /// Identifier of this server instance; behind a mutex because the test-only `reset`
    /// replaces it.
    id: parking_lot::Mutex<ServerId>,
    /// Peer through which all connections send and receive messages.
    peer: Arc<Peer>,
    /// Pool of currently registered connections, shared with every `Session`.
    pub(crate) connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
    /// Shared application state (database, configuration, LiveKit client).
    app_state: Arc<AppState>,
    /// Executor used by `start` for its cleanup delay and detached task.
    executor: Executor,
    /// Handlers keyed by the `TypeId` of the message type they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Signalled by `teardown`; every connection loop subscribes to it.
    teardown: watch::Sender<()>,
}
 150
/// Wrapper around the connection pool's mutex guard that dereferences to `ConnectionPool`.
pub(crate) struct ConnectionPoolGuard<'a> {
    /// The underlying `parking_lot` guard.
    guard: parking_lot::MutexGuard<'a, ConnectionPool>,
    /// `Rc` is not `Send`, so this marker makes the whole guard `!Send`.
    /// NOTE(review): presumably to keep the guard from being held across an `.await` in a
    /// `Send` future — confirm intent.
    _not_send: PhantomData<Rc<()>>,
}
 155
/// Serializable view of the server's state, produced by `Server::snapshot`.
#[derive(Serialize)]
pub struct ServerSnapshot<'a> {
    /// The server's peer.
    peer: &'a Peer,
    /// Locked connection pool; serialized through `serialize_deref`, i.e. as the
    /// `ConnectionPool` the guard points at.
    #[serde(serialize_with = "serialize_deref")]
    connection_pool: ConnectionPoolGuard<'a>,
}
 162
 163pub fn serialize_deref<S, T, U>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
 164where
 165    S: Serializer,
 166    T: Deref<Target = U>,
 167    U: Serialize,
 168{
 169    Serialize::serialize(value.deref(), serializer)
 170}
 171
 172impl Server {
 173    pub fn new(id: ServerId, app_state: Arc<AppState>, executor: Executor) -> Arc<Self> {
 174        let mut server = Self {
 175            id: parking_lot::Mutex::new(id),
 176            peer: Peer::new(id.0 as u32),
 177            app_state,
 178            executor,
 179            connection_pool: Default::default(),
 180            handlers: Default::default(),
 181            teardown: watch::channel(()).0,
 182        };
 183
 184        server
 185            .add_request_handler(ping)
 186            .add_request_handler(create_room)
 187            .add_request_handler(join_room)
 188            .add_request_handler(rejoin_room)
 189            .add_request_handler(leave_room)
 190            .add_request_handler(call)
 191            .add_request_handler(cancel_call)
 192            .add_message_handler(decline_call)
 193            .add_request_handler(update_participant_location)
 194            .add_request_handler(share_project)
 195            .add_message_handler(unshare_project)
 196            .add_request_handler(join_project)
 197            .add_message_handler(leave_project)
 198            .add_request_handler(update_project)
 199            .add_request_handler(update_worktree)
 200            .add_message_handler(start_language_server)
 201            .add_message_handler(update_language_server)
 202            .add_message_handler(update_diagnostic_summary)
 203            .add_message_handler(update_worktree_settings)
 204            .add_request_handler(forward_project_request::<proto::GetHover>)
 205            .add_request_handler(forward_project_request::<proto::GetDefinition>)
 206            .add_request_handler(forward_project_request::<proto::GetTypeDefinition>)
 207            .add_request_handler(forward_project_request::<proto::GetReferences>)
 208            .add_request_handler(forward_project_request::<proto::SearchProject>)
 209            .add_request_handler(forward_project_request::<proto::GetDocumentHighlights>)
 210            .add_request_handler(forward_project_request::<proto::GetProjectSymbols>)
 211            .add_request_handler(forward_project_request::<proto::OpenBufferForSymbol>)
 212            .add_request_handler(forward_project_request::<proto::OpenBufferById>)
 213            .add_request_handler(forward_project_request::<proto::OpenBufferByPath>)
 214            .add_request_handler(forward_project_request::<proto::GetCompletions>)
 215            .add_request_handler(forward_project_request::<proto::ApplyCompletionAdditionalEdits>)
 216            .add_request_handler(forward_project_request::<proto::GetCodeActions>)
 217            .add_request_handler(forward_project_request::<proto::ApplyCodeAction>)
 218            .add_request_handler(forward_project_request::<proto::PrepareRename>)
 219            .add_request_handler(forward_project_request::<proto::PerformRename>)
 220            .add_request_handler(forward_project_request::<proto::ReloadBuffers>)
 221            .add_request_handler(forward_project_request::<proto::SynchronizeBuffers>)
 222            .add_request_handler(forward_project_request::<proto::FormatBuffers>)
 223            .add_request_handler(forward_project_request::<proto::CreateProjectEntry>)
 224            .add_request_handler(forward_project_request::<proto::RenameProjectEntry>)
 225            .add_request_handler(forward_project_request::<proto::CopyProjectEntry>)
 226            .add_request_handler(forward_project_request::<proto::DeleteProjectEntry>)
 227            .add_request_handler(forward_project_request::<proto::ExpandProjectEntry>)
 228            .add_request_handler(forward_project_request::<proto::OnTypeFormatting>)
 229            .add_request_handler(forward_project_request::<proto::InlayHints>)
 230            .add_message_handler(create_buffer_for_peer)
 231            .add_request_handler(update_buffer)
 232            .add_message_handler(update_buffer_file)
 233            .add_message_handler(buffer_reloaded)
 234            .add_message_handler(buffer_saved)
 235            .add_request_handler(forward_project_request::<proto::SaveBuffer>)
 236            .add_request_handler(get_users)
 237            .add_request_handler(fuzzy_search_users)
 238            .add_request_handler(request_contact)
 239            .add_request_handler(remove_contact)
 240            .add_request_handler(respond_to_contact_request)
 241            .add_request_handler(follow)
 242            .add_message_handler(unfollow)
 243            .add_message_handler(update_followers)
 244            .add_message_handler(update_diff_base)
 245            .add_request_handler(get_private_user_info);
 246
 247        Arc::new(server)
 248    }
 249
    /// Schedules the stale-state cleanup for this server and returns `Ok(())` right away.
    ///
    /// A detached task is spawned that sleeps for [`CLEANUP_TIMEOUT`] and then, for every stale
    /// room id reported by the database, refreshes the room, notifies affected connections, and
    /// deletes the LiveKit room when no participants remain. Finally it asks the database to
    /// delete stale servers. Failures inside the task go through `trace_err()` and do not
    /// abort the remaining work.
    pub async fn start(&self) -> Result<()> {
        // Everything below is moved into the detached task.
        let server_id = *self.id.lock();
        let app_state = self.app_state.clone();
        let peer = self.peer.clone();
        let timeout = self.executor.sleep(CLEANUP_TIMEOUT);
        let pool = self.connection_pool.clone();
        let live_kit_client = self.app_state.live_kit_client.clone();

        let span = info_span!("start server");
        self.executor.spawn_detached(
            async move {
                tracing::info!("waiting for cleanup timeout");
                timeout.await;
                tracing::info!("cleanup timeout expired, retrieving stale rooms");
                // `trace_err()` turns the result into an `Option`; NOTE(review): presumably it
                // also logs the error — confirm in `util::ResultExt`.
                if let Some(room_ids) = app_state
                    .db
                    .stale_room_ids(&app_state.config.zed_environment, server_id)
                    .await
                    .trace_err()
                {
                    tracing::info!(stale_room_count = room_ids.len(), "retrieved stale rooms");
                    for room_id in room_ids {
                        // Per-room state; these keep their defaults when the refresh fails,
                        // which turns the notification steps below into no-ops.
                        let mut contacts_to_update = HashSet::default();
                        let mut canceled_calls_to_user_ids = Vec::new();
                        let mut live_kit_room = String::new();
                        let mut delete_live_kit_room = false;

                        if let Some(mut refreshed_room) = app_state
                            .db
                            .refresh_room(room_id, server_id)
                            .await
                            .trace_err()
                        {
                            tracing::info!(
                                room_id = room_id.0,
                                new_participant_count = refreshed_room.room.participants.len(),
                                "refreshed room"
                            );
                            room_updated(&refreshed_room.room, &peer);
                            // Both stale participants and users whose calls were canceled
                            // get their contact entry re-broadcast further down.
                            contacts_to_update
                                .extend(refreshed_room.stale_participant_user_ids.iter().copied());
                            contacts_to_update
                                .extend(refreshed_room.canceled_calls_to_user_ids.iter().copied());
                            canceled_calls_to_user_ids =
                                mem::take(&mut refreshed_room.canceled_calls_to_user_ids);
                            live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room);
                            delete_live_kit_room = refreshed_room.room.participants.is_empty();
                        }

                        // Tell every connection of each callee that the call was canceled. The
                        // pool lock is scoped to this block and released before the next await.
                        {
                            let pool = pool.lock();
                            for canceled_user_id in canceled_calls_to_user_ids {
                                for connection_id in pool.user_connection_ids(canceled_user_id) {
                                    peer.send(
                                        connection_id,
                                        proto::CallCanceled {
                                            room_id: room_id.to_proto(),
                                        },
                                    )
                                    .trace_err();
                                }
                            }
                        }

                        // Re-send each affected user's contact entry to all connections of
                        // their accepted contacts.
                        for user_id in contacts_to_update {
                            let busy = app_state.db.is_user_busy(user_id).await.trace_err();
                            let contacts = app_state.db.get_contacts(user_id).await.trace_err();
                            // Proceed only when both lookups succeeded.
                            if let Some((busy, contacts)) = busy.zip(contacts) {
                                // Locked only after the awaits above have completed.
                                let pool = pool.lock();
                                let updated_contact = contact_for_user(user_id, false, busy, &pool);
                                for contact in contacts {
                                    if let db::Contact::Accepted {
                                        user_id: contact_user_id,
                                        ..
                                    } = contact
                                    {
                                        for contact_conn_id in
                                            pool.user_connection_ids(contact_user_id)
                                        {
                                            peer.send(
                                                contact_conn_id,
                                                proto::UpdateContacts {
                                                    contacts: vec![updated_contact.clone()],
                                                    remove_contacts: Default::default(),
                                                    incoming_requests: Default::default(),
                                                    remove_incoming_requests: Default::default(),
                                                    outgoing_requests: Default::default(),
                                                    remove_outgoing_requests: Default::default(),
                                                },
                                            )
                                            .trace_err();
                                        }
                                    }
                                }
                            }
                        }

                        // Delete the LiveKit room only when a client is configured and the
                        // refreshed room ended up with no participants.
                        if let Some(live_kit) = live_kit_client.as_ref() {
                            if delete_live_kit_room {
                                live_kit.delete_room(live_kit_room).await.trace_err();
                            }
                        }
                    }
                }

                // Runs whether or not the stale-room lookup above succeeded.
                app_state
                    .db
                    .delete_stale_servers(&app_state.config.zed_environment, server_id)
                    .await
                    .trace_err();
            }
            .instrument(span),
        );
        Ok(())
    }
 365
 366    pub fn teardown(&self) {
 367        self.peer.teardown();
 368        self.connection_pool.lock().reset();
 369        let _ = self.teardown.send(());
 370    }
 371
 372    #[cfg(test)]
 373    pub fn reset(&self, id: ServerId) {
 374        self.teardown();
 375        *self.id.lock() = id;
 376        self.peer.reset(id.0 as u32);
 377    }
 378
 379    #[cfg(test)]
 380    pub fn id(&self) -> ServerId {
 381        *self.id.lock()
 382    }
 383
 384    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 385    where
 386        F: 'static + Send + Sync + Fn(TypedEnvelope<M>, Session) -> Fut,
 387        Fut: 'static + Send + Future<Output = Result<()>>,
 388        M: EnvelopedMessage,
 389    {
 390        let prev_handler = self.handlers.insert(
 391            TypeId::of::<M>(),
 392            Box::new(move |envelope, session| {
 393                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 394                let span = info_span!(
 395                    "handle message",
 396                    payload_type = envelope.payload_type_name()
 397                );
 398                span.in_scope(|| {
 399                    tracing::info!(
 400                        payload_type = envelope.payload_type_name(),
 401                        "message received"
 402                    );
 403                });
 404                let start_time = Instant::now();
 405                let future = (handler)(*envelope, session);
 406                async move {
 407                    let result = future.await;
 408                    let duration_ms = start_time.elapsed().as_micros() as f64 / 1000.0;
 409                    match result {
 410                        Err(error) => {
 411                            tracing::error!(%error, ?duration_ms, "error handling message")
 412                        }
 413                        Ok(()) => tracing::info!(?duration_ms, "finished handling message"),
 414                    }
 415                }
 416                .instrument(span)
 417                .boxed()
 418            }),
 419        );
 420        if prev_handler.is_some() {
 421            panic!("registered a handler for the same message twice");
 422        }
 423        self
 424    }
 425
 426    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 427    where
 428        F: 'static + Send + Sync + Fn(M, Session) -> Fut,
 429        Fut: 'static + Send + Future<Output = Result<()>>,
 430        M: EnvelopedMessage,
 431    {
 432        self.add_handler(move |envelope, session| handler(envelope.payload, session));
 433        self
 434    }
 435
 436    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 437    where
 438        F: 'static + Send + Sync + Fn(M, Response<M>, Session) -> Fut,
 439        Fut: Send + Future<Output = Result<()>>,
 440        M: RequestMessage,
 441    {
 442        let handler = Arc::new(handler);
 443        self.add_handler(move |envelope, session| {
 444            let receipt = envelope.receipt();
 445            let handler = handler.clone();
 446            async move {
 447                let peer = session.peer.clone();
 448                let responded = Arc::new(AtomicBool::default());
 449                let response = Response {
 450                    peer: peer.clone(),
 451                    responded: responded.clone(),
 452                    receipt,
 453                };
 454                match (handler)(envelope.payload, response, session).await {
 455                    Ok(()) => {
 456                        if responded.load(std::sync::atomic::Ordering::SeqCst) {
 457                            Ok(())
 458                        } else {
 459                            Err(anyhow!("handler did not send a response"))?
 460                        }
 461                    }
 462                    Err(error) => {
 463                        peer.respond_with_error(
 464                            receipt,
 465                            proto::Error {
 466                                message: error.to_string(),
 467                            },
 468                        )?;
 469                        Err(error)
 470                    }
 471                }
 472            }
 473        })
 474    }
 475
    /// Returns a future that drives one client connection from handshake to sign-out.
    ///
    /// The future registers the connection with the peer and the connection pool, sends the
    /// initial messages (hello, contacts, invite info, any pending incoming call), then
    /// dispatches incoming messages to the registered handlers until the connection closes, its
    /// I/O finishes, or the server is torn down.
    ///
    /// * `connection` - transport for this client.
    /// * `address` - client address, used only in log fields.
    /// * `user` - the user the connection belongs to.
    /// * `send_connection_id` - if provided, receives the assigned connection id once the
    ///   hello message has been sent.
    /// * `executor` - provides the peer's timers and spawns background handlers.
    ///
    /// Errors during the setup phase are returned. On server teardown the future returns
    /// `Ok(())` without running `connection_lost`.
    pub fn handle_connection(
        self: &Arc<Self>,
        connection: Connection,
        address: String,
        user: User,
        mut send_connection_id: Option<oneshot::Sender<ConnectionId>>,
        executor: Executor,
    ) -> impl Future<Output = Result<()>> {
        let this = self.clone();
        let user_id = user.id;
        let login = user.github_login;
        let span = info_span!("handle connection", %user_id, %login, %address);
        // Subscribed before the future is returned, i.e. at call time rather than first poll.
        let mut teardown = self.teardown.subscribe();
        async move {
            // The closure lets the peer create sleep timers through this connection's executor.
            let (connection_id, handle_io, mut incoming_rx) = this
                .peer
                .add_connection(connection, {
                    let executor = executor.clone();
                    move |duration| executor.sleep(duration)
                });

            tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
            this.peer.send(connection_id, proto::Hello { peer_id: Some(connection_id.into()) })?;
            tracing::info!(%user_id, %login, %connection_id, %address, "sent hello message");

            // The receiver may already be gone; that is not an error.
            if let Some(send_connection_id) = send_connection_id.take() {
                let _ = send_connection_id.send(connection_id);
            }

            // First connection ever for this user: ask the client to show contacts and
            // remember that in the database.
            if !user.connected_once {
                this.peer.send(connection_id, proto::ShowContacts {})?;
                this.app_state.db.set_user_connected_once(user_id, true).await?;
            }

            // Both lookups run concurrently; the first error aborts the setup.
            let (contacts, invite_code) = future::try_join(
                this.app_state.db.get_contacts(user_id),
                this.app_state.db.get_invite_code_for_user(user_id)
            ).await?;

            // The pool lock is scoped to this block so it is released before the next await.
            {
                let mut pool = this.connection_pool.lock();
                pool.add_connection(connection_id, user_id, user.admin);
                this.peer.send(connection_id, build_initial_contacts_update(contacts, &pool))?;

                if let Some((code, count)) = invite_code {
                    this.peer.send(connection_id, proto::UpdateInviteInfo {
                        url: format!("{}{}", this.app_state.config.invite_link_prefix, code),
                        count: count as u32,
                    })?;
                }
            }

            // Deliver a call that is already pending for this user, if any.
            if let Some(incoming_call) = this.app_state.db.incoming_call_for_user(user_id).await? {
                this.peer.send(connection_id, incoming_call)?;
            }

            // Context cloned into every handler invocation for this connection.
            let session = Session {
                user_id,
                connection_id,
                db: Arc::new(tokio::sync::Mutex::new(DbHandle(this.app_state.db.clone()))),
                peer: this.peer.clone(),
                connection_pool: this.connection_pool.clone(),
                live_kit_client: this.app_state.live_kit_client.clone(),
                executor: executor.clone(),
            };
            update_user_contacts(user_id, &session).await?;

            let handle_io = handle_io.fuse();
            futures::pin_mut!(handle_io);

            // Handlers for foreground messages are pushed into the following `FuturesUnordered`.
            // This prevents deadlocks when e.g., client A performs a request to client B and
            // client B performs a request to client A. If both clients stop processing further
            // messages until their respective request completes, they won't have a chance to
            // respond to the other client's request and cause a deadlock.
            //
            // This arrangement ensures we will attempt to process earlier messages first, but fall
            // back to processing messages arrived later in the spirit of making progress.
            let mut foreground_message_handlers = FuturesUnordered::new();
            // Caps the number of in-flight handlers for this connection at 256.
            let concurrent_handlers = Arc::new(Semaphore::new(256));
            loop {
                // A permit is acquired before the next message is read, so no further message
                // is taken off the channel while all permits are in use.
                let next_message = async {
                    let permit = concurrent_handlers.clone().acquire_owned().await.unwrap();
                    let message = incoming_rx.next().await;
                    (permit, message)
                }.fuse();
                futures::pin_mut!(next_message);
                // `select_biased!` polls the branches in the order written: teardown, then I/O,
                // then finished foreground handlers, and only then the next incoming message.
                futures::select_biased! {
                    // Server teardown: return immediately, skipping the sign-out below.
                    _ = teardown.changed().fuse() => return Ok(()),
                    result = handle_io => {
                        if let Err(error) = result {
                            tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
                        }
                        break;
                    }
                    // Polling here is what drives the foreground handlers forward.
                    _ = foreground_message_handlers.next() => {}
                    next_message = next_message => {
                        let (permit, message) = next_message;
                        if let Some(message) = message {
                            let type_name = message.payload_type_name();
                            let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
                            let span_enter = span.enter();
                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                let is_background = message.is_background();
                                let handle_message = (handler)(message, session.clone());
                                // Leave the span before the handler future is moved elsewhere;
                                // the future is re-instrumented with the span below.
                                drop(span_enter);

                                // The permit is held until the handler future completes.
                                let handle_message = async move {
                                    handle_message.await;
                                    drop(permit);
                                }.instrument(span);
                                // Background messages run detached on the executor; foreground
                                // ones are driven by this loop.
                                if is_background {
                                    executor.spawn_detached(handle_message);
                                } else {
                                    foreground_message_handlers.push(handle_message);
                                }
                            } else {
                                tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
                            }
                        } else {
                            // The incoming stream ended.
                            tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
                            break;
                        }
                    }
                }
            }

            // Drops any foreground handlers that have not finished.
            drop(foreground_message_handlers);
            tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
            if let Err(error) = connection_lost(session, teardown, executor).await {
                tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
            }

            Ok(())
        }.instrument(span)
    }
 612
 613    pub async fn invite_code_redeemed(
 614        self: &Arc<Self>,
 615        inviter_id: UserId,
 616        invitee_id: UserId,
 617    ) -> Result<()> {
 618        if let Some(user) = self.app_state.db.get_user_by_id(inviter_id).await? {
 619            if let Some(code) = &user.invite_code {
 620                let pool = self.connection_pool.lock();
 621                let invitee_contact = contact_for_user(invitee_id, true, false, &pool);
 622                for connection_id in pool.user_connection_ids(inviter_id) {
 623                    self.peer.send(
 624                        connection_id,
 625                        proto::UpdateContacts {
 626                            contacts: vec![invitee_contact.clone()],
 627                            ..Default::default()
 628                        },
 629                    )?;
 630                    self.peer.send(
 631                        connection_id,
 632                        proto::UpdateInviteInfo {
 633                            url: format!("{}{}", self.app_state.config.invite_link_prefix, &code),
 634                            count: user.invite_count as u32,
 635                        },
 636                    )?;
 637                }
 638            }
 639        }
 640        Ok(())
 641    }
 642
 643    pub async fn invite_count_updated(self: &Arc<Self>, user_id: UserId) -> Result<()> {
 644        if let Some(user) = self.app_state.db.get_user_by_id(user_id).await? {
 645            if let Some(invite_code) = &user.invite_code {
 646                let pool = self.connection_pool.lock();
 647                for connection_id in pool.user_connection_ids(user_id) {
 648                    self.peer.send(
 649                        connection_id,
 650                        proto::UpdateInviteInfo {
 651                            url: format!(
 652                                "{}{}",
 653                                self.app_state.config.invite_link_prefix, invite_code
 654                            ),
 655                            count: user.invite_count as u32,
 656                        },
 657                    )?;
 658                }
 659            }
 660        }
 661        Ok(())
 662    }
 663
 664    pub async fn snapshot<'a>(self: &'a Arc<Self>) -> ServerSnapshot<'a> {
 665        ServerSnapshot {
 666            connection_pool: ConnectionPoolGuard {
 667                guard: self.connection_pool.lock(),
 668                _not_send: PhantomData,
 669            },
 670            peer: &self.peer,
 671        }
 672    }
 673}
 674
 675impl<'a> Deref for ConnectionPoolGuard<'a> {
 676    type Target = ConnectionPool;
 677
 678    fn deref(&self) -> &Self::Target {
 679        &*self.guard
 680    }
 681}
 682
 683impl<'a> DerefMut for ConnectionPoolGuard<'a> {
 684    fn deref_mut(&mut self) -> &mut Self::Target {
 685        &mut *self.guard
 686    }
 687}
 688
 689impl<'a> Drop for ConnectionPoolGuard<'a> {
 690    fn drop(&mut self) {
 691        #[cfg(test)]
 692        self.check_invariants();
 693    }
 694}
 695
 696fn broadcast<F>(
 697    sender_id: Option<ConnectionId>,
 698    receiver_ids: impl IntoIterator<Item = ConnectionId>,
 699    mut f: F,
 700) where
 701    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 702{
 703    for receiver_id in receiver_ids {
 704        if Some(receiver_id) != sender_id {
 705            if let Err(error) = f(receiver_id) {
 706                tracing::error!("failed to send to {:?} {}", receiver_id, error);
 707            }
 708        }
 709    }
 710}
 711
// Name of the HTTP header that carries the client's RPC protocol version; returned by
// `ProtocolVersion::name`.
lazy_static! {
    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
}
 715
/// Typed value of the `x-zed-protocol-version` header: the protocol version as a `u32`.
pub struct ProtocolVersion(u32);
 717
 718impl Header for ProtocolVersion {
 719    fn name() -> &'static HeaderName {
 720        &ZED_PROTOCOL_VERSION
 721    }
 722
 723    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
 724    where
 725        Self: Sized,
 726        I: Iterator<Item = &'i axum::http::HeaderValue>,
 727    {
 728        let version = values
 729            .next()
 730            .ok_or_else(axum::headers::Error::invalid)?
 731            .to_str()
 732            .map_err(|_| axum::headers::Error::invalid())?
 733            .parse()
 734            .map_err(|_| axum::headers::Error::invalid())?;
 735        Ok(Self(version))
 736    }
 737
 738    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
 739        values.extend([self.0.to_string().parse().unwrap()]);
 740    }
 741}
 742
 743pub fn routes(server: Arc<Server>) -> Router<Body> {
 744    Router::new()
 745        .route("/rpc", get(handle_websocket_request))
 746        .layer(
 747            ServiceBuilder::new()
 748                .layer(Extension(server.app_state.clone()))
 749                .layer(middleware::from_fn(auth::validate_header)),
 750        )
 751        .route("/metrics", get(handle_metrics))
 752        .layer(Extension(server))
 753}
 754
 755pub async fn handle_websocket_request(
 756    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
 757    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
 758    Extension(server): Extension<Arc<Server>>,
 759    Extension(user): Extension<User>,
 760    ws: WebSocketUpgrade,
 761) -> axum::response::Response {
 762    if protocol_version != rpc::PROTOCOL_VERSION {
 763        return (
 764            StatusCode::UPGRADE_REQUIRED,
 765            "client must be upgraded".to_string(),
 766        )
 767            .into_response();
 768    }
 769    let socket_address = socket_address.to_string();
 770    ws.on_upgrade(move |socket| {
 771        use util::ResultExt;
 772        let socket = socket
 773            .map_ok(to_tungstenite_message)
 774            .err_into()
 775            .with(|message| async move { Ok(to_axum_message(message)) });
 776        let connection = Connection::new(Box::pin(socket));
 777        async move {
 778            server
 779                .handle_connection(connection, socket_address, user, None, Executor::Production)
 780                .await
 781                .log_err();
 782        }
 783    })
 784}
 785
 786pub async fn handle_metrics(Extension(server): Extension<Arc<Server>>) -> Result<String> {
 787    let connections = server
 788        .connection_pool
 789        .lock()
 790        .connections()
 791        .filter(|connection| !connection.admin)
 792        .count();
 793
 794    METRIC_CONNECTIONS.set(connections as _);
 795
 796    let shared_projects = server.app_state.db.project_count_excluding_admins().await?;
 797    METRIC_SHARED_PROJECTS.set(shared_projects as _);
 798
 799    let encoder = prometheus::TextEncoder::new();
 800    let metric_families = prometheus::gather();
 801    let encoded_metrics = encoder
 802        .encode_to_string(&metric_families)
 803        .map_err(|err| anyhow!("{}", err))?;
 804    Ok(encoded_metrics)
 805}
 806
 807#[instrument(err, skip(executor))]
 808async fn connection_lost(
 809    session: Session,
 810    mut teardown: watch::Receiver<()>,
 811    executor: Executor,
 812) -> Result<()> {
 813    session.peer.disconnect(session.connection_id);
 814    session
 815        .connection_pool()
 816        .await
 817        .remove_connection(session.connection_id)?;
 818
 819    session
 820        .db()
 821        .await
 822        .connection_lost(session.connection_id)
 823        .await
 824        .trace_err();
 825
 826    futures::select_biased! {
 827        _ = executor.sleep(RECONNECT_TIMEOUT).fuse() => {
 828            leave_room_for_session(&session).await.trace_err();
 829
 830            if !session
 831                .connection_pool()
 832                .await
 833                .is_user_online(session.user_id)
 834            {
 835                let db = session.db().await;
 836                if let Some(room) = db.decline_call(None, session.user_id).await.trace_err().flatten() {
 837                    room_updated(&room, &session.peer);
 838                }
 839            }
 840            update_user_contacts(session.user_id, &session).await?;
 841        }
 842        _ = teardown.changed().fuse() => {}
 843    }
 844
 845    Ok(())
 846}
 847
 848async fn ping(_: proto::Ping, response: Response<proto::Ping>, _session: Session) -> Result<()> {
 849    response.send(proto::Ack {})?;
 850    Ok(())
 851}
 852
 853async fn create_room(
 854    _request: proto::CreateRoom,
 855    response: Response<proto::CreateRoom>,
 856    session: Session,
 857) -> Result<()> {
 858    let live_kit_room = nanoid::nanoid!(30);
 859    let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
 860        if let Some(_) = live_kit
 861            .create_room(live_kit_room.clone())
 862            .await
 863            .trace_err()
 864        {
 865            if let Some(token) = live_kit
 866                .room_token(&live_kit_room, &session.user_id.to_string())
 867                .trace_err()
 868            {
 869                Some(proto::LiveKitConnectionInfo {
 870                    server_url: live_kit.url().into(),
 871                    token,
 872                })
 873            } else {
 874                None
 875            }
 876        } else {
 877            None
 878        }
 879    } else {
 880        None
 881    };
 882
 883    {
 884        let room = session
 885            .db()
 886            .await
 887            .create_room(session.user_id, session.connection_id, &live_kit_room)
 888            .await?;
 889
 890        response.send(proto::CreateRoomResponse {
 891            room: Some(room.clone()),
 892            live_kit_connection_info,
 893        })?;
 894    }
 895
 896    update_user_contacts(session.user_id, &session).await?;
 897    Ok(())
 898}
 899
 900async fn join_room(
 901    request: proto::JoinRoom,
 902    response: Response<proto::JoinRoom>,
 903    session: Session,
 904) -> Result<()> {
 905    let room_id = RoomId::from_proto(request.id);
 906    let room = {
 907        let room = session
 908            .db()
 909            .await
 910            .join_room(room_id, session.user_id, session.connection_id)
 911            .await?;
 912        room_updated(&room, &session.peer);
 913        room.clone()
 914    };
 915
 916    for connection_id in session
 917        .connection_pool()
 918        .await
 919        .user_connection_ids(session.user_id)
 920    {
 921        session
 922            .peer
 923            .send(
 924                connection_id,
 925                proto::CallCanceled {
 926                    room_id: room_id.to_proto(),
 927                },
 928            )
 929            .trace_err();
 930    }
 931
 932    let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
 933        if let Some(token) = live_kit
 934            .room_token(&room.live_kit_room, &session.user_id.to_string())
 935            .trace_err()
 936        {
 937            Some(proto::LiveKitConnectionInfo {
 938                server_url: live_kit.url().into(),
 939                token,
 940            })
 941        } else {
 942            None
 943        }
 944    } else {
 945        None
 946    };
 947
 948    response.send(proto::JoinRoomResponse {
 949        room: Some(room),
 950        live_kit_connection_info,
 951    })?;
 952
 953    update_user_contacts(session.user_id, &session).await?;
 954    Ok(())
 955}
 956
 957async fn rejoin_room(
 958    request: proto::RejoinRoom,
 959    response: Response<proto::RejoinRoom>,
 960    session: Session,
 961) -> Result<()> {
 962    {
 963        let mut rejoined_room = session
 964            .db()
 965            .await
 966            .rejoin_room(request, session.user_id, session.connection_id)
 967            .await?;
 968
 969        response.send(proto::RejoinRoomResponse {
 970            room: Some(rejoined_room.room.clone()),
 971            reshared_projects: rejoined_room
 972                .reshared_projects
 973                .iter()
 974                .map(|project| proto::ResharedProject {
 975                    id: project.id.to_proto(),
 976                    collaborators: project
 977                        .collaborators
 978                        .iter()
 979                        .map(|collaborator| collaborator.to_proto())
 980                        .collect(),
 981                })
 982                .collect(),
 983            rejoined_projects: rejoined_room
 984                .rejoined_projects
 985                .iter()
 986                .map(|rejoined_project| proto::RejoinedProject {
 987                    id: rejoined_project.id.to_proto(),
 988                    worktrees: rejoined_project
 989                        .worktrees
 990                        .iter()
 991                        .map(|worktree| proto::WorktreeMetadata {
 992                            id: worktree.id,
 993                            root_name: worktree.root_name.clone(),
 994                            visible: worktree.visible,
 995                            abs_path: worktree.abs_path.clone(),
 996                        })
 997                        .collect(),
 998                    collaborators: rejoined_project
 999                        .collaborators
1000                        .iter()
1001                        .map(|collaborator| collaborator.to_proto())
1002                        .collect(),
1003                    language_servers: rejoined_project.language_servers.clone(),
1004                })
1005                .collect(),
1006        })?;
1007        room_updated(&rejoined_room.room, &session.peer);
1008
1009        for project in &rejoined_room.reshared_projects {
1010            for collaborator in &project.collaborators {
1011                session
1012                    .peer
1013                    .send(
1014                        collaborator.connection_id,
1015                        proto::UpdateProjectCollaborator {
1016                            project_id: project.id.to_proto(),
1017                            old_peer_id: Some(project.old_connection_id.into()),
1018                            new_peer_id: Some(session.connection_id.into()),
1019                        },
1020                    )
1021                    .trace_err();
1022            }
1023
1024            broadcast(
1025                Some(session.connection_id),
1026                project
1027                    .collaborators
1028                    .iter()
1029                    .map(|collaborator| collaborator.connection_id),
1030                |connection_id| {
1031                    session.peer.forward_send(
1032                        session.connection_id,
1033                        connection_id,
1034                        proto::UpdateProject {
1035                            project_id: project.id.to_proto(),
1036                            worktrees: project.worktrees.clone(),
1037                        },
1038                    )
1039                },
1040            );
1041        }
1042
1043        for project in &rejoined_room.rejoined_projects {
1044            for collaborator in &project.collaborators {
1045                session
1046                    .peer
1047                    .send(
1048                        collaborator.connection_id,
1049                        proto::UpdateProjectCollaborator {
1050                            project_id: project.id.to_proto(),
1051                            old_peer_id: Some(project.old_connection_id.into()),
1052                            new_peer_id: Some(session.connection_id.into()),
1053                        },
1054                    )
1055                    .trace_err();
1056            }
1057        }
1058
1059        for project in &mut rejoined_room.rejoined_projects {
1060            for worktree in mem::take(&mut project.worktrees) {
1061                #[cfg(any(test, feature = "test-support"))]
1062                const MAX_CHUNK_SIZE: usize = 2;
1063                #[cfg(not(any(test, feature = "test-support")))]
1064                const MAX_CHUNK_SIZE: usize = 256;
1065
1066                // Stream this worktree's entries.
1067                let message = proto::UpdateWorktree {
1068                    project_id: project.id.to_proto(),
1069                    worktree_id: worktree.id,
1070                    abs_path: worktree.abs_path.clone(),
1071                    root_name: worktree.root_name,
1072                    updated_entries: worktree.updated_entries,
1073                    removed_entries: worktree.removed_entries,
1074                    scan_id: worktree.scan_id,
1075                    is_last_update: worktree.completed_scan_id == worktree.scan_id,
1076                    updated_repositories: worktree.updated_repositories,
1077                    removed_repositories: worktree.removed_repositories,
1078                };
1079                for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1080                    session.peer.send(session.connection_id, update.clone())?;
1081                }
1082
1083                // Stream this worktree's diagnostics.
1084                for summary in worktree.diagnostic_summaries {
1085                    session.peer.send(
1086                        session.connection_id,
1087                        proto::UpdateDiagnosticSummary {
1088                            project_id: project.id.to_proto(),
1089                            worktree_id: worktree.id,
1090                            summary: Some(summary),
1091                        },
1092                    )?;
1093                }
1094
1095                for settings_file in worktree.settings_files {
1096                    session.peer.send(
1097                        session.connection_id,
1098                        proto::UpdateWorktreeSettings {
1099                            project_id: project.id.to_proto(),
1100                            worktree_id: worktree.id,
1101                            path: settings_file.path,
1102                            content: Some(settings_file.content),
1103                        },
1104                    )?;
1105                }
1106            }
1107
1108            for language_server in &project.language_servers {
1109                session.peer.send(
1110                    session.connection_id,
1111                    proto::UpdateLanguageServer {
1112                        project_id: project.id.to_proto(),
1113                        language_server_id: language_server.id,
1114                        variant: Some(
1115                            proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1116                                proto::LspDiskBasedDiagnosticsUpdated {},
1117                            ),
1118                        ),
1119                    },
1120                )?;
1121            }
1122        }
1123    }
1124
1125    update_user_contacts(session.user_id, &session).await?;
1126    Ok(())
1127}
1128
1129async fn leave_room(
1130    _: proto::LeaveRoom,
1131    response: Response<proto::LeaveRoom>,
1132    session: Session,
1133) -> Result<()> {
1134    leave_room_for_session(&session).await?;
1135    response.send(proto::Ack {})?;
1136    Ok(())
1137}
1138
1139async fn call(
1140    request: proto::Call,
1141    response: Response<proto::Call>,
1142    session: Session,
1143) -> Result<()> {
1144    let room_id = RoomId::from_proto(request.room_id);
1145    let calling_user_id = session.user_id;
1146    let calling_connection_id = session.connection_id;
1147    let called_user_id = UserId::from_proto(request.called_user_id);
1148    let initial_project_id = request.initial_project_id.map(ProjectId::from_proto);
1149    if !session
1150        .db()
1151        .await
1152        .has_contact(calling_user_id, called_user_id)
1153        .await?
1154    {
1155        return Err(anyhow!("cannot call a user who isn't a contact"))?;
1156    }
1157
1158    let incoming_call = {
1159        let (room, incoming_call) = &mut *session
1160            .db()
1161            .await
1162            .call(
1163                room_id,
1164                calling_user_id,
1165                calling_connection_id,
1166                called_user_id,
1167                initial_project_id,
1168            )
1169            .await?;
1170        room_updated(&room, &session.peer);
1171        mem::take(incoming_call)
1172    };
1173    update_user_contacts(called_user_id, &session).await?;
1174
1175    let mut calls = session
1176        .connection_pool()
1177        .await
1178        .user_connection_ids(called_user_id)
1179        .map(|connection_id| session.peer.request(connection_id, incoming_call.clone()))
1180        .collect::<FuturesUnordered<_>>();
1181
1182    while let Some(call_response) = calls.next().await {
1183        match call_response.as_ref() {
1184            Ok(_) => {
1185                response.send(proto::Ack {})?;
1186                return Ok(());
1187            }
1188            Err(_) => {
1189                call_response.trace_err();
1190            }
1191        }
1192    }
1193
1194    {
1195        let room = session
1196            .db()
1197            .await
1198            .call_failed(room_id, called_user_id)
1199            .await?;
1200        room_updated(&room, &session.peer);
1201    }
1202    update_user_contacts(called_user_id, &session).await?;
1203
1204    Err(anyhow!("failed to ring user"))?
1205}
1206
1207async fn cancel_call(
1208    request: proto::CancelCall,
1209    response: Response<proto::CancelCall>,
1210    session: Session,
1211) -> Result<()> {
1212    let called_user_id = UserId::from_proto(request.called_user_id);
1213    let room_id = RoomId::from_proto(request.room_id);
1214    {
1215        let room = session
1216            .db()
1217            .await
1218            .cancel_call(room_id, session.connection_id, called_user_id)
1219            .await?;
1220        room_updated(&room, &session.peer);
1221    }
1222
1223    for connection_id in session
1224        .connection_pool()
1225        .await
1226        .user_connection_ids(called_user_id)
1227    {
1228        session
1229            .peer
1230            .send(
1231                connection_id,
1232                proto::CallCanceled {
1233                    room_id: room_id.to_proto(),
1234                },
1235            )
1236            .trace_err();
1237    }
1238    response.send(proto::Ack {})?;
1239
1240    update_user_contacts(called_user_id, &session).await?;
1241    Ok(())
1242}
1243
1244async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<()> {
1245    let room_id = RoomId::from_proto(message.room_id);
1246    {
1247        let room = session
1248            .db()
1249            .await
1250            .decline_call(Some(room_id), session.user_id)
1251            .await?
1252            .ok_or_else(|| anyhow!("failed to decline call"))?;
1253        room_updated(&room, &session.peer);
1254    }
1255
1256    for connection_id in session
1257        .connection_pool()
1258        .await
1259        .user_connection_ids(session.user_id)
1260    {
1261        session
1262            .peer
1263            .send(
1264                connection_id,
1265                proto::CallCanceled {
1266                    room_id: room_id.to_proto(),
1267                },
1268            )
1269            .trace_err();
1270    }
1271    update_user_contacts(session.user_id, &session).await?;
1272    Ok(())
1273}
1274
1275async fn update_participant_location(
1276    request: proto::UpdateParticipantLocation,
1277    response: Response<proto::UpdateParticipantLocation>,
1278    session: Session,
1279) -> Result<()> {
1280    let room_id = RoomId::from_proto(request.room_id);
1281    let location = request
1282        .location
1283        .ok_or_else(|| anyhow!("invalid location"))?;
1284    let room = session
1285        .db()
1286        .await
1287        .update_room_participant_location(room_id, session.connection_id, location)
1288        .await?;
1289    room_updated(&room, &session.peer);
1290    response.send(proto::Ack {})?;
1291    Ok(())
1292}
1293
1294async fn share_project(
1295    request: proto::ShareProject,
1296    response: Response<proto::ShareProject>,
1297    session: Session,
1298) -> Result<()> {
1299    let (project_id, room) = &*session
1300        .db()
1301        .await
1302        .share_project(
1303            RoomId::from_proto(request.room_id),
1304            session.connection_id,
1305            &request.worktrees,
1306        )
1307        .await?;
1308    response.send(proto::ShareProjectResponse {
1309        project_id: project_id.to_proto(),
1310    })?;
1311    room_updated(&room, &session.peer);
1312
1313    Ok(())
1314}
1315
1316async fn unshare_project(message: proto::UnshareProject, session: Session) -> Result<()> {
1317    let project_id = ProjectId::from_proto(message.project_id);
1318
1319    let (room, guest_connection_ids) = &*session
1320        .db()
1321        .await
1322        .unshare_project(project_id, session.connection_id)
1323        .await?;
1324
1325    broadcast(
1326        Some(session.connection_id),
1327        guest_connection_ids.iter().copied(),
1328        |conn_id| session.peer.send(conn_id, message.clone()),
1329    );
1330    room_updated(&room, &session.peer);
1331
1332    Ok(())
1333}
1334
1335async fn join_project(
1336    request: proto::JoinProject,
1337    response: Response<proto::JoinProject>,
1338    session: Session,
1339) -> Result<()> {
1340    let project_id = ProjectId::from_proto(request.project_id);
1341    let guest_user_id = session.user_id;
1342
1343    tracing::info!(%project_id, "join project");
1344
1345    let (project, replica_id) = &mut *session
1346        .db()
1347        .await
1348        .join_project(project_id, session.connection_id)
1349        .await?;
1350
1351    let collaborators = project
1352        .collaborators
1353        .iter()
1354        .filter(|collaborator| collaborator.connection_id != session.connection_id)
1355        .map(|collaborator| collaborator.to_proto())
1356        .collect::<Vec<_>>();
1357
1358    let worktrees = project
1359        .worktrees
1360        .iter()
1361        .map(|(id, worktree)| proto::WorktreeMetadata {
1362            id: *id,
1363            root_name: worktree.root_name.clone(),
1364            visible: worktree.visible,
1365            abs_path: worktree.abs_path.clone(),
1366        })
1367        .collect::<Vec<_>>();
1368
1369    for collaborator in &collaborators {
1370        session
1371            .peer
1372            .send(
1373                collaborator.peer_id.unwrap().into(),
1374                proto::AddProjectCollaborator {
1375                    project_id: project_id.to_proto(),
1376                    collaborator: Some(proto::Collaborator {
1377                        peer_id: Some(session.connection_id.into()),
1378                        replica_id: replica_id.0 as u32,
1379                        user_id: guest_user_id.to_proto(),
1380                    }),
1381                },
1382            )
1383            .trace_err();
1384    }
1385
1386    // First, we send the metadata associated with each worktree.
1387    response.send(proto::JoinProjectResponse {
1388        worktrees: worktrees.clone(),
1389        replica_id: replica_id.0 as u32,
1390        collaborators: collaborators.clone(),
1391        language_servers: project.language_servers.clone(),
1392    })?;
1393
1394    for (worktree_id, worktree) in mem::take(&mut project.worktrees) {
1395        #[cfg(any(test, feature = "test-support"))]
1396        const MAX_CHUNK_SIZE: usize = 2;
1397        #[cfg(not(any(test, feature = "test-support")))]
1398        const MAX_CHUNK_SIZE: usize = 256;
1399
1400        // Stream this worktree's entries.
1401        let message = proto::UpdateWorktree {
1402            project_id: project_id.to_proto(),
1403            worktree_id,
1404            abs_path: worktree.abs_path.clone(),
1405            root_name: worktree.root_name,
1406            updated_entries: worktree.entries,
1407            removed_entries: Default::default(),
1408            scan_id: worktree.scan_id,
1409            is_last_update: worktree.scan_id == worktree.completed_scan_id,
1410            updated_repositories: worktree.repository_entries.into_values().collect(),
1411            removed_repositories: Default::default(),
1412        };
1413        for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1414            session.peer.send(session.connection_id, update.clone())?;
1415        }
1416
1417        // Stream this worktree's diagnostics.
1418        for summary in worktree.diagnostic_summaries {
1419            session.peer.send(
1420                session.connection_id,
1421                proto::UpdateDiagnosticSummary {
1422                    project_id: project_id.to_proto(),
1423                    worktree_id: worktree.id,
1424                    summary: Some(summary),
1425                },
1426            )?;
1427        }
1428
1429        for settings_file in worktree.settings_files {
1430            session.peer.send(
1431                session.connection_id,
1432                proto::UpdateWorktreeSettings {
1433                    project_id: project_id.to_proto(),
1434                    worktree_id: worktree.id,
1435                    path: settings_file.path,
1436                    content: Some(settings_file.content),
1437                },
1438            )?;
1439        }
1440    }
1441
1442    for language_server in &project.language_servers {
1443        session.peer.send(
1444            session.connection_id,
1445            proto::UpdateLanguageServer {
1446                project_id: project_id.to_proto(),
1447                language_server_id: language_server.id,
1448                variant: Some(
1449                    proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1450                        proto::LspDiskBasedDiagnosticsUpdated {},
1451                    ),
1452                ),
1453            },
1454        )?;
1455    }
1456
1457    Ok(())
1458}
1459
1460async fn leave_project(request: proto::LeaveProject, session: Session) -> Result<()> {
1461    let sender_id = session.connection_id;
1462    let project_id = ProjectId::from_proto(request.project_id);
1463
1464    let (room, project) = &*session
1465        .db()
1466        .await
1467        .leave_project(project_id, sender_id)
1468        .await?;
1469    tracing::info!(
1470        %project_id,
1471        host_user_id = %project.host_user_id,
1472        host_connection_id = %project.host_connection_id,
1473        "leave project"
1474    );
1475
1476    project_left(&project, &session);
1477    room_updated(&room, &session.peer);
1478
1479    Ok(())
1480}
1481
1482async fn update_project(
1483    request: proto::UpdateProject,
1484    response: Response<proto::UpdateProject>,
1485    session: Session,
1486) -> Result<()> {
1487    let project_id = ProjectId::from_proto(request.project_id);
1488    let (room, guest_connection_ids) = &*session
1489        .db()
1490        .await
1491        .update_project(project_id, session.connection_id, &request.worktrees)
1492        .await?;
1493    broadcast(
1494        Some(session.connection_id),
1495        guest_connection_ids.iter().copied(),
1496        |connection_id| {
1497            session
1498                .peer
1499                .forward_send(session.connection_id, connection_id, request.clone())
1500        },
1501    );
1502    room_updated(&room, &session.peer);
1503    response.send(proto::Ack {})?;
1504
1505    Ok(())
1506}
1507
1508async fn update_worktree(
1509    request: proto::UpdateWorktree,
1510    response: Response<proto::UpdateWorktree>,
1511    session: Session,
1512) -> Result<()> {
1513    let guest_connection_ids = session
1514        .db()
1515        .await
1516        .update_worktree(&request, session.connection_id)
1517        .await?;
1518
1519    broadcast(
1520        Some(session.connection_id),
1521        guest_connection_ids.iter().copied(),
1522        |connection_id| {
1523            session
1524                .peer
1525                .forward_send(session.connection_id, connection_id, request.clone())
1526        },
1527    );
1528    response.send(proto::Ack {})?;
1529    Ok(())
1530}
1531
1532async fn update_diagnostic_summary(
1533    message: proto::UpdateDiagnosticSummary,
1534    session: Session,
1535) -> Result<()> {
1536    let guest_connection_ids = session
1537        .db()
1538        .await
1539        .update_diagnostic_summary(&message, session.connection_id)
1540        .await?;
1541
1542    broadcast(
1543        Some(session.connection_id),
1544        guest_connection_ids.iter().copied(),
1545        |connection_id| {
1546            session
1547                .peer
1548                .forward_send(session.connection_id, connection_id, message.clone())
1549        },
1550    );
1551
1552    Ok(())
1553}
1554
1555async fn update_worktree_settings(
1556    message: proto::UpdateWorktreeSettings,
1557    session: Session,
1558) -> Result<()> {
1559    let guest_connection_ids = session
1560        .db()
1561        .await
1562        .update_worktree_settings(&message, session.connection_id)
1563        .await?;
1564
1565    broadcast(
1566        Some(session.connection_id),
1567        guest_connection_ids.iter().copied(),
1568        |connection_id| {
1569            session
1570                .peer
1571                .forward_send(session.connection_id, connection_id, message.clone())
1572        },
1573    );
1574
1575    Ok(())
1576}
1577
1578async fn start_language_server(
1579    request: proto::StartLanguageServer,
1580    session: Session,
1581) -> Result<()> {
1582    let guest_connection_ids = session
1583        .db()
1584        .await
1585        .start_language_server(&request, session.connection_id)
1586        .await?;
1587
1588    broadcast(
1589        Some(session.connection_id),
1590        guest_connection_ids.iter().copied(),
1591        |connection_id| {
1592            session
1593                .peer
1594                .forward_send(session.connection_id, connection_id, request.clone())
1595        },
1596    );
1597    Ok(())
1598}
1599
1600async fn update_language_server(
1601    request: proto::UpdateLanguageServer,
1602    session: Session,
1603) -> Result<()> {
1604    session.executor.record_backtrace();
1605    let project_id = ProjectId::from_proto(request.project_id);
1606    let project_connection_ids = session
1607        .db()
1608        .await
1609        .project_connection_ids(project_id, session.connection_id)
1610        .await?;
1611    broadcast(
1612        Some(session.connection_id),
1613        project_connection_ids.iter().copied(),
1614        |connection_id| {
1615            session
1616                .peer
1617                .forward_send(session.connection_id, connection_id, request.clone())
1618        },
1619    );
1620    Ok(())
1621}
1622
1623async fn forward_project_request<T>(
1624    request: T,
1625    response: Response<T>,
1626    session: Session,
1627) -> Result<()>
1628where
1629    T: EntityMessage + RequestMessage,
1630{
1631    session.executor.record_backtrace();
1632    let project_id = ProjectId::from_proto(request.remote_entity_id());
1633    let host_connection_id = {
1634        let collaborators = session
1635            .db()
1636            .await
1637            .project_collaborators(project_id, session.connection_id)
1638            .await?;
1639        collaborators
1640            .iter()
1641            .find(|collaborator| collaborator.is_host)
1642            .ok_or_else(|| anyhow!("host not found"))?
1643            .connection_id
1644    };
1645
1646    let payload = session
1647        .peer
1648        .forward_request(session.connection_id, host_connection_id, request)
1649        .await?;
1650
1651    response.send(payload)?;
1652    Ok(())
1653}
1654
1655async fn create_buffer_for_peer(
1656    request: proto::CreateBufferForPeer,
1657    session: Session,
1658) -> Result<()> {
1659    session.executor.record_backtrace();
1660    let peer_id = request.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?;
1661    session
1662        .peer
1663        .forward_send(session.connection_id, peer_id.into(), request)?;
1664    Ok(())
1665}
1666
1667async fn update_buffer(
1668    request: proto::UpdateBuffer,
1669    response: Response<proto::UpdateBuffer>,
1670    session: Session,
1671) -> Result<()> {
1672    session.executor.record_backtrace();
1673    let project_id = ProjectId::from_proto(request.project_id);
1674    let mut guest_connection_ids;
1675    let mut host_connection_id = None;
1676    {
1677        let collaborators = session
1678            .db()
1679            .await
1680            .project_collaborators(project_id, session.connection_id)
1681            .await?;
1682        guest_connection_ids = Vec::with_capacity(collaborators.len() - 1);
1683        for collaborator in collaborators.iter() {
1684            if collaborator.is_host {
1685                host_connection_id = Some(collaborator.connection_id);
1686            } else {
1687                guest_connection_ids.push(collaborator.connection_id);
1688            }
1689        }
1690    }
1691    let host_connection_id = host_connection_id.ok_or_else(|| anyhow!("host not found"))?;
1692
1693    session.executor.record_backtrace();
1694    broadcast(
1695        Some(session.connection_id),
1696        guest_connection_ids,
1697        |connection_id| {
1698            session
1699                .peer
1700                .forward_send(session.connection_id, connection_id, request.clone())
1701        },
1702    );
1703    if host_connection_id != session.connection_id {
1704        session
1705            .peer
1706            .forward_request(session.connection_id, host_connection_id, request.clone())
1707            .await?;
1708    }
1709
1710    response.send(proto::Ack {})?;
1711    Ok(())
1712}
1713
1714async fn update_buffer_file(request: proto::UpdateBufferFile, session: Session) -> Result<()> {
1715    let project_id = ProjectId::from_proto(request.project_id);
1716    let project_connection_ids = session
1717        .db()
1718        .await
1719        .project_connection_ids(project_id, session.connection_id)
1720        .await?;
1721
1722    broadcast(
1723        Some(session.connection_id),
1724        project_connection_ids.iter().copied(),
1725        |connection_id| {
1726            session
1727                .peer
1728                .forward_send(session.connection_id, connection_id, request.clone())
1729        },
1730    );
1731    Ok(())
1732}
1733
1734async fn buffer_reloaded(request: proto::BufferReloaded, session: Session) -> Result<()> {
1735    let project_id = ProjectId::from_proto(request.project_id);
1736    let project_connection_ids = session
1737        .db()
1738        .await
1739        .project_connection_ids(project_id, session.connection_id)
1740        .await?;
1741    broadcast(
1742        Some(session.connection_id),
1743        project_connection_ids.iter().copied(),
1744        |connection_id| {
1745            session
1746                .peer
1747                .forward_send(session.connection_id, connection_id, request.clone())
1748        },
1749    );
1750    Ok(())
1751}
1752
1753async fn buffer_saved(request: proto::BufferSaved, session: Session) -> Result<()> {
1754    let project_id = ProjectId::from_proto(request.project_id);
1755    let project_connection_ids = session
1756        .db()
1757        .await
1758        .project_connection_ids(project_id, session.connection_id)
1759        .await?;
1760    broadcast(
1761        Some(session.connection_id),
1762        project_connection_ids.iter().copied(),
1763        |connection_id| {
1764            session
1765                .peer
1766                .forward_send(session.connection_id, connection_id, request.clone())
1767        },
1768    );
1769    Ok(())
1770}
1771
1772async fn follow(
1773    request: proto::Follow,
1774    response: Response<proto::Follow>,
1775    session: Session,
1776) -> Result<()> {
1777    let project_id = ProjectId::from_proto(request.project_id);
1778    let leader_id = request
1779        .leader_id
1780        .ok_or_else(|| anyhow!("invalid leader id"))?
1781        .into();
1782    let follower_id = session.connection_id;
1783
1784    {
1785        let project_connection_ids = session
1786            .db()
1787            .await
1788            .project_connection_ids(project_id, session.connection_id)
1789            .await?;
1790
1791        if !project_connection_ids.contains(&leader_id) {
1792            Err(anyhow!("no such peer"))?;
1793        }
1794    }
1795
1796    let mut response_payload = session
1797        .peer
1798        .forward_request(session.connection_id, leader_id, request)
1799        .await?;
1800    response_payload
1801        .views
1802        .retain(|view| view.leader_id != Some(follower_id.into()));
1803    response.send(response_payload)?;
1804
1805    let room = session
1806        .db()
1807        .await
1808        .follow(project_id, leader_id, follower_id)
1809        .await?;
1810    room_updated(&room, &session.peer);
1811
1812    Ok(())
1813}
1814
1815async fn unfollow(request: proto::Unfollow, session: Session) -> Result<()> {
1816    let project_id = ProjectId::from_proto(request.project_id);
1817    let leader_id = request
1818        .leader_id
1819        .ok_or_else(|| anyhow!("invalid leader id"))?
1820        .into();
1821    let follower_id = session.connection_id;
1822
1823    if !session
1824        .db()
1825        .await
1826        .project_connection_ids(project_id, session.connection_id)
1827        .await?
1828        .contains(&leader_id)
1829    {
1830        Err(anyhow!("no such peer"))?;
1831    }
1832
1833    session
1834        .peer
1835        .forward_send(session.connection_id, leader_id, request)?;
1836
1837    let room = session
1838        .db()
1839        .await
1840        .unfollow(project_id, leader_id, follower_id)
1841        .await?;
1842    room_updated(&room, &session.peer);
1843
1844    Ok(())
1845}
1846
1847async fn update_followers(request: proto::UpdateFollowers, session: Session) -> Result<()> {
1848    let project_id = ProjectId::from_proto(request.project_id);
1849    let project_connection_ids = session
1850        .db
1851        .lock()
1852        .await
1853        .project_connection_ids(project_id, session.connection_id)
1854        .await?;
1855
1856    let leader_id = request.variant.as_ref().and_then(|variant| match variant {
1857        proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
1858        proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
1859        proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
1860    });
1861    for follower_peer_id in request.follower_ids.iter().copied() {
1862        let follower_connection_id = follower_peer_id.into();
1863        if project_connection_ids.contains(&follower_connection_id)
1864            && Some(follower_peer_id) != leader_id
1865        {
1866            session.peer.forward_send(
1867                session.connection_id,
1868                follower_connection_id,
1869                request.clone(),
1870            )?;
1871        }
1872    }
1873    Ok(())
1874}
1875
1876async fn get_users(
1877    request: proto::GetUsers,
1878    response: Response<proto::GetUsers>,
1879    session: Session,
1880) -> Result<()> {
1881    let user_ids = request
1882        .user_ids
1883        .into_iter()
1884        .map(UserId::from_proto)
1885        .collect();
1886    let users = session
1887        .db()
1888        .await
1889        .get_users_by_ids(user_ids)
1890        .await?
1891        .into_iter()
1892        .map(|user| proto::User {
1893            id: user.id.to_proto(),
1894            avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1895            github_login: user.github_login,
1896        })
1897        .collect();
1898    response.send(proto::UsersResponse { users })?;
1899    Ok(())
1900}
1901
1902async fn fuzzy_search_users(
1903    request: proto::FuzzySearchUsers,
1904    response: Response<proto::FuzzySearchUsers>,
1905    session: Session,
1906) -> Result<()> {
1907    let query = request.query;
1908    let users = match query.len() {
1909        0 => vec![],
1910        1 | 2 => session
1911            .db()
1912            .await
1913            .get_user_by_github_login(&query)
1914            .await?
1915            .into_iter()
1916            .collect(),
1917        _ => session.db().await.fuzzy_search_users(&query, 10).await?,
1918    };
1919    let users = users
1920        .into_iter()
1921        .filter(|user| user.id != session.user_id)
1922        .map(|user| proto::User {
1923            id: user.id.to_proto(),
1924            avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1925            github_login: user.github_login,
1926        })
1927        .collect();
1928    response.send(proto::UsersResponse { users })?;
1929    Ok(())
1930}
1931
1932async fn request_contact(
1933    request: proto::RequestContact,
1934    response: Response<proto::RequestContact>,
1935    session: Session,
1936) -> Result<()> {
1937    let requester_id = session.user_id;
1938    let responder_id = UserId::from_proto(request.responder_id);
1939    if requester_id == responder_id {
1940        return Err(anyhow!("cannot add yourself as a contact"))?;
1941    }
1942
1943    session
1944        .db()
1945        .await
1946        .send_contact_request(requester_id, responder_id)
1947        .await?;
1948
1949    // Update outgoing contact requests of requester
1950    let mut update = proto::UpdateContacts::default();
1951    update.outgoing_requests.push(responder_id.to_proto());
1952    for connection_id in session
1953        .connection_pool()
1954        .await
1955        .user_connection_ids(requester_id)
1956    {
1957        session.peer.send(connection_id, update.clone())?;
1958    }
1959
1960    // Update incoming contact requests of responder
1961    let mut update = proto::UpdateContacts::default();
1962    update
1963        .incoming_requests
1964        .push(proto::IncomingContactRequest {
1965            requester_id: requester_id.to_proto(),
1966            should_notify: true,
1967        });
1968    for connection_id in session
1969        .connection_pool()
1970        .await
1971        .user_connection_ids(responder_id)
1972    {
1973        session.peer.send(connection_id, update.clone())?;
1974    }
1975
1976    response.send(proto::Ack {})?;
1977    Ok(())
1978}
1979
1980async fn respond_to_contact_request(
1981    request: proto::RespondToContactRequest,
1982    response: Response<proto::RespondToContactRequest>,
1983    session: Session,
1984) -> Result<()> {
1985    let responder_id = session.user_id;
1986    let requester_id = UserId::from_proto(request.requester_id);
1987    let db = session.db().await;
1988    if request.response == proto::ContactRequestResponse::Dismiss as i32 {
1989        db.dismiss_contact_notification(responder_id, requester_id)
1990            .await?;
1991    } else {
1992        let accept = request.response == proto::ContactRequestResponse::Accept as i32;
1993
1994        db.respond_to_contact_request(responder_id, requester_id, accept)
1995            .await?;
1996        let requester_busy = db.is_user_busy(requester_id).await?;
1997        let responder_busy = db.is_user_busy(responder_id).await?;
1998
1999        let pool = session.connection_pool().await;
2000        // Update responder with new contact
2001        let mut update = proto::UpdateContacts::default();
2002        if accept {
2003            update
2004                .contacts
2005                .push(contact_for_user(requester_id, false, requester_busy, &pool));
2006        }
2007        update
2008            .remove_incoming_requests
2009            .push(requester_id.to_proto());
2010        for connection_id in pool.user_connection_ids(responder_id) {
2011            session.peer.send(connection_id, update.clone())?;
2012        }
2013
2014        // Update requester with new contact
2015        let mut update = proto::UpdateContacts::default();
2016        if accept {
2017            update
2018                .contacts
2019                .push(contact_for_user(responder_id, true, responder_busy, &pool));
2020        }
2021        update
2022            .remove_outgoing_requests
2023            .push(responder_id.to_proto());
2024        for connection_id in pool.user_connection_ids(requester_id) {
2025            session.peer.send(connection_id, update.clone())?;
2026        }
2027    }
2028
2029    response.send(proto::Ack {})?;
2030    Ok(())
2031}
2032
2033async fn remove_contact(
2034    request: proto::RemoveContact,
2035    response: Response<proto::RemoveContact>,
2036    session: Session,
2037) -> Result<()> {
2038    let requester_id = session.user_id;
2039    let responder_id = UserId::from_proto(request.user_id);
2040    let db = session.db().await;
2041    let contact_accepted = db.remove_contact(requester_id, responder_id).await?;
2042
2043    let pool = session.connection_pool().await;
2044    // Update outgoing contact requests of requester
2045    let mut update = proto::UpdateContacts::default();
2046    if contact_accepted {
2047        update.remove_contacts.push(responder_id.to_proto());
2048    } else {
2049        update
2050            .remove_outgoing_requests
2051            .push(responder_id.to_proto());
2052    }
2053    for connection_id in pool.user_connection_ids(requester_id) {
2054        session.peer.send(connection_id, update.clone())?;
2055    }
2056
2057    // Update incoming contact requests of responder
2058    let mut update = proto::UpdateContacts::default();
2059    if contact_accepted {
2060        update.remove_contacts.push(requester_id.to_proto());
2061    } else {
2062        update
2063            .remove_incoming_requests
2064            .push(requester_id.to_proto());
2065    }
2066    for connection_id in pool.user_connection_ids(responder_id) {
2067        session.peer.send(connection_id, update.clone())?;
2068    }
2069
2070    response.send(proto::Ack {})?;
2071    Ok(())
2072}
2073
2074async fn update_diff_base(request: proto::UpdateDiffBase, session: Session) -> Result<()> {
2075    let project_id = ProjectId::from_proto(request.project_id);
2076    let project_connection_ids = session
2077        .db()
2078        .await
2079        .project_connection_ids(project_id, session.connection_id)
2080        .await?;
2081    broadcast(
2082        Some(session.connection_id),
2083        project_connection_ids.iter().copied(),
2084        |connection_id| {
2085            session
2086                .peer
2087                .forward_send(session.connection_id, connection_id, request.clone())
2088        },
2089    );
2090    Ok(())
2091}
2092
2093async fn get_private_user_info(
2094    _request: proto::GetPrivateUserInfo,
2095    response: Response<proto::GetPrivateUserInfo>,
2096    session: Session,
2097) -> Result<()> {
2098    let metrics_id = session
2099        .db()
2100        .await
2101        .get_user_metrics_id(session.user_id)
2102        .await?;
2103    let user = session
2104        .db()
2105        .await
2106        .get_user_by_id(session.user_id)
2107        .await?
2108        .ok_or_else(|| anyhow!("user not found"))?;
2109    response.send(proto::GetPrivateUserInfoResponse {
2110        metrics_id,
2111        staff: user.admin,
2112    })?;
2113    Ok(())
2114}
2115
2116fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
2117    match message {
2118        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
2119        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
2120        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
2121        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
2122        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
2123            code: frame.code.into(),
2124            reason: frame.reason,
2125        })),
2126    }
2127}
2128
2129fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
2130    match message {
2131        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
2132        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
2133        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
2134        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
2135        AxumMessage::Close(frame) => {
2136            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
2137                code: frame.code.into(),
2138                reason: frame.reason,
2139            }))
2140        }
2141    }
2142}
2143
2144fn build_initial_contacts_update(
2145    contacts: Vec<db::Contact>,
2146    pool: &ConnectionPool,
2147) -> proto::UpdateContacts {
2148    let mut update = proto::UpdateContacts::default();
2149
2150    for contact in contacts {
2151        match contact {
2152            db::Contact::Accepted {
2153                user_id,
2154                should_notify,
2155                busy,
2156            } => {
2157                update
2158                    .contacts
2159                    .push(contact_for_user(user_id, should_notify, busy, &pool));
2160            }
2161            db::Contact::Outgoing { user_id } => update.outgoing_requests.push(user_id.to_proto()),
2162            db::Contact::Incoming {
2163                user_id,
2164                should_notify,
2165            } => update
2166                .incoming_requests
2167                .push(proto::IncomingContactRequest {
2168                    requester_id: user_id.to_proto(),
2169                    should_notify,
2170                }),
2171        }
2172    }
2173
2174    update
2175}
2176
2177fn contact_for_user(
2178    user_id: UserId,
2179    should_notify: bool,
2180    busy: bool,
2181    pool: &ConnectionPool,
2182) -> proto::Contact {
2183    proto::Contact {
2184        user_id: user_id.to_proto(),
2185        online: pool.is_user_online(user_id),
2186        busy,
2187        should_notify,
2188    }
2189}
2190
2191fn room_updated(room: &proto::Room, peer: &Peer) {
2192    broadcast(
2193        None,
2194        room.participants
2195            .iter()
2196            .filter_map(|participant| Some(participant.peer_id?.into())),
2197        |peer_id| {
2198            peer.send(
2199                peer_id.into(),
2200                proto::RoomUpdated {
2201                    room: Some(room.clone()),
2202                },
2203            )
2204        },
2205    );
2206}
2207
2208async fn update_user_contacts(user_id: UserId, session: &Session) -> Result<()> {
2209    let db = session.db().await;
2210    let contacts = db.get_contacts(user_id).await?;
2211    let busy = db.is_user_busy(user_id).await?;
2212
2213    let pool = session.connection_pool().await;
2214    let updated_contact = contact_for_user(user_id, false, busy, &pool);
2215    for contact in contacts {
2216        if let db::Contact::Accepted {
2217            user_id: contact_user_id,
2218            ..
2219        } = contact
2220        {
2221            for contact_conn_id in pool.user_connection_ids(contact_user_id) {
2222                session
2223                    .peer
2224                    .send(
2225                        contact_conn_id,
2226                        proto::UpdateContacts {
2227                            contacts: vec![updated_contact.clone()],
2228                            remove_contacts: Default::default(),
2229                            incoming_requests: Default::default(),
2230                            remove_incoming_requests: Default::default(),
2231                            outgoing_requests: Default::default(),
2232                            remove_outgoing_requests: Default::default(),
2233                        },
2234                    )
2235                    .trace_err();
2236            }
2237        }
2238    }
2239    Ok(())
2240}
2241
2242async fn leave_room_for_session(session: &Session) -> Result<()> {
2243    let mut contacts_to_update = HashSet::default();
2244
2245    let room_id;
2246    let canceled_calls_to_user_ids;
2247    let live_kit_room;
2248    let delete_live_kit_room;
2249    if let Some(mut left_room) = session.db().await.leave_room(session.connection_id).await? {
2250        contacts_to_update.insert(session.user_id);
2251
2252        for project in left_room.left_projects.values() {
2253            project_left(project, session);
2254        }
2255
2256        room_updated(&left_room.room, &session.peer);
2257        room_id = RoomId::from_proto(left_room.room.id);
2258        canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids);
2259        live_kit_room = mem::take(&mut left_room.room.live_kit_room);
2260        delete_live_kit_room = left_room.room.participants.is_empty();
2261    } else {
2262        return Ok(());
2263    }
2264
2265    {
2266        let pool = session.connection_pool().await;
2267        for canceled_user_id in canceled_calls_to_user_ids {
2268            for connection_id in pool.user_connection_ids(canceled_user_id) {
2269                session
2270                    .peer
2271                    .send(
2272                        connection_id,
2273                        proto::CallCanceled {
2274                            room_id: room_id.to_proto(),
2275                        },
2276                    )
2277                    .trace_err();
2278            }
2279            contacts_to_update.insert(canceled_user_id);
2280        }
2281    }
2282
2283    for contact_user_id in contacts_to_update {
2284        update_user_contacts(contact_user_id, &session).await?;
2285    }
2286
2287    if let Some(live_kit) = session.live_kit_client.as_ref() {
2288        live_kit
2289            .remove_participant(live_kit_room.clone(), session.user_id.to_string())
2290            .await
2291            .trace_err();
2292
2293        if delete_live_kit_room {
2294            live_kit.delete_room(live_kit_room).await.trace_err();
2295        }
2296    }
2297
2298    Ok(())
2299}
2300
2301fn project_left(project: &db::LeftProject, session: &Session) {
2302    for connection_id in &project.connection_ids {
2303        if project.host_user_id == session.user_id {
2304            session
2305                .peer
2306                .send(
2307                    *connection_id,
2308                    proto::UnshareProject {
2309                        project_id: project.id.to_proto(),
2310                    },
2311                )
2312                .trace_err();
2313        } else {
2314            session
2315                .peer
2316                .send(
2317                    *connection_id,
2318                    proto::RemoveProjectCollaborator {
2319                        project_id: project.id.to_proto(),
2320                        peer_id: Some(session.connection_id.into()),
2321                    },
2322                )
2323                .trace_err();
2324        }
2325    }
2326}
2327
2328pub trait ResultExt {
2329    type Ok;
2330
2331    fn trace_err(self) -> Option<Self::Ok>;
2332}
2333
2334impl<T, E> ResultExt for Result<T, E>
2335where
2336    E: std::fmt::Debug,
2337{
2338    type Ok = T;
2339
2340    fn trace_err(self) -> Option<T> {
2341        match self {
2342            Ok(value) => Some(value),
2343            Err(error) => {
2344                tracing::error!("{:?}", error);
2345                None
2346            }
2347        }
2348    }
2349}