rpc.rs

   1mod connection_pool;
   2
   3use crate::{
   4    auth,
   5    db::{self, Database, ProjectId, RoomId, ServerId, User, UserId},
   6    executor::Executor,
   7    AppState, Result,
   8};
   9use anyhow::anyhow;
  10use async_tungstenite::tungstenite::{
  11    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  12};
  13use axum::{
  14    body::Body,
  15    extract::{
  16        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  17        ConnectInfo, WebSocketUpgrade,
  18    },
  19    headers::{Header, HeaderName},
  20    http::StatusCode,
  21    middleware,
  22    response::IntoResponse,
  23    routing::get,
  24    Extension, Router, TypedHeader,
  25};
  26use collections::{HashMap, HashSet};
  27pub use connection_pool::ConnectionPool;
  28use futures::{
  29    channel::oneshot,
  30    future::{self, BoxFuture},
  31    stream::FuturesUnordered,
  32    FutureExt, SinkExt, StreamExt, TryStreamExt,
  33};
  34use lazy_static::lazy_static;
  35use prometheus::{register_int_gauge, IntGauge};
  36use rpc::{
  37    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  38    Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
  39};
  40use serde::{Serialize, Serializer};
  41use std::{
  42    any::TypeId,
  43    fmt,
  44    future::Future,
  45    marker::PhantomData,
  46    mem,
  47    net::SocketAddr,
  48    ops::{Deref, DerefMut},
  49    rc::Rc,
  50    sync::{
  51        atomic::{AtomicBool, Ordering::SeqCst},
  52        Arc,
  53    },
  54    time::{Duration, Instant},
  55};
  56use tokio::sync::{watch, Semaphore};
  57use tower::ServiceBuilder;
  58use tracing::{info_span, instrument, Instrument};
  59
/// Reconnection timeout of 30 seconds.
// NOTE(review): not referenced in the part of the file visible here; presumably the grace
// period a dropped client gets before its session is cleaned up — confirm against the caller.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
/// Delay of 10 seconds that the task spawned by `Server::start` waits before it looks up
/// and refreshes stale rooms.
pub const CLEANUP_TIMEOUT: Duration = Duration::from_secs(10);
  62
// Lazily-registered Prometheus gauges. Registration happens on first access, and the
// `unwrap` panics if `register_int_gauge!` returns an error.
lazy_static! {
    // Gauge exported under the name "connections".
    static ref METRIC_CONNECTIONS: IntGauge =
        register_int_gauge!("connections", "number of connections").unwrap();
    // Gauge exported under the name "shared_projects".
    static ref METRIC_SHARED_PROJECTS: IntGauge = register_int_gauge!(
        "shared_projects",
        "number of open projects with one or more guests"
    )
    .unwrap();
}
  72
/// Type-erased message handler stored in `Server::handlers`.
///
/// It receives the incoming envelope (still type-erased) together with the `Session` of the
/// connection that sent it, and returns a boxed future that performs the handling.
type MessageHandler =
    Box<dyn Send + Sync + Fn(Box<dyn AnyTypedEnvelope>, Session) -> BoxFuture<'static, ()>>;
  75
/// Handle given to a request handler so it can reply to the request of type `R`.
struct Response<R> {
    /// Peer through which the reply is sent.
    peer: Arc<Peer>,
    /// Receipt of the request being answered.
    receipt: Receipt<R>,
    /// Set to `true` by `send`; `add_request_handler` keeps a clone of this flag to detect
    /// handlers that returned `Ok` without ever replying.
    responded: Arc<AtomicBool>,
}
  81
  82impl<R: RequestMessage> Response<R> {
  83    fn send(self, payload: R::Response) -> Result<()> {
  84        self.responded.store(true, SeqCst);
  85        self.peer.respond(self.receipt, payload)?;
  86        Ok(())
  87    }
  88}
  89
/// Per-connection state handed to every message handler.
///
/// `handle_connection` builds one `Session` per connection and clones it for each
/// dispatched message.
#[derive(Clone)]
struct Session {
    /// Id of the user who owns the connection.
    user_id: UserId,
    /// Id the peer assigned to this connection.
    connection_id: ConnectionId,
    /// Database handle behind an async mutex; access it through `Session::db`.
    db: Arc<tokio::sync::Mutex<DbHandle>>,
    /// Peer shared with the `Server`.
    peer: Arc<Peer>,
    /// Connection pool shared with the `Server`; access it through `Session::connection_pool`.
    connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
    /// LiveKit API client, if the application state has one.
    live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
    /// Executor the connection was started with.
    executor: Executor,
}
 100
 101impl Session {
 102    async fn db(&self) -> tokio::sync::MutexGuard<DbHandle> {
 103        #[cfg(test)]
 104        tokio::task::yield_now().await;
 105        let guard = self.db.lock().await;
 106        #[cfg(test)]
 107        tokio::task::yield_now().await;
 108        guard
 109    }
 110
 111    async fn connection_pool(&self) -> ConnectionPoolGuard<'_> {
 112        #[cfg(test)]
 113        tokio::task::yield_now().await;
 114        let guard = self.connection_pool.lock();
 115        ConnectionPoolGuard {
 116            guard,
 117            _not_send: PhantomData,
 118        }
 119    }
 120}
 121
 122impl fmt::Debug for Session {
 123    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 124        f.debug_struct("Session")
 125            .field("user_id", &self.user_id)
 126            .field("connection_id", &self.connection_id)
 127            .finish()
 128    }
 129}
 130
/// Newtype around the shared `Database`; dereferences to `Database`.
struct DbHandle(Arc<Database>);
 132
 133impl Deref for DbHandle {
 134    type Target = Database;
 135
 136    fn deref(&self) -> &Self::Target {
 137        self.0.as_ref()
 138    }
 139}
 140
/// The RPC server: owns the peer, the connection pool and the table of message handlers.
pub struct Server {
    /// Id of this server instance; behind a mutex so the test-only `reset` can replace it.
    id: parking_lot::Mutex<ServerId>,
    /// Peer used to send and receive RPC messages on all connections.
    peer: Arc<Peer>,
    /// Pool of currently registered connections, shared with every `Session`.
    pub(crate) connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
    /// Shared application state (database, config, LiveKit client).
    app_state: Arc<AppState>,
    /// Executor used to spawn background work and create timers.
    executor: Executor,
    /// Handlers keyed by the `TypeId` of the message type they accept; filled in by `new`.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Watch channel signalled by `teardown`; each connection task subscribes to it.
    teardown: watch::Sender<()>,
}
 150
/// Wrapper around the lock guard of the connection pool.
///
/// Dereferences to `ConnectionPool`; in test builds, dropping it runs `check_invariants`.
pub(crate) struct ConnectionPoolGuard<'a> {
    /// The underlying mutex guard.
    guard: parking_lot::MutexGuard<'a, ConnectionPool>,
    /// `Rc` is `!Send`, so this marker makes the wrapper `!Send` as well.
    // NOTE(review): presumably meant to stop the guard from being held across an `.await`
    // in a `Send` future — confirm intent.
    _not_send: PhantomData<Rc<()>>,
}
 155
/// Serializable view of the server's state, produced by `Server::snapshot`.
///
/// Holds the connection-pool lock for as long as the snapshot is alive.
#[derive(Serialize)]
pub struct ServerSnapshot<'a> {
    /// The server's peer.
    peer: &'a Peer,
    /// Locked connection pool; serialized through `serialize_deref`, i.e. as the
    /// `ConnectionPool` the guard dereferences to.
    #[serde(serialize_with = "serialize_deref")]
    connection_pool: ConnectionPoolGuard<'a>,
}
 162
 163pub fn serialize_deref<S, T, U>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
 164where
 165    S: Serializer,
 166    T: Deref<Target = U>,
 167    U: Serialize,
 168{
 169    Serialize::serialize(value.deref(), serializer)
 170}
 171
 172impl Server {
    /// Creates a server with the given id and registers every message and request handler.
    ///
    /// The peer is created from the numeric part of `id` (cast to `u32`). Handlers are added
    /// through `add_request_handler` (for messages that expect a response) and
    /// `add_message_handler` (for one-way messages). Registering two handlers for the same
    /// message type panics inside `add_handler`.
    pub fn new(id: ServerId, app_state: Arc<AppState>, executor: Executor) -> Arc<Self> {
        let mut server = Self {
            id: parking_lot::Mutex::new(id),
            peer: Peer::new(id.0 as u32),
            app_state,
            executor,
            connection_pool: Default::default(),
            handlers: Default::default(),
            // Only the sender half is kept; receivers are created on demand via `subscribe`.
            teardown: watch::channel(()).0,
        };

        server
            .add_request_handler(ping)
            .add_request_handler(create_room)
            .add_request_handler(join_room)
            .add_request_handler(rejoin_room)
            .add_request_handler(leave_room)
            .add_request_handler(call)
            .add_request_handler(cancel_call)
            .add_message_handler(decline_call)
            .add_request_handler(update_participant_location)
            .add_request_handler(share_project)
            .add_message_handler(unshare_project)
            .add_request_handler(join_project)
            .add_message_handler(leave_project)
            .add_request_handler(update_project)
            .add_request_handler(update_worktree)
            .add_message_handler(start_language_server)
            .add_message_handler(update_language_server)
            .add_message_handler(update_diagnostic_summary)
            .add_message_handler(update_worktree_settings)
            .add_message_handler(refresh_inlay_hints)
            .add_request_handler(forward_project_request::<proto::GetHover>)
            .add_request_handler(forward_project_request::<proto::GetDefinition>)
            .add_request_handler(forward_project_request::<proto::GetTypeDefinition>)
            .add_request_handler(forward_project_request::<proto::GetReferences>)
            .add_request_handler(forward_project_request::<proto::SearchProject>)
            .add_request_handler(forward_project_request::<proto::GetDocumentHighlights>)
            .add_request_handler(forward_project_request::<proto::GetProjectSymbols>)
            .add_request_handler(forward_project_request::<proto::OpenBufferForSymbol>)
            .add_request_handler(forward_project_request::<proto::OpenBufferById>)
            .add_request_handler(forward_project_request::<proto::OpenBufferByPath>)
            .add_request_handler(forward_project_request::<proto::GetCompletions>)
            .add_request_handler(forward_project_request::<proto::ApplyCompletionAdditionalEdits>)
            .add_request_handler(forward_project_request::<proto::GetCodeActions>)
            .add_request_handler(forward_project_request::<proto::ApplyCodeAction>)
            .add_request_handler(forward_project_request::<proto::PrepareRename>)
            .add_request_handler(forward_project_request::<proto::PerformRename>)
            .add_request_handler(forward_project_request::<proto::ReloadBuffers>)
            .add_request_handler(forward_project_request::<proto::SynchronizeBuffers>)
            .add_request_handler(forward_project_request::<proto::FormatBuffers>)
            .add_request_handler(forward_project_request::<proto::CreateProjectEntry>)
            .add_request_handler(forward_project_request::<proto::RenameProjectEntry>)
            .add_request_handler(forward_project_request::<proto::CopyProjectEntry>)
            .add_request_handler(forward_project_request::<proto::DeleteProjectEntry>)
            .add_request_handler(forward_project_request::<proto::ExpandProjectEntry>)
            .add_request_handler(forward_project_request::<proto::OnTypeFormatting>)
            .add_request_handler(forward_project_request::<proto::InlayHints>)
            .add_message_handler(create_buffer_for_peer)
            .add_request_handler(update_buffer)
            .add_message_handler(update_buffer_file)
            .add_message_handler(buffer_reloaded)
            .add_message_handler(buffer_saved)
            .add_request_handler(forward_project_request::<proto::SaveBuffer>)
            .add_request_handler(get_users)
            .add_request_handler(fuzzy_search_users)
            .add_request_handler(request_contact)
            .add_request_handler(remove_contact)
            .add_request_handler(respond_to_contact_request)
            .add_request_handler(follow)
            .add_message_handler(unfollow)
            .add_message_handler(update_followers)
            .add_message_handler(update_diff_base)
            .add_request_handler(get_private_user_info);

        Arc::new(server)
    }
 250
    /// Spawns a detached background task that cleans up state left behind by stale servers,
    /// then returns `Ok(())` immediately.
    ///
    /// After sleeping for `CLEANUP_TIMEOUT`, the task:
    /// 1. asks the database for stale room ids for this environment and server id;
    /// 2. refreshes each of those rooms, broadcasts the refreshed room, and notifies users
    ///    whose calls were canceled;
    /// 3. pushes an updated contact entry to the accepted contacts of every affected user;
    /// 4. deletes the LiveKit room when the refreshed room has no participants left;
    /// 5. finally deletes stale server records from the database.
    ///
    /// Every fallible step goes through `trace_err`, so a failure does not abort the rest
    /// of the cleanup.
    pub async fn start(&self) -> Result<()> {
        let server_id = *self.id.lock();
        let app_state = self.app_state.clone();
        let peer = self.peer.clone();
        // The sleep future is created here and awaited inside the spawned task.
        let timeout = self.executor.sleep(CLEANUP_TIMEOUT);
        let pool = self.connection_pool.clone();
        let live_kit_client = self.app_state.live_kit_client.clone();

        let span = info_span!("start server");
        self.executor.spawn_detached(
            async move {
                tracing::info!("waiting for cleanup timeout");
                timeout.await;
                tracing::info!("cleanup timeout expired, retrieving stale rooms");
                if let Some(room_ids) = app_state
                    .db
                    .stale_room_ids(&app_state.config.zed_environment, server_id)
                    .await
                    .trace_err()
                {
                    tracing::info!(stale_room_count = room_ids.len(), "retrieved stale rooms");
                    for room_id in room_ids {
                        // Per-room results; they keep these defaults if the refresh fails.
                        let mut contacts_to_update = HashSet::default();
                        let mut canceled_calls_to_user_ids = Vec::new();
                        let mut live_kit_room = String::new();
                        let mut delete_live_kit_room = false;

                        if let Some(mut refreshed_room) = app_state
                            .db
                            .refresh_room(room_id, server_id)
                            .await
                            .trace_err()
                        {
                            tracing::info!(
                                room_id = room_id.0,
                                new_participant_count = refreshed_room.room.participants.len(),
                                "refreshed room"
                            );
                            room_updated(&refreshed_room.room, &peer);
                            // Both stale participants and users with canceled calls need
                            // their contact entry re-sent below.
                            contacts_to_update
                                .extend(refreshed_room.stale_participant_user_ids.iter().copied());
                            contacts_to_update
                                .extend(refreshed_room.canceled_calls_to_user_ids.iter().copied());
                            canceled_calls_to_user_ids =
                                mem::take(&mut refreshed_room.canceled_calls_to_user_ids);
                            live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room);
                            delete_live_kit_room = refreshed_room.room.participants.is_empty();
                        }

                        // Scoped so the pool lock is released before the next `.await`.
                        {
                            let pool = pool.lock();
                            for canceled_user_id in canceled_calls_to_user_ids {
                                for connection_id in pool.user_connection_ids(canceled_user_id) {
                                    peer.send(
                                        connection_id,
                                        proto::CallCanceled {
                                            room_id: room_id.to_proto(),
                                        },
                                    )
                                    .trace_err();
                                }
                            }
                        }

                        for user_id in contacts_to_update {
                            let busy = app_state.db.is_user_busy(user_id).await.trace_err();
                            let contacts = app_state.db.get_contacts(user_id).await.trace_err();
                            // Proceed only when both queries succeeded.
                            if let Some((busy, contacts)) = busy.zip(contacts) {
                                // The lock is taken after the awaits above and is not held
                                // across any `.await`.
                                let pool = pool.lock();
                                let updated_contact = contact_for_user(user_id, false, busy, &pool);
                                for contact in contacts {
                                    // Only accepted contacts are notified.
                                    if let db::Contact::Accepted {
                                        user_id: contact_user_id,
                                        ..
                                    } = contact
                                    {
                                        for contact_conn_id in
                                            pool.user_connection_ids(contact_user_id)
                                        {
                                            peer.send(
                                                contact_conn_id,
                                                proto::UpdateContacts {
                                                    contacts: vec![updated_contact.clone()],
                                                    remove_contacts: Default::default(),
                                                    incoming_requests: Default::default(),
                                                    remove_incoming_requests: Default::default(),
                                                    outgoing_requests: Default::default(),
                                                    remove_outgoing_requests: Default::default(),
                                                },
                                            )
                                            .trace_err();
                                        }
                                    }
                                }
                            }
                        }

                        // `delete_live_kit_room` is only set when the refresh succeeded and
                        // left the room without participants.
                        if let Some(live_kit) = live_kit_client.as_ref() {
                            if delete_live_kit_room {
                                live_kit.delete_room(live_kit_room).await.trace_err();
                            }
                        }
                    }
                }

                // Runs regardless of whether the stale-room lookup succeeded.
                app_state
                    .db
                    .delete_stale_servers(&app_state.config.zed_environment, server_id)
                    .await
                    .trace_err();
            }
            .instrument(span),
        );
        Ok(())
    }
 366
 367    pub fn teardown(&self) {
 368        self.peer.teardown();
 369        self.connection_pool.lock().reset();
 370        let _ = self.teardown.send(());
 371    }
 372
 373    #[cfg(test)]
 374    pub fn reset(&self, id: ServerId) {
 375        self.teardown();
 376        *self.id.lock() = id;
 377        self.peer.reset(id.0 as u32);
 378    }
 379
 380    #[cfg(test)]
 381    pub fn id(&self) -> ServerId {
 382        *self.id.lock()
 383    }
 384
 385    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 386    where
 387        F: 'static + Send + Sync + Fn(TypedEnvelope<M>, Session) -> Fut,
 388        Fut: 'static + Send + Future<Output = Result<()>>,
 389        M: EnvelopedMessage,
 390    {
 391        let prev_handler = self.handlers.insert(
 392            TypeId::of::<M>(),
 393            Box::new(move |envelope, session| {
 394                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 395                let span = info_span!(
 396                    "handle message",
 397                    payload_type = envelope.payload_type_name()
 398                );
 399                span.in_scope(|| {
 400                    tracing::info!(
 401                        payload_type = envelope.payload_type_name(),
 402                        "message received"
 403                    );
 404                });
 405                let start_time = Instant::now();
 406                let future = (handler)(*envelope, session);
 407                async move {
 408                    let result = future.await;
 409                    let duration_ms = start_time.elapsed().as_micros() as f64 / 1000.0;
 410                    match result {
 411                        Err(error) => {
 412                            tracing::error!(%error, ?duration_ms, "error handling message")
 413                        }
 414                        Ok(()) => tracing::info!(?duration_ms, "finished handling message"),
 415                    }
 416                }
 417                .instrument(span)
 418                .boxed()
 419            }),
 420        );
 421        if prev_handler.is_some() {
 422            panic!("registered a handler for the same message twice");
 423        }
 424        self
 425    }
 426
 427    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 428    where
 429        F: 'static + Send + Sync + Fn(M, Session) -> Fut,
 430        Fut: 'static + Send + Future<Output = Result<()>>,
 431        M: EnvelopedMessage,
 432    {
 433        self.add_handler(move |envelope, session| handler(envelope.payload, session));
 434        self
 435    }
 436
 437    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 438    where
 439        F: 'static + Send + Sync + Fn(M, Response<M>, Session) -> Fut,
 440        Fut: Send + Future<Output = Result<()>>,
 441        M: RequestMessage,
 442    {
 443        let handler = Arc::new(handler);
 444        self.add_handler(move |envelope, session| {
 445            let receipt = envelope.receipt();
 446            let handler = handler.clone();
 447            async move {
 448                let peer = session.peer.clone();
 449                let responded = Arc::new(AtomicBool::default());
 450                let response = Response {
 451                    peer: peer.clone(),
 452                    responded: responded.clone(),
 453                    receipt,
 454                };
 455                match (handler)(envelope.payload, response, session).await {
 456                    Ok(()) => {
 457                        if responded.load(std::sync::atomic::Ordering::SeqCst) {
 458                            Ok(())
 459                        } else {
 460                            Err(anyhow!("handler did not send a response"))?
 461                        }
 462                    }
 463                    Err(error) => {
 464                        peer.respond_with_error(
 465                            receipt,
 466                            proto::Error {
 467                                message: error.to_string(),
 468                            },
 469                        )?;
 470                        Err(error)
 471                    }
 472                }
 473            }
 474        })
 475    }
 476
    /// Returns a future that drives one client connection from handshake to sign-out.
    ///
    /// The future:
    /// 1. registers `connection` with the peer and sends `Hello` with the assigned id;
    /// 2. reports the connection id through `send_connection_id`, if one was supplied;
    /// 3. on the user's first connection, sends `ShowContacts` and records the connection
    ///    in the database;
    /// 4. adds the connection to the pool and sends the initial contacts, invite info and
    ///    any pending incoming call;
    /// 5. loops, dispatching incoming messages to the registered handlers, until the server
    ///    is torn down, the I/O future finishes, or the incoming stream ends;
    /// 6. calls `connection_lost` to sign the user out (skipped on teardown, which returns
    ///    straight away).
    ///
    /// Errors from the setup steps (1-4) are propagated with `?`. Errors from I/O and from
    /// signing out are logged only.
    pub fn handle_connection(
        self: &Arc<Self>,
        connection: Connection,
        address: String,
        user: User,
        mut send_connection_id: Option<oneshot::Sender<ConnectionId>>,
        executor: Executor,
    ) -> impl Future<Output = Result<()>> {
        let this = self.clone();
        let user_id = user.id;
        let login = user.github_login;
        let span = info_span!("handle connection", %user_id, %login, %address);
        // Subscribed before the future is first polled, i.e. at call time.
        let mut teardown = self.teardown.subscribe();
        async move {
            let (connection_id, handle_io, mut incoming_rx) = this
                .peer
                .add_connection(connection, {
                    let executor = executor.clone();
                    move |duration| executor.sleep(duration)
                });

            tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
            this.peer.send(connection_id, proto::Hello { peer_id: Some(connection_id.into()) })?;
            tracing::info!(%user_id, %login, %connection_id, %address, "sent hello message");

            if let Some(send_connection_id) = send_connection_id.take() {
                // The receiving side may already be gone; that is not an error here.
                let _ = send_connection_id.send(connection_id);
            }

            if !user.connected_once {
                this.peer.send(connection_id, proto::ShowContacts {})?;
                this.app_state.db.set_user_connected_once(user_id, true).await?;
            }

            // Both queries run concurrently; the first error aborts the connection setup.
            let (contacts, invite_code) = future::try_join(
                this.app_state.db.get_contacts(user_id),
                this.app_state.db.get_invite_code_for_user(user_id)
            ).await?;

            // Scoped so the pool lock is released before the next `.await`.
            {
                let mut pool = this.connection_pool.lock();
                pool.add_connection(connection_id, user_id, user.admin);
                this.peer.send(connection_id, build_initial_contacts_update(contacts, &pool))?;

                if let Some((code, count)) = invite_code {
                    this.peer.send(connection_id, proto::UpdateInviteInfo {
                        url: format!("{}{}", this.app_state.config.invite_link_prefix, code),
                        count: count as u32,
                    })?;
                }
            }

            if let Some(incoming_call) = this.app_state.db.incoming_call_for_user(user_id).await? {
                this.peer.send(connection_id, incoming_call)?;
            }

            let session = Session {
                user_id,
                connection_id,
                db: Arc::new(tokio::sync::Mutex::new(DbHandle(this.app_state.db.clone()))),
                peer: this.peer.clone(),
                connection_pool: this.connection_pool.clone(),
                live_kit_client: this.app_state.live_kit_client.clone(),
                executor: executor.clone(),
            };
            update_user_contacts(user_id, &session).await?;

            let handle_io = handle_io.fuse();
            futures::pin_mut!(handle_io);

            // Handlers for foreground messages are pushed into the following `FuturesUnordered`.
            // This prevents deadlocks when e.g., client A performs a request to client B and
            // client B performs a request to client A. If both clients stop processing further
            // messages until their respective request completes, they won't have a chance to
            // respond to the other client's request and cause a deadlock.
            //
            // This arrangement ensures we will attempt to process earlier messages first, but fall
            // back to processing messages arrived later in the spirit of making progress.
            let mut foreground_message_handlers = FuturesUnordered::new();
            // Caps the number of in-flight handlers for this connection at 256.
            let concurrent_handlers = Arc::new(Semaphore::new(256));
            loop {
                // A permit is acquired before the next message is read, so no new message is
                // taken off the stream while all permits are held by running handlers.
                let next_message = async {
                    let permit = concurrent_handlers.clone().acquire_owned().await.unwrap();
                    let message = incoming_rx.next().await;
                    (permit, message)
                }.fuse();
                futures::pin_mut!(next_message);
                // `select_biased!` polls the branches in the order they are written.
                futures::select_biased! {
                    // Server teardown: return immediately, skipping the sign-out below.
                    _ = teardown.changed().fuse() => return Ok(()),
                    result = handle_io => {
                        if let Err(error) = result {
                            tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
                        }
                        break;
                    }
                    // Drives the foreground handlers that are still in flight.
                    _ = foreground_message_handlers.next() => {}
                    next_message = next_message => {
                        let (permit, message) = next_message;
                        if let Some(message) = message {
                            let type_name = message.payload_type_name();
                            let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
                            let span_enter = span.enter();
                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                let is_background = message.is_background();
                                let handle_message = (handler)(message, session.clone());
                                // Leave the span before `span` is moved into `instrument` below.
                                drop(span_enter);

                                // The permit is released only once the handler has finished.
                                let handle_message = async move {
                                    handle_message.await;
                                    drop(permit);
                                }.instrument(span);
                                if is_background {
                                    executor.spawn_detached(handle_message);
                                } else {
                                    foreground_message_handlers.push(handle_message);
                                }
                            } else {
                                tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
                            }
                        } else {
                            tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
                            break;
                        }
                    }
                }
            }

            // Dropping the set cancels any foreground handlers that are still pending.
            drop(foreground_message_handlers);
            tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
            if let Err(error) = connection_lost(session, teardown, executor).await {
                tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
            }

            Ok(())
        }.instrument(span)
    }
 613
 614    pub async fn invite_code_redeemed(
 615        self: &Arc<Self>,
 616        inviter_id: UserId,
 617        invitee_id: UserId,
 618    ) -> Result<()> {
 619        if let Some(user) = self.app_state.db.get_user_by_id(inviter_id).await? {
 620            if let Some(code) = &user.invite_code {
 621                let pool = self.connection_pool.lock();
 622                let invitee_contact = contact_for_user(invitee_id, true, false, &pool);
 623                for connection_id in pool.user_connection_ids(inviter_id) {
 624                    self.peer.send(
 625                        connection_id,
 626                        proto::UpdateContacts {
 627                            contacts: vec![invitee_contact.clone()],
 628                            ..Default::default()
 629                        },
 630                    )?;
 631                    self.peer.send(
 632                        connection_id,
 633                        proto::UpdateInviteInfo {
 634                            url: format!("{}{}", self.app_state.config.invite_link_prefix, &code),
 635                            count: user.invite_count as u32,
 636                        },
 637                    )?;
 638                }
 639            }
 640        }
 641        Ok(())
 642    }
 643
 644    pub async fn invite_count_updated(self: &Arc<Self>, user_id: UserId) -> Result<()> {
 645        if let Some(user) = self.app_state.db.get_user_by_id(user_id).await? {
 646            if let Some(invite_code) = &user.invite_code {
 647                let pool = self.connection_pool.lock();
 648                for connection_id in pool.user_connection_ids(user_id) {
 649                    self.peer.send(
 650                        connection_id,
 651                        proto::UpdateInviteInfo {
 652                            url: format!(
 653                                "{}{}",
 654                                self.app_state.config.invite_link_prefix, invite_code
 655                            ),
 656                            count: user.invite_count as u32,
 657                        },
 658                    )?;
 659                }
 660            }
 661        }
 662        Ok(())
 663    }
 664
 665    pub async fn snapshot<'a>(self: &'a Arc<Self>) -> ServerSnapshot<'a> {
 666        ServerSnapshot {
 667            connection_pool: ConnectionPoolGuard {
 668                guard: self.connection_pool.lock(),
 669                _not_send: PhantomData,
 670            },
 671            peer: &self.peer,
 672        }
 673    }
 674}
 675
 676impl<'a> Deref for ConnectionPoolGuard<'a> {
 677    type Target = ConnectionPool;
 678
 679    fn deref(&self) -> &Self::Target {
 680        &*self.guard
 681    }
 682}
 683
 684impl<'a> DerefMut for ConnectionPoolGuard<'a> {
 685    fn deref_mut(&mut self) -> &mut Self::Target {
 686        &mut *self.guard
 687    }
 688}
 689
impl<'a> Drop for ConnectionPoolGuard<'a> {
    /// In test builds, checks the pool's invariants every time a guard is released.
    /// In non-test builds this is a no-op.
    fn drop(&mut self) {
        // `check_invariants` is reached on `ConnectionPool` through the guard's `Deref`.
        #[cfg(test)]
        self.check_invariants();
    }
}
 696
 697fn broadcast<F>(
 698    sender_id: Option<ConnectionId>,
 699    receiver_ids: impl IntoIterator<Item = ConnectionId>,
 700    mut f: F,
 701) where
 702    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 703{
 704    for receiver_id in receiver_ids {
 705        if Some(receiver_id) != sender_id {
 706            if let Err(error) = f(receiver_id) {
 707                tracing::error!("failed to send to {:?} {}", receiver_id, error);
 708            }
 709        }
 710    }
 711}
 712
lazy_static! {
    // Name of the HTTP header that carries the client's protocol version; it is returned by
    // `ProtocolVersion::name`.
    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
}
 716
/// Typed value of the `x-zed-protocol-version` request header.
pub struct ProtocolVersion(u32);
 718
 719impl Header for ProtocolVersion {
 720    fn name() -> &'static HeaderName {
 721        &ZED_PROTOCOL_VERSION
 722    }
 723
 724    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
 725    where
 726        Self: Sized,
 727        I: Iterator<Item = &'i axum::http::HeaderValue>,
 728    {
 729        let version = values
 730            .next()
 731            .ok_or_else(axum::headers::Error::invalid)?
 732            .to_str()
 733            .map_err(|_| axum::headers::Error::invalid())?
 734            .parse()
 735            .map_err(|_| axum::headers::Error::invalid())?;
 736        Ok(Self(version))
 737    }
 738
 739    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
 740        values.extend([self.0.to_string().parse().unwrap()]);
 741    }
 742}
 743
 744pub fn routes(server: Arc<Server>) -> Router<Body> {
 745    Router::new()
 746        .route("/rpc", get(handle_websocket_request))
 747        .layer(
 748            ServiceBuilder::new()
 749                .layer(Extension(server.app_state.clone()))
 750                .layer(middleware::from_fn(auth::validate_header)),
 751        )
 752        .route("/metrics", get(handle_metrics))
 753        .layer(Extension(server))
 754}
 755
 756pub async fn handle_websocket_request(
 757    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
 758    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
 759    Extension(server): Extension<Arc<Server>>,
 760    Extension(user): Extension<User>,
 761    ws: WebSocketUpgrade,
 762) -> axum::response::Response {
 763    if protocol_version != rpc::PROTOCOL_VERSION {
 764        return (
 765            StatusCode::UPGRADE_REQUIRED,
 766            "client must be upgraded".to_string(),
 767        )
 768            .into_response();
 769    }
 770    let socket_address = socket_address.to_string();
 771    ws.on_upgrade(move |socket| {
 772        use util::ResultExt;
 773        let socket = socket
 774            .map_ok(to_tungstenite_message)
 775            .err_into()
 776            .with(|message| async move { Ok(to_axum_message(message)) });
 777        let connection = Connection::new(Box::pin(socket));
 778        async move {
 779            server
 780                .handle_connection(connection, socket_address, user, None, Executor::Production)
 781                .await
 782                .log_err();
 783        }
 784    })
 785}
 786
 787pub async fn handle_metrics(Extension(server): Extension<Arc<Server>>) -> Result<String> {
 788    let connections = server
 789        .connection_pool
 790        .lock()
 791        .connections()
 792        .filter(|connection| !connection.admin)
 793        .count();
 794
 795    METRIC_CONNECTIONS.set(connections as _);
 796
 797    let shared_projects = server.app_state.db.project_count_excluding_admins().await?;
 798    METRIC_SHARED_PROJECTS.set(shared_projects as _);
 799
 800    let encoder = prometheus::TextEncoder::new();
 801    let metric_families = prometheus::gather();
 802    let encoded_metrics = encoder
 803        .encode_to_string(&metric_families)
 804        .map_err(|err| anyhow!("{}", err))?;
 805    Ok(encoded_metrics)
 806}
 807
 808#[instrument(err, skip(executor))]
 809async fn connection_lost(
 810    session: Session,
 811    mut teardown: watch::Receiver<()>,
 812    executor: Executor,
 813) -> Result<()> {
 814    session.peer.disconnect(session.connection_id);
 815    session
 816        .connection_pool()
 817        .await
 818        .remove_connection(session.connection_id)?;
 819
 820    session
 821        .db()
 822        .await
 823        .connection_lost(session.connection_id)
 824        .await
 825        .trace_err();
 826
 827    futures::select_biased! {
 828        _ = executor.sleep(RECONNECT_TIMEOUT).fuse() => {
 829            leave_room_for_session(&session).await.trace_err();
 830
 831            if !session
 832                .connection_pool()
 833                .await
 834                .is_user_online(session.user_id)
 835            {
 836                let db = session.db().await;
 837                if let Some(room) = db.decline_call(None, session.user_id).await.trace_err().flatten() {
 838                    room_updated(&room, &session.peer);
 839                }
 840            }
 841            update_user_contacts(session.user_id, &session).await?;
 842        }
 843        _ = teardown.changed().fuse() => {}
 844    }
 845
 846    Ok(())
 847}
 848
 849async fn ping(_: proto::Ping, response: Response<proto::Ping>, _session: Session) -> Result<()> {
 850    response.send(proto::Ack {})?;
 851    Ok(())
 852}
 853
 854async fn create_room(
 855    _request: proto::CreateRoom,
 856    response: Response<proto::CreateRoom>,
 857    session: Session,
 858) -> Result<()> {
 859    let live_kit_room = nanoid::nanoid!(30);
 860    let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
 861        if let Some(_) = live_kit
 862            .create_room(live_kit_room.clone())
 863            .await
 864            .trace_err()
 865        {
 866            if let Some(token) = live_kit
 867                .room_token(&live_kit_room, &session.user_id.to_string())
 868                .trace_err()
 869            {
 870                Some(proto::LiveKitConnectionInfo {
 871                    server_url: live_kit.url().into(),
 872                    token,
 873                })
 874            } else {
 875                None
 876            }
 877        } else {
 878            None
 879        }
 880    } else {
 881        None
 882    };
 883
 884    {
 885        let room = session
 886            .db()
 887            .await
 888            .create_room(session.user_id, session.connection_id, &live_kit_room)
 889            .await?;
 890
 891        response.send(proto::CreateRoomResponse {
 892            room: Some(room.clone()),
 893            live_kit_connection_info,
 894        })?;
 895    }
 896
 897    update_user_contacts(session.user_id, &session).await?;
 898    Ok(())
 899}
 900
 901async fn join_room(
 902    request: proto::JoinRoom,
 903    response: Response<proto::JoinRoom>,
 904    session: Session,
 905) -> Result<()> {
 906    let room_id = RoomId::from_proto(request.id);
 907    let room = {
 908        let room = session
 909            .db()
 910            .await
 911            .join_room(room_id, session.user_id, session.connection_id)
 912            .await?;
 913        room_updated(&room, &session.peer);
 914        room.clone()
 915    };
 916
 917    for connection_id in session
 918        .connection_pool()
 919        .await
 920        .user_connection_ids(session.user_id)
 921    {
 922        session
 923            .peer
 924            .send(
 925                connection_id,
 926                proto::CallCanceled {
 927                    room_id: room_id.to_proto(),
 928                },
 929            )
 930            .trace_err();
 931    }
 932
 933    let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
 934        if let Some(token) = live_kit
 935            .room_token(&room.live_kit_room, &session.user_id.to_string())
 936            .trace_err()
 937        {
 938            Some(proto::LiveKitConnectionInfo {
 939                server_url: live_kit.url().into(),
 940                token,
 941            })
 942        } else {
 943            None
 944        }
 945    } else {
 946        None
 947    };
 948
 949    response.send(proto::JoinRoomResponse {
 950        room: Some(room),
 951        live_kit_connection_info,
 952    })?;
 953
 954    update_user_contacts(session.user_id, &session).await?;
 955    Ok(())
 956}
 957
 958async fn rejoin_room(
 959    request: proto::RejoinRoom,
 960    response: Response<proto::RejoinRoom>,
 961    session: Session,
 962) -> Result<()> {
 963    {
 964        let mut rejoined_room = session
 965            .db()
 966            .await
 967            .rejoin_room(request, session.user_id, session.connection_id)
 968            .await?;
 969
 970        response.send(proto::RejoinRoomResponse {
 971            room: Some(rejoined_room.room.clone()),
 972            reshared_projects: rejoined_room
 973                .reshared_projects
 974                .iter()
 975                .map(|project| proto::ResharedProject {
 976                    id: project.id.to_proto(),
 977                    collaborators: project
 978                        .collaborators
 979                        .iter()
 980                        .map(|collaborator| collaborator.to_proto())
 981                        .collect(),
 982                })
 983                .collect(),
 984            rejoined_projects: rejoined_room
 985                .rejoined_projects
 986                .iter()
 987                .map(|rejoined_project| proto::RejoinedProject {
 988                    id: rejoined_project.id.to_proto(),
 989                    worktrees: rejoined_project
 990                        .worktrees
 991                        .iter()
 992                        .map(|worktree| proto::WorktreeMetadata {
 993                            id: worktree.id,
 994                            root_name: worktree.root_name.clone(),
 995                            visible: worktree.visible,
 996                            abs_path: worktree.abs_path.clone(),
 997                        })
 998                        .collect(),
 999                    collaborators: rejoined_project
1000                        .collaborators
1001                        .iter()
1002                        .map(|collaborator| collaborator.to_proto())
1003                        .collect(),
1004                    language_servers: rejoined_project.language_servers.clone(),
1005                })
1006                .collect(),
1007        })?;
1008        room_updated(&rejoined_room.room, &session.peer);
1009
1010        for project in &rejoined_room.reshared_projects {
1011            for collaborator in &project.collaborators {
1012                session
1013                    .peer
1014                    .send(
1015                        collaborator.connection_id,
1016                        proto::UpdateProjectCollaborator {
1017                            project_id: project.id.to_proto(),
1018                            old_peer_id: Some(project.old_connection_id.into()),
1019                            new_peer_id: Some(session.connection_id.into()),
1020                        },
1021                    )
1022                    .trace_err();
1023            }
1024
1025            broadcast(
1026                Some(session.connection_id),
1027                project
1028                    .collaborators
1029                    .iter()
1030                    .map(|collaborator| collaborator.connection_id),
1031                |connection_id| {
1032                    session.peer.forward_send(
1033                        session.connection_id,
1034                        connection_id,
1035                        proto::UpdateProject {
1036                            project_id: project.id.to_proto(),
1037                            worktrees: project.worktrees.clone(),
1038                        },
1039                    )
1040                },
1041            );
1042        }
1043
1044        for project in &rejoined_room.rejoined_projects {
1045            for collaborator in &project.collaborators {
1046                session
1047                    .peer
1048                    .send(
1049                        collaborator.connection_id,
1050                        proto::UpdateProjectCollaborator {
1051                            project_id: project.id.to_proto(),
1052                            old_peer_id: Some(project.old_connection_id.into()),
1053                            new_peer_id: Some(session.connection_id.into()),
1054                        },
1055                    )
1056                    .trace_err();
1057            }
1058        }
1059
1060        for project in &mut rejoined_room.rejoined_projects {
1061            for worktree in mem::take(&mut project.worktrees) {
1062                #[cfg(any(test, feature = "test-support"))]
1063                const MAX_CHUNK_SIZE: usize = 2;
1064                #[cfg(not(any(test, feature = "test-support")))]
1065                const MAX_CHUNK_SIZE: usize = 256;
1066
1067                // Stream this worktree's entries.
1068                let message = proto::UpdateWorktree {
1069                    project_id: project.id.to_proto(),
1070                    worktree_id: worktree.id,
1071                    abs_path: worktree.abs_path.clone(),
1072                    root_name: worktree.root_name,
1073                    updated_entries: worktree.updated_entries,
1074                    removed_entries: worktree.removed_entries,
1075                    scan_id: worktree.scan_id,
1076                    is_last_update: worktree.completed_scan_id == worktree.scan_id,
1077                    updated_repositories: worktree.updated_repositories,
1078                    removed_repositories: worktree.removed_repositories,
1079                };
1080                for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1081                    session.peer.send(session.connection_id, update.clone())?;
1082                }
1083
1084                // Stream this worktree's diagnostics.
1085                for summary in worktree.diagnostic_summaries {
1086                    session.peer.send(
1087                        session.connection_id,
1088                        proto::UpdateDiagnosticSummary {
1089                            project_id: project.id.to_proto(),
1090                            worktree_id: worktree.id,
1091                            summary: Some(summary),
1092                        },
1093                    )?;
1094                }
1095
1096                for settings_file in worktree.settings_files {
1097                    session.peer.send(
1098                        session.connection_id,
1099                        proto::UpdateWorktreeSettings {
1100                            project_id: project.id.to_proto(),
1101                            worktree_id: worktree.id,
1102                            path: settings_file.path,
1103                            content: Some(settings_file.content),
1104                        },
1105                    )?;
1106                }
1107            }
1108
1109            for language_server in &project.language_servers {
1110                session.peer.send(
1111                    session.connection_id,
1112                    proto::UpdateLanguageServer {
1113                        project_id: project.id.to_proto(),
1114                        language_server_id: language_server.id,
1115                        variant: Some(
1116                            proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1117                                proto::LspDiskBasedDiagnosticsUpdated {},
1118                            ),
1119                        ),
1120                    },
1121                )?;
1122            }
1123        }
1124    }
1125
1126    update_user_contacts(session.user_id, &session).await?;
1127    Ok(())
1128}
1129
1130async fn leave_room(
1131    _: proto::LeaveRoom,
1132    response: Response<proto::LeaveRoom>,
1133    session: Session,
1134) -> Result<()> {
1135    leave_room_for_session(&session).await?;
1136    response.send(proto::Ack {})?;
1137    Ok(())
1138}
1139
1140async fn call(
1141    request: proto::Call,
1142    response: Response<proto::Call>,
1143    session: Session,
1144) -> Result<()> {
1145    let room_id = RoomId::from_proto(request.room_id);
1146    let calling_user_id = session.user_id;
1147    let calling_connection_id = session.connection_id;
1148    let called_user_id = UserId::from_proto(request.called_user_id);
1149    let initial_project_id = request.initial_project_id.map(ProjectId::from_proto);
1150    if !session
1151        .db()
1152        .await
1153        .has_contact(calling_user_id, called_user_id)
1154        .await?
1155    {
1156        return Err(anyhow!("cannot call a user who isn't a contact"))?;
1157    }
1158
1159    let incoming_call = {
1160        let (room, incoming_call) = &mut *session
1161            .db()
1162            .await
1163            .call(
1164                room_id,
1165                calling_user_id,
1166                calling_connection_id,
1167                called_user_id,
1168                initial_project_id,
1169            )
1170            .await?;
1171        room_updated(&room, &session.peer);
1172        mem::take(incoming_call)
1173    };
1174    update_user_contacts(called_user_id, &session).await?;
1175
1176    let mut calls = session
1177        .connection_pool()
1178        .await
1179        .user_connection_ids(called_user_id)
1180        .map(|connection_id| session.peer.request(connection_id, incoming_call.clone()))
1181        .collect::<FuturesUnordered<_>>();
1182
1183    while let Some(call_response) = calls.next().await {
1184        match call_response.as_ref() {
1185            Ok(_) => {
1186                response.send(proto::Ack {})?;
1187                return Ok(());
1188            }
1189            Err(_) => {
1190                call_response.trace_err();
1191            }
1192        }
1193    }
1194
1195    {
1196        let room = session
1197            .db()
1198            .await
1199            .call_failed(room_id, called_user_id)
1200            .await?;
1201        room_updated(&room, &session.peer);
1202    }
1203    update_user_contacts(called_user_id, &session).await?;
1204
1205    Err(anyhow!("failed to ring user"))?
1206}
1207
1208async fn cancel_call(
1209    request: proto::CancelCall,
1210    response: Response<proto::CancelCall>,
1211    session: Session,
1212) -> Result<()> {
1213    let called_user_id = UserId::from_proto(request.called_user_id);
1214    let room_id = RoomId::from_proto(request.room_id);
1215    {
1216        let room = session
1217            .db()
1218            .await
1219            .cancel_call(room_id, session.connection_id, called_user_id)
1220            .await?;
1221        room_updated(&room, &session.peer);
1222    }
1223
1224    for connection_id in session
1225        .connection_pool()
1226        .await
1227        .user_connection_ids(called_user_id)
1228    {
1229        session
1230            .peer
1231            .send(
1232                connection_id,
1233                proto::CallCanceled {
1234                    room_id: room_id.to_proto(),
1235                },
1236            )
1237            .trace_err();
1238    }
1239    response.send(proto::Ack {})?;
1240
1241    update_user_contacts(called_user_id, &session).await?;
1242    Ok(())
1243}
1244
1245async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<()> {
1246    let room_id = RoomId::from_proto(message.room_id);
1247    {
1248        let room = session
1249            .db()
1250            .await
1251            .decline_call(Some(room_id), session.user_id)
1252            .await?
1253            .ok_or_else(|| anyhow!("failed to decline call"))?;
1254        room_updated(&room, &session.peer);
1255    }
1256
1257    for connection_id in session
1258        .connection_pool()
1259        .await
1260        .user_connection_ids(session.user_id)
1261    {
1262        session
1263            .peer
1264            .send(
1265                connection_id,
1266                proto::CallCanceled {
1267                    room_id: room_id.to_proto(),
1268                },
1269            )
1270            .trace_err();
1271    }
1272    update_user_contacts(session.user_id, &session).await?;
1273    Ok(())
1274}
1275
1276async fn update_participant_location(
1277    request: proto::UpdateParticipantLocation,
1278    response: Response<proto::UpdateParticipantLocation>,
1279    session: Session,
1280) -> Result<()> {
1281    let room_id = RoomId::from_proto(request.room_id);
1282    let location = request
1283        .location
1284        .ok_or_else(|| anyhow!("invalid location"))?;
1285    let room = session
1286        .db()
1287        .await
1288        .update_room_participant_location(room_id, session.connection_id, location)
1289        .await?;
1290    room_updated(&room, &session.peer);
1291    response.send(proto::Ack {})?;
1292    Ok(())
1293}
1294
1295async fn share_project(
1296    request: proto::ShareProject,
1297    response: Response<proto::ShareProject>,
1298    session: Session,
1299) -> Result<()> {
1300    let (project_id, room) = &*session
1301        .db()
1302        .await
1303        .share_project(
1304            RoomId::from_proto(request.room_id),
1305            session.connection_id,
1306            &request.worktrees,
1307        )
1308        .await?;
1309    response.send(proto::ShareProjectResponse {
1310        project_id: project_id.to_proto(),
1311    })?;
1312    room_updated(&room, &session.peer);
1313
1314    Ok(())
1315}
1316
1317async fn unshare_project(message: proto::UnshareProject, session: Session) -> Result<()> {
1318    let project_id = ProjectId::from_proto(message.project_id);
1319
1320    let (room, guest_connection_ids) = &*session
1321        .db()
1322        .await
1323        .unshare_project(project_id, session.connection_id)
1324        .await?;
1325
1326    broadcast(
1327        Some(session.connection_id),
1328        guest_connection_ids.iter().copied(),
1329        |conn_id| session.peer.send(conn_id, message.clone()),
1330    );
1331    room_updated(&room, &session.peer);
1332
1333    Ok(())
1334}
1335
1336async fn join_project(
1337    request: proto::JoinProject,
1338    response: Response<proto::JoinProject>,
1339    session: Session,
1340) -> Result<()> {
1341    let project_id = ProjectId::from_proto(request.project_id);
1342    let guest_user_id = session.user_id;
1343
1344    tracing::info!(%project_id, "join project");
1345
1346    let (project, replica_id) = &mut *session
1347        .db()
1348        .await
1349        .join_project(project_id, session.connection_id)
1350        .await?;
1351
1352    let collaborators = project
1353        .collaborators
1354        .iter()
1355        .filter(|collaborator| collaborator.connection_id != session.connection_id)
1356        .map(|collaborator| collaborator.to_proto())
1357        .collect::<Vec<_>>();
1358
1359    let worktrees = project
1360        .worktrees
1361        .iter()
1362        .map(|(id, worktree)| proto::WorktreeMetadata {
1363            id: *id,
1364            root_name: worktree.root_name.clone(),
1365            visible: worktree.visible,
1366            abs_path: worktree.abs_path.clone(),
1367        })
1368        .collect::<Vec<_>>();
1369
1370    for collaborator in &collaborators {
1371        session
1372            .peer
1373            .send(
1374                collaborator.peer_id.unwrap().into(),
1375                proto::AddProjectCollaborator {
1376                    project_id: project_id.to_proto(),
1377                    collaborator: Some(proto::Collaborator {
1378                        peer_id: Some(session.connection_id.into()),
1379                        replica_id: replica_id.0 as u32,
1380                        user_id: guest_user_id.to_proto(),
1381                    }),
1382                },
1383            )
1384            .trace_err();
1385    }
1386
1387    // First, we send the metadata associated with each worktree.
1388    response.send(proto::JoinProjectResponse {
1389        worktrees: worktrees.clone(),
1390        replica_id: replica_id.0 as u32,
1391        collaborators: collaborators.clone(),
1392        language_servers: project.language_servers.clone(),
1393    })?;
1394
1395    for (worktree_id, worktree) in mem::take(&mut project.worktrees) {
1396        #[cfg(any(test, feature = "test-support"))]
1397        const MAX_CHUNK_SIZE: usize = 2;
1398        #[cfg(not(any(test, feature = "test-support")))]
1399        const MAX_CHUNK_SIZE: usize = 256;
1400
1401        // Stream this worktree's entries.
1402        let message = proto::UpdateWorktree {
1403            project_id: project_id.to_proto(),
1404            worktree_id,
1405            abs_path: worktree.abs_path.clone(),
1406            root_name: worktree.root_name,
1407            updated_entries: worktree.entries,
1408            removed_entries: Default::default(),
1409            scan_id: worktree.scan_id,
1410            is_last_update: worktree.scan_id == worktree.completed_scan_id,
1411            updated_repositories: worktree.repository_entries.into_values().collect(),
1412            removed_repositories: Default::default(),
1413        };
1414        for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1415            session.peer.send(session.connection_id, update.clone())?;
1416        }
1417
1418        // Stream this worktree's diagnostics.
1419        for summary in worktree.diagnostic_summaries {
1420            session.peer.send(
1421                session.connection_id,
1422                proto::UpdateDiagnosticSummary {
1423                    project_id: project_id.to_proto(),
1424                    worktree_id: worktree.id,
1425                    summary: Some(summary),
1426                },
1427            )?;
1428        }
1429
1430        for settings_file in worktree.settings_files {
1431            session.peer.send(
1432                session.connection_id,
1433                proto::UpdateWorktreeSettings {
1434                    project_id: project_id.to_proto(),
1435                    worktree_id: worktree.id,
1436                    path: settings_file.path,
1437                    content: Some(settings_file.content),
1438                },
1439            )?;
1440        }
1441    }
1442
1443    for language_server in &project.language_servers {
1444        session.peer.send(
1445            session.connection_id,
1446            proto::UpdateLanguageServer {
1447                project_id: project_id.to_proto(),
1448                language_server_id: language_server.id,
1449                variant: Some(
1450                    proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1451                        proto::LspDiskBasedDiagnosticsUpdated {},
1452                    ),
1453                ),
1454            },
1455        )?;
1456    }
1457
1458    Ok(())
1459}
1460
1461async fn leave_project(request: proto::LeaveProject, session: Session) -> Result<()> {
1462    let sender_id = session.connection_id;
1463    let project_id = ProjectId::from_proto(request.project_id);
1464
1465    let (room, project) = &*session
1466        .db()
1467        .await
1468        .leave_project(project_id, sender_id)
1469        .await?;
1470    tracing::info!(
1471        %project_id,
1472        host_user_id = %project.host_user_id,
1473        host_connection_id = %project.host_connection_id,
1474        "leave project"
1475    );
1476
1477    project_left(&project, &session);
1478    room_updated(&room, &session.peer);
1479
1480    Ok(())
1481}
1482
1483async fn update_project(
1484    request: proto::UpdateProject,
1485    response: Response<proto::UpdateProject>,
1486    session: Session,
1487) -> Result<()> {
1488    let project_id = ProjectId::from_proto(request.project_id);
1489    let (room, guest_connection_ids) = &*session
1490        .db()
1491        .await
1492        .update_project(project_id, session.connection_id, &request.worktrees)
1493        .await?;
1494    broadcast(
1495        Some(session.connection_id),
1496        guest_connection_ids.iter().copied(),
1497        |connection_id| {
1498            session
1499                .peer
1500                .forward_send(session.connection_id, connection_id, request.clone())
1501        },
1502    );
1503    room_updated(&room, &session.peer);
1504    response.send(proto::Ack {})?;
1505
1506    Ok(())
1507}
1508
1509async fn update_worktree(
1510    request: proto::UpdateWorktree,
1511    response: Response<proto::UpdateWorktree>,
1512    session: Session,
1513) -> Result<()> {
1514    let guest_connection_ids = session
1515        .db()
1516        .await
1517        .update_worktree(&request, session.connection_id)
1518        .await?;
1519
1520    broadcast(
1521        Some(session.connection_id),
1522        guest_connection_ids.iter().copied(),
1523        |connection_id| {
1524            session
1525                .peer
1526                .forward_send(session.connection_id, connection_id, request.clone())
1527        },
1528    );
1529    response.send(proto::Ack {})?;
1530    Ok(())
1531}
1532
1533async fn update_diagnostic_summary(
1534    message: proto::UpdateDiagnosticSummary,
1535    session: Session,
1536) -> Result<()> {
1537    let guest_connection_ids = session
1538        .db()
1539        .await
1540        .update_diagnostic_summary(&message, session.connection_id)
1541        .await?;
1542
1543    broadcast(
1544        Some(session.connection_id),
1545        guest_connection_ids.iter().copied(),
1546        |connection_id| {
1547            session
1548                .peer
1549                .forward_send(session.connection_id, connection_id, message.clone())
1550        },
1551    );
1552
1553    Ok(())
1554}
1555
1556async fn update_worktree_settings(
1557    message: proto::UpdateWorktreeSettings,
1558    session: Session,
1559) -> Result<()> {
1560    let guest_connection_ids = session
1561        .db()
1562        .await
1563        .update_worktree_settings(&message, session.connection_id)
1564        .await?;
1565
1566    broadcast(
1567        Some(session.connection_id),
1568        guest_connection_ids.iter().copied(),
1569        |connection_id| {
1570            session
1571                .peer
1572                .forward_send(session.connection_id, connection_id, message.clone())
1573        },
1574    );
1575
1576    Ok(())
1577}
1578
1579async fn refresh_inlay_hints(request: proto::RefreshInlayHints, session: Session) -> Result<()> {
1580    broadcast_project_message(request.project_id, request, session).await
1581}
1582
1583async fn start_language_server(
1584    request: proto::StartLanguageServer,
1585    session: Session,
1586) -> Result<()> {
1587    let guest_connection_ids = session
1588        .db()
1589        .await
1590        .start_language_server(&request, session.connection_id)
1591        .await?;
1592
1593    broadcast(
1594        Some(session.connection_id),
1595        guest_connection_ids.iter().copied(),
1596        |connection_id| {
1597            session
1598                .peer
1599                .forward_send(session.connection_id, connection_id, request.clone())
1600        },
1601    );
1602    Ok(())
1603}
1604
1605async fn update_language_server(
1606    request: proto::UpdateLanguageServer,
1607    session: Session,
1608) -> Result<()> {
1609    session.executor.record_backtrace();
1610    let project_id = ProjectId::from_proto(request.project_id);
1611    let project_connection_ids = session
1612        .db()
1613        .await
1614        .project_connection_ids(project_id, session.connection_id)
1615        .await?;
1616    broadcast(
1617        Some(session.connection_id),
1618        project_connection_ids.iter().copied(),
1619        |connection_id| {
1620            session
1621                .peer
1622                .forward_send(session.connection_id, connection_id, request.clone())
1623        },
1624    );
1625    Ok(())
1626}
1627
1628async fn forward_project_request<T>(
1629    request: T,
1630    response: Response<T>,
1631    session: Session,
1632) -> Result<()>
1633where
1634    T: EntityMessage + RequestMessage,
1635{
1636    session.executor.record_backtrace();
1637    let project_id = ProjectId::from_proto(request.remote_entity_id());
1638    let host_connection_id = {
1639        let collaborators = session
1640            .db()
1641            .await
1642            .project_collaborators(project_id, session.connection_id)
1643            .await?;
1644        collaborators
1645            .iter()
1646            .find(|collaborator| collaborator.is_host)
1647            .ok_or_else(|| anyhow!("host not found"))?
1648            .connection_id
1649    };
1650
1651    let payload = session
1652        .peer
1653        .forward_request(session.connection_id, host_connection_id, request)
1654        .await?;
1655
1656    response.send(payload)?;
1657    Ok(())
1658}
1659
1660async fn create_buffer_for_peer(
1661    request: proto::CreateBufferForPeer,
1662    session: Session,
1663) -> Result<()> {
1664    session.executor.record_backtrace();
1665    let peer_id = request.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?;
1666    session
1667        .peer
1668        .forward_send(session.connection_id, peer_id.into(), request)?;
1669    Ok(())
1670}
1671
1672async fn update_buffer(
1673    request: proto::UpdateBuffer,
1674    response: Response<proto::UpdateBuffer>,
1675    session: Session,
1676) -> Result<()> {
1677    session.executor.record_backtrace();
1678    let project_id = ProjectId::from_proto(request.project_id);
1679    let mut guest_connection_ids;
1680    let mut host_connection_id = None;
1681    {
1682        let collaborators = session
1683            .db()
1684            .await
1685            .project_collaborators(project_id, session.connection_id)
1686            .await?;
1687        guest_connection_ids = Vec::with_capacity(collaborators.len() - 1);
1688        for collaborator in collaborators.iter() {
1689            if collaborator.is_host {
1690                host_connection_id = Some(collaborator.connection_id);
1691            } else {
1692                guest_connection_ids.push(collaborator.connection_id);
1693            }
1694        }
1695    }
1696    let host_connection_id = host_connection_id.ok_or_else(|| anyhow!("host not found"))?;
1697
1698    session.executor.record_backtrace();
1699    broadcast(
1700        Some(session.connection_id),
1701        guest_connection_ids,
1702        |connection_id| {
1703            session
1704                .peer
1705                .forward_send(session.connection_id, connection_id, request.clone())
1706        },
1707    );
1708    if host_connection_id != session.connection_id {
1709        session
1710            .peer
1711            .forward_request(session.connection_id, host_connection_id, request.clone())
1712            .await?;
1713    }
1714
1715    response.send(proto::Ack {})?;
1716    Ok(())
1717}
1718
1719async fn update_buffer_file(request: proto::UpdateBufferFile, session: Session) -> Result<()> {
1720    let project_id = ProjectId::from_proto(request.project_id);
1721    let project_connection_ids = session
1722        .db()
1723        .await
1724        .project_connection_ids(project_id, session.connection_id)
1725        .await?;
1726
1727    broadcast(
1728        Some(session.connection_id),
1729        project_connection_ids.iter().copied(),
1730        |connection_id| {
1731            session
1732                .peer
1733                .forward_send(session.connection_id, connection_id, request.clone())
1734        },
1735    );
1736    Ok(())
1737}
1738
1739async fn buffer_reloaded(request: proto::BufferReloaded, session: Session) -> Result<()> {
1740    let project_id = ProjectId::from_proto(request.project_id);
1741    let project_connection_ids = session
1742        .db()
1743        .await
1744        .project_connection_ids(project_id, session.connection_id)
1745        .await?;
1746    broadcast(
1747        Some(session.connection_id),
1748        project_connection_ids.iter().copied(),
1749        |connection_id| {
1750            session
1751                .peer
1752                .forward_send(session.connection_id, connection_id, request.clone())
1753        },
1754    );
1755    Ok(())
1756}
1757
1758async fn buffer_saved(request: proto::BufferSaved, session: Session) -> Result<()> {
1759    broadcast_project_message(request.project_id, request, session).await
1760}
1761
1762async fn broadcast_project_message<T: EnvelopedMessage>(
1763    project_id: u64,
1764    request: T,
1765    session: Session,
1766) -> Result<()> {
1767    let project_id = ProjectId::from_proto(project_id);
1768    let project_connection_ids = session
1769        .db()
1770        .await
1771        .project_connection_ids(project_id, session.connection_id)
1772        .await?;
1773    broadcast(
1774        Some(session.connection_id),
1775        project_connection_ids.iter().copied(),
1776        |connection_id| {
1777            session
1778                .peer
1779                .forward_send(session.connection_id, connection_id, request.clone())
1780        },
1781    );
1782    Ok(())
1783}
1784
1785async fn follow(
1786    request: proto::Follow,
1787    response: Response<proto::Follow>,
1788    session: Session,
1789) -> Result<()> {
1790    let project_id = ProjectId::from_proto(request.project_id);
1791    let leader_id = request
1792        .leader_id
1793        .ok_or_else(|| anyhow!("invalid leader id"))?
1794        .into();
1795    let follower_id = session.connection_id;
1796
1797    {
1798        let project_connection_ids = session
1799            .db()
1800            .await
1801            .project_connection_ids(project_id, session.connection_id)
1802            .await?;
1803
1804        if !project_connection_ids.contains(&leader_id) {
1805            Err(anyhow!("no such peer"))?;
1806        }
1807    }
1808
1809    let mut response_payload = session
1810        .peer
1811        .forward_request(session.connection_id, leader_id, request)
1812        .await?;
1813    response_payload
1814        .views
1815        .retain(|view| view.leader_id != Some(follower_id.into()));
1816    response.send(response_payload)?;
1817
1818    let room = session
1819        .db()
1820        .await
1821        .follow(project_id, leader_id, follower_id)
1822        .await?;
1823    room_updated(&room, &session.peer);
1824
1825    Ok(())
1826}
1827
1828async fn unfollow(request: proto::Unfollow, session: Session) -> Result<()> {
1829    let project_id = ProjectId::from_proto(request.project_id);
1830    let leader_id = request
1831        .leader_id
1832        .ok_or_else(|| anyhow!("invalid leader id"))?
1833        .into();
1834    let follower_id = session.connection_id;
1835
1836    if !session
1837        .db()
1838        .await
1839        .project_connection_ids(project_id, session.connection_id)
1840        .await?
1841        .contains(&leader_id)
1842    {
1843        Err(anyhow!("no such peer"))?;
1844    }
1845
1846    session
1847        .peer
1848        .forward_send(session.connection_id, leader_id, request)?;
1849
1850    let room = session
1851        .db()
1852        .await
1853        .unfollow(project_id, leader_id, follower_id)
1854        .await?;
1855    room_updated(&room, &session.peer);
1856
1857    Ok(())
1858}
1859
1860async fn update_followers(request: proto::UpdateFollowers, session: Session) -> Result<()> {
1861    let project_id = ProjectId::from_proto(request.project_id);
1862    let project_connection_ids = session
1863        .db
1864        .lock()
1865        .await
1866        .project_connection_ids(project_id, session.connection_id)
1867        .await?;
1868
1869    let leader_id = request.variant.as_ref().and_then(|variant| match variant {
1870        proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
1871        proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
1872        proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
1873    });
1874    for follower_peer_id in request.follower_ids.iter().copied() {
1875        let follower_connection_id = follower_peer_id.into();
1876        if project_connection_ids.contains(&follower_connection_id)
1877            && Some(follower_peer_id) != leader_id
1878        {
1879            session.peer.forward_send(
1880                session.connection_id,
1881                follower_connection_id,
1882                request.clone(),
1883            )?;
1884        }
1885    }
1886    Ok(())
1887}
1888
1889async fn get_users(
1890    request: proto::GetUsers,
1891    response: Response<proto::GetUsers>,
1892    session: Session,
1893) -> Result<()> {
1894    let user_ids = request
1895        .user_ids
1896        .into_iter()
1897        .map(UserId::from_proto)
1898        .collect();
1899    let users = session
1900        .db()
1901        .await
1902        .get_users_by_ids(user_ids)
1903        .await?
1904        .into_iter()
1905        .map(|user| proto::User {
1906            id: user.id.to_proto(),
1907            avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1908            github_login: user.github_login,
1909        })
1910        .collect();
1911    response.send(proto::UsersResponse { users })?;
1912    Ok(())
1913}
1914
1915async fn fuzzy_search_users(
1916    request: proto::FuzzySearchUsers,
1917    response: Response<proto::FuzzySearchUsers>,
1918    session: Session,
1919) -> Result<()> {
1920    let query = request.query;
1921    let users = match query.len() {
1922        0 => vec![],
1923        1 | 2 => session
1924            .db()
1925            .await
1926            .get_user_by_github_login(&query)
1927            .await?
1928            .into_iter()
1929            .collect(),
1930        _ => session.db().await.fuzzy_search_users(&query, 10).await?,
1931    };
1932    let users = users
1933        .into_iter()
1934        .filter(|user| user.id != session.user_id)
1935        .map(|user| proto::User {
1936            id: user.id.to_proto(),
1937            avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1938            github_login: user.github_login,
1939        })
1940        .collect();
1941    response.send(proto::UsersResponse { users })?;
1942    Ok(())
1943}
1944
1945async fn request_contact(
1946    request: proto::RequestContact,
1947    response: Response<proto::RequestContact>,
1948    session: Session,
1949) -> Result<()> {
1950    let requester_id = session.user_id;
1951    let responder_id = UserId::from_proto(request.responder_id);
1952    if requester_id == responder_id {
1953        return Err(anyhow!("cannot add yourself as a contact"))?;
1954    }
1955
1956    session
1957        .db()
1958        .await
1959        .send_contact_request(requester_id, responder_id)
1960        .await?;
1961
1962    // Update outgoing contact requests of requester
1963    let mut update = proto::UpdateContacts::default();
1964    update.outgoing_requests.push(responder_id.to_proto());
1965    for connection_id in session
1966        .connection_pool()
1967        .await
1968        .user_connection_ids(requester_id)
1969    {
1970        session.peer.send(connection_id, update.clone())?;
1971    }
1972
1973    // Update incoming contact requests of responder
1974    let mut update = proto::UpdateContacts::default();
1975    update
1976        .incoming_requests
1977        .push(proto::IncomingContactRequest {
1978            requester_id: requester_id.to_proto(),
1979            should_notify: true,
1980        });
1981    for connection_id in session
1982        .connection_pool()
1983        .await
1984        .user_connection_ids(responder_id)
1985    {
1986        session.peer.send(connection_id, update.clone())?;
1987    }
1988
1989    response.send(proto::Ack {})?;
1990    Ok(())
1991}
1992
1993async fn respond_to_contact_request(
1994    request: proto::RespondToContactRequest,
1995    response: Response<proto::RespondToContactRequest>,
1996    session: Session,
1997) -> Result<()> {
1998    let responder_id = session.user_id;
1999    let requester_id = UserId::from_proto(request.requester_id);
2000    let db = session.db().await;
2001    if request.response == proto::ContactRequestResponse::Dismiss as i32 {
2002        db.dismiss_contact_notification(responder_id, requester_id)
2003            .await?;
2004    } else {
2005        let accept = request.response == proto::ContactRequestResponse::Accept as i32;
2006
2007        db.respond_to_contact_request(responder_id, requester_id, accept)
2008            .await?;
2009        let requester_busy = db.is_user_busy(requester_id).await?;
2010        let responder_busy = db.is_user_busy(responder_id).await?;
2011
2012        let pool = session.connection_pool().await;
2013        // Update responder with new contact
2014        let mut update = proto::UpdateContacts::default();
2015        if accept {
2016            update
2017                .contacts
2018                .push(contact_for_user(requester_id, false, requester_busy, &pool));
2019        }
2020        update
2021            .remove_incoming_requests
2022            .push(requester_id.to_proto());
2023        for connection_id in pool.user_connection_ids(responder_id) {
2024            session.peer.send(connection_id, update.clone())?;
2025        }
2026
2027        // Update requester with new contact
2028        let mut update = proto::UpdateContacts::default();
2029        if accept {
2030            update
2031                .contacts
2032                .push(contact_for_user(responder_id, true, responder_busy, &pool));
2033        }
2034        update
2035            .remove_outgoing_requests
2036            .push(responder_id.to_proto());
2037        for connection_id in pool.user_connection_ids(requester_id) {
2038            session.peer.send(connection_id, update.clone())?;
2039        }
2040    }
2041
2042    response.send(proto::Ack {})?;
2043    Ok(())
2044}
2045
2046async fn remove_contact(
2047    request: proto::RemoveContact,
2048    response: Response<proto::RemoveContact>,
2049    session: Session,
2050) -> Result<()> {
2051    let requester_id = session.user_id;
2052    let responder_id = UserId::from_proto(request.user_id);
2053    let db = session.db().await;
2054    let contact_accepted = db.remove_contact(requester_id, responder_id).await?;
2055
2056    let pool = session.connection_pool().await;
2057    // Update outgoing contact requests of requester
2058    let mut update = proto::UpdateContacts::default();
2059    if contact_accepted {
2060        update.remove_contacts.push(responder_id.to_proto());
2061    } else {
2062        update
2063            .remove_outgoing_requests
2064            .push(responder_id.to_proto());
2065    }
2066    for connection_id in pool.user_connection_ids(requester_id) {
2067        session.peer.send(connection_id, update.clone())?;
2068    }
2069
2070    // Update incoming contact requests of responder
2071    let mut update = proto::UpdateContacts::default();
2072    if contact_accepted {
2073        update.remove_contacts.push(requester_id.to_proto());
2074    } else {
2075        update
2076            .remove_incoming_requests
2077            .push(requester_id.to_proto());
2078    }
2079    for connection_id in pool.user_connection_ids(responder_id) {
2080        session.peer.send(connection_id, update.clone())?;
2081    }
2082
2083    response.send(proto::Ack {})?;
2084    Ok(())
2085}
2086
2087async fn update_diff_base(request: proto::UpdateDiffBase, session: Session) -> Result<()> {
2088    let project_id = ProjectId::from_proto(request.project_id);
2089    let project_connection_ids = session
2090        .db()
2091        .await
2092        .project_connection_ids(project_id, session.connection_id)
2093        .await?;
2094    broadcast(
2095        Some(session.connection_id),
2096        project_connection_ids.iter().copied(),
2097        |connection_id| {
2098            session
2099                .peer
2100                .forward_send(session.connection_id, connection_id, request.clone())
2101        },
2102    );
2103    Ok(())
2104}
2105
2106async fn get_private_user_info(
2107    _request: proto::GetPrivateUserInfo,
2108    response: Response<proto::GetPrivateUserInfo>,
2109    session: Session,
2110) -> Result<()> {
2111    let metrics_id = session
2112        .db()
2113        .await
2114        .get_user_metrics_id(session.user_id)
2115        .await?;
2116    let user = session
2117        .db()
2118        .await
2119        .get_user_by_id(session.user_id)
2120        .await?
2121        .ok_or_else(|| anyhow!("user not found"))?;
2122    response.send(proto::GetPrivateUserInfoResponse {
2123        metrics_id,
2124        staff: user.admin,
2125    })?;
2126    Ok(())
2127}
2128
2129fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
2130    match message {
2131        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
2132        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
2133        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
2134        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
2135        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
2136            code: frame.code.into(),
2137            reason: frame.reason,
2138        })),
2139    }
2140}
2141
2142fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
2143    match message {
2144        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
2145        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
2146        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
2147        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
2148        AxumMessage::Close(frame) => {
2149            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
2150                code: frame.code.into(),
2151                reason: frame.reason,
2152            }))
2153        }
2154    }
2155}
2156
2157fn build_initial_contacts_update(
2158    contacts: Vec<db::Contact>,
2159    pool: &ConnectionPool,
2160) -> proto::UpdateContacts {
2161    let mut update = proto::UpdateContacts::default();
2162
2163    for contact in contacts {
2164        match contact {
2165            db::Contact::Accepted {
2166                user_id,
2167                should_notify,
2168                busy,
2169            } => {
2170                update
2171                    .contacts
2172                    .push(contact_for_user(user_id, should_notify, busy, &pool));
2173            }
2174            db::Contact::Outgoing { user_id } => update.outgoing_requests.push(user_id.to_proto()),
2175            db::Contact::Incoming {
2176                user_id,
2177                should_notify,
2178            } => update
2179                .incoming_requests
2180                .push(proto::IncomingContactRequest {
2181                    requester_id: user_id.to_proto(),
2182                    should_notify,
2183                }),
2184        }
2185    }
2186
2187    update
2188}
2189
2190fn contact_for_user(
2191    user_id: UserId,
2192    should_notify: bool,
2193    busy: bool,
2194    pool: &ConnectionPool,
2195) -> proto::Contact {
2196    proto::Contact {
2197        user_id: user_id.to_proto(),
2198        online: pool.is_user_online(user_id),
2199        busy,
2200        should_notify,
2201    }
2202}
2203
2204fn room_updated(room: &proto::Room, peer: &Peer) {
2205    broadcast(
2206        None,
2207        room.participants
2208            .iter()
2209            .filter_map(|participant| Some(participant.peer_id?.into())),
2210        |peer_id| {
2211            peer.send(
2212                peer_id.into(),
2213                proto::RoomUpdated {
2214                    room: Some(room.clone()),
2215                },
2216            )
2217        },
2218    );
2219}
2220
2221async fn update_user_contacts(user_id: UserId, session: &Session) -> Result<()> {
2222    let db = session.db().await;
2223    let contacts = db.get_contacts(user_id).await?;
2224    let busy = db.is_user_busy(user_id).await?;
2225
2226    let pool = session.connection_pool().await;
2227    let updated_contact = contact_for_user(user_id, false, busy, &pool);
2228    for contact in contacts {
2229        if let db::Contact::Accepted {
2230            user_id: contact_user_id,
2231            ..
2232        } = contact
2233        {
2234            for contact_conn_id in pool.user_connection_ids(contact_user_id) {
2235                session
2236                    .peer
2237                    .send(
2238                        contact_conn_id,
2239                        proto::UpdateContacts {
2240                            contacts: vec![updated_contact.clone()],
2241                            remove_contacts: Default::default(),
2242                            incoming_requests: Default::default(),
2243                            remove_incoming_requests: Default::default(),
2244                            outgoing_requests: Default::default(),
2245                            remove_outgoing_requests: Default::default(),
2246                        },
2247                    )
2248                    .trace_err();
2249            }
2250        }
2251    }
2252    Ok(())
2253}
2254
2255async fn leave_room_for_session(session: &Session) -> Result<()> {
2256    let mut contacts_to_update = HashSet::default();
2257
2258    let room_id;
2259    let canceled_calls_to_user_ids;
2260    let live_kit_room;
2261    let delete_live_kit_room;
2262    if let Some(mut left_room) = session.db().await.leave_room(session.connection_id).await? {
2263        contacts_to_update.insert(session.user_id);
2264
2265        for project in left_room.left_projects.values() {
2266            project_left(project, session);
2267        }
2268
2269        room_updated(&left_room.room, &session.peer);
2270        room_id = RoomId::from_proto(left_room.room.id);
2271        canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids);
2272        live_kit_room = mem::take(&mut left_room.room.live_kit_room);
2273        delete_live_kit_room = left_room.room.participants.is_empty();
2274    } else {
2275        return Ok(());
2276    }
2277
2278    {
2279        let pool = session.connection_pool().await;
2280        for canceled_user_id in canceled_calls_to_user_ids {
2281            for connection_id in pool.user_connection_ids(canceled_user_id) {
2282                session
2283                    .peer
2284                    .send(
2285                        connection_id,
2286                        proto::CallCanceled {
2287                            room_id: room_id.to_proto(),
2288                        },
2289                    )
2290                    .trace_err();
2291            }
2292            contacts_to_update.insert(canceled_user_id);
2293        }
2294    }
2295
2296    for contact_user_id in contacts_to_update {
2297        update_user_contacts(contact_user_id, &session).await?;
2298    }
2299
2300    if let Some(live_kit) = session.live_kit_client.as_ref() {
2301        live_kit
2302            .remove_participant(live_kit_room.clone(), session.user_id.to_string())
2303            .await
2304            .trace_err();
2305
2306        if delete_live_kit_room {
2307            live_kit.delete_room(live_kit_room).await.trace_err();
2308        }
2309    }
2310
2311    Ok(())
2312}
2313
2314fn project_left(project: &db::LeftProject, session: &Session) {
2315    for connection_id in &project.connection_ids {
2316        if project.host_user_id == session.user_id {
2317            session
2318                .peer
2319                .send(
2320                    *connection_id,
2321                    proto::UnshareProject {
2322                        project_id: project.id.to_proto(),
2323                    },
2324                )
2325                .trace_err();
2326        } else {
2327            session
2328                .peer
2329                .send(
2330                    *connection_id,
2331                    proto::RemoveProjectCollaborator {
2332                        project_id: project.id.to_proto(),
2333                        peer_id: Some(session.connection_id.into()),
2334                    },
2335                )
2336                .trace_err();
2337        }
2338    }
2339}
2340
2341pub trait ResultExt {
2342    type Ok;
2343
2344    fn trace_err(self) -> Option<Self::Ok>;
2345}
2346
2347impl<T, E> ResultExt for Result<T, E>
2348where
2349    E: std::fmt::Debug,
2350{
2351    type Ok = T;
2352
2353    fn trace_err(self) -> Option<T> {
2354        match self {
2355            Ok(value) => Some(value),
2356            Err(error) => {
2357                tracing::error!("{:?}", error);
2358                None
2359            }
2360        }
2361    }
2362}