rpc.rs

   1mod connection_pool;
   2
   3use crate::{
   4    auth,
   5    db::{self, Database, ProjectId, RoomId, ServerId, User, UserId},
   6    executor::Executor,
   7    AppState, Result,
   8};
   9use anyhow::anyhow;
  10use async_tungstenite::tungstenite::{
  11    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  12};
  13use axum::{
  14    body::Body,
  15    extract::{
  16        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  17        ConnectInfo, WebSocketUpgrade,
  18    },
  19    headers::{Header, HeaderName},
  20    http::StatusCode,
  21    middleware,
  22    response::IntoResponse,
  23    routing::get,
  24    Extension, Router, TypedHeader,
  25};
  26use collections::{HashMap, HashSet};
  27pub use connection_pool::ConnectionPool;
  28use futures::{
  29    channel::oneshot,
  30    future::{self, BoxFuture},
  31    stream::FuturesUnordered,
  32    FutureExt, SinkExt, StreamExt, TryStreamExt,
  33};
  34use lazy_static::lazy_static;
  35use prometheus::{register_int_gauge, IntGauge};
  36use rpc::{
  37    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  38    Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
  39};
  40use serde::{Serialize, Serializer};
  41use std::{
  42    any::TypeId,
  43    fmt,
  44    future::Future,
  45    marker::PhantomData,
  46    mem,
  47    net::SocketAddr,
  48    ops::{Deref, DerefMut},
  49    rc::Rc,
  50    sync::{
  51        atomic::{AtomicBool, Ordering::SeqCst},
  52        Arc,
  53    },
  54    time::Duration,
  55};
  56use tokio::sync::{watch, Semaphore};
  57use tower::ServiceBuilder;
  58use tracing::{info_span, instrument, Instrument};
  59
  60pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  61pub const CLEANUP_TIMEOUT: Duration = Duration::from_secs(10);
  62
  63lazy_static! {
  64    static ref METRIC_CONNECTIONS: IntGauge =
  65        register_int_gauge!("connections", "number of connections").unwrap();
  66    static ref METRIC_SHARED_PROJECTS: IntGauge = register_int_gauge!(
  67        "shared_projects",
  68        "number of open projects with one or more guests"
  69    )
  70    .unwrap();
  71}
  72
  73type MessageHandler =
  74    Box<dyn Send + Sync + Fn(Box<dyn AnyTypedEnvelope>, Session) -> BoxFuture<'static, ()>>;
  75
  76struct Response<R> {
  77    peer: Arc<Peer>,
  78    receipt: Receipt<R>,
  79    responded: Arc<AtomicBool>,
  80}
  81
  82impl<R: RequestMessage> Response<R> {
  83    fn send(self, payload: R::Response) -> Result<()> {
  84        self.responded.store(true, SeqCst);
  85        self.peer.respond(self.receipt, payload)?;
  86        Ok(())
  87    }
  88}
  89
  90#[derive(Clone)]
  91struct Session {
  92    user_id: UserId,
  93    connection_id: ConnectionId,
  94    db: Arc<tokio::sync::Mutex<DbHandle>>,
  95    peer: Arc<Peer>,
  96    connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
  97    live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
  98    executor: Executor,
  99}
 100
 101impl Session {
 102    async fn db(&self) -> tokio::sync::MutexGuard<DbHandle> {
 103        #[cfg(test)]
 104        tokio::task::yield_now().await;
 105        let guard = self.db.lock().await;
 106        #[cfg(test)]
 107        tokio::task::yield_now().await;
 108        guard
 109    }
 110
 111    async fn connection_pool(&self) -> ConnectionPoolGuard<'_> {
 112        #[cfg(test)]
 113        tokio::task::yield_now().await;
 114        let guard = self.connection_pool.lock();
 115        ConnectionPoolGuard {
 116            guard,
 117            _not_send: PhantomData,
 118        }
 119    }
 120}
 121
 122impl fmt::Debug for Session {
 123    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 124        f.debug_struct("Session")
 125            .field("user_id", &self.user_id)
 126            .field("connection_id", &self.connection_id)
 127            .finish()
 128    }
 129}
 130
 131struct DbHandle(Arc<Database>);
 132
 133impl Deref for DbHandle {
 134    type Target = Database;
 135
 136    fn deref(&self) -> &Self::Target {
 137        self.0.as_ref()
 138    }
 139}
 140
 141pub struct Server {
 142    id: parking_lot::Mutex<ServerId>,
 143    peer: Arc<Peer>,
 144    pub(crate) connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
 145    app_state: Arc<AppState>,
 146    executor: Executor,
 147    handlers: HashMap<TypeId, MessageHandler>,
 148    teardown: watch::Sender<()>,
 149}
 150
 151pub(crate) struct ConnectionPoolGuard<'a> {
 152    guard: parking_lot::MutexGuard<'a, ConnectionPool>,
 153    _not_send: PhantomData<Rc<()>>,
 154}
 155
 156#[derive(Serialize)]
 157pub struct ServerSnapshot<'a> {
 158    peer: &'a Peer,
 159    #[serde(serialize_with = "serialize_deref")]
 160    connection_pool: ConnectionPoolGuard<'a>,
 161}
 162
 163pub fn serialize_deref<S, T, U>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
 164where
 165    S: Serializer,
 166    T: Deref<Target = U>,
 167    U: Serialize,
 168{
 169    Serialize::serialize(value.deref(), serializer)
 170}
 171
 172impl Server {
 173    pub fn new(id: ServerId, app_state: Arc<AppState>, executor: Executor) -> Arc<Self> {
 174        let mut server = Self {
 175            id: parking_lot::Mutex::new(id),
 176            peer: Peer::new(id.0 as u32),
 177            app_state,
 178            executor,
 179            connection_pool: Default::default(),
 180            handlers: Default::default(),
 181            teardown: watch::channel(()).0,
 182        };
 183
 184        server
 185            .add_request_handler(ping)
 186            .add_request_handler(create_room)
 187            .add_request_handler(join_room)
 188            .add_request_handler(rejoin_room)
 189            .add_request_handler(leave_room)
 190            .add_request_handler(call)
 191            .add_request_handler(cancel_call)
 192            .add_message_handler(decline_call)
 193            .add_request_handler(update_participant_location)
 194            .add_request_handler(share_project)
 195            .add_message_handler(unshare_project)
 196            .add_request_handler(join_project)
 197            .add_message_handler(leave_project)
 198            .add_request_handler(update_project)
 199            .add_request_handler(update_worktree)
 200            .add_message_handler(start_language_server)
 201            .add_message_handler(update_language_server)
 202            .add_message_handler(update_diagnostic_summary)
 203            .add_request_handler(forward_project_request::<proto::GetHover>)
 204            .add_request_handler(forward_project_request::<proto::GetDefinition>)
 205            .add_request_handler(forward_project_request::<proto::GetTypeDefinition>)
 206            .add_request_handler(forward_project_request::<proto::GetReferences>)
 207            .add_request_handler(forward_project_request::<proto::SearchProject>)
 208            .add_request_handler(forward_project_request::<proto::GetDocumentHighlights>)
 209            .add_request_handler(forward_project_request::<proto::GetProjectSymbols>)
 210            .add_request_handler(forward_project_request::<proto::OpenBufferForSymbol>)
 211            .add_request_handler(forward_project_request::<proto::OpenBufferById>)
 212            .add_request_handler(forward_project_request::<proto::OpenBufferByPath>)
 213            .add_request_handler(forward_project_request::<proto::GetCompletions>)
 214            .add_request_handler(forward_project_request::<proto::ApplyCompletionAdditionalEdits>)
 215            .add_request_handler(forward_project_request::<proto::GetCodeActions>)
 216            .add_request_handler(forward_project_request::<proto::ApplyCodeAction>)
 217            .add_request_handler(forward_project_request::<proto::PrepareRename>)
 218            .add_request_handler(forward_project_request::<proto::PerformRename>)
 219            .add_request_handler(forward_project_request::<proto::ReloadBuffers>)
 220            .add_request_handler(forward_project_request::<proto::SynchronizeBuffers>)
 221            .add_request_handler(forward_project_request::<proto::FormatBuffers>)
 222            .add_request_handler(forward_project_request::<proto::CreateProjectEntry>)
 223            .add_request_handler(forward_project_request::<proto::RenameProjectEntry>)
 224            .add_request_handler(forward_project_request::<proto::CopyProjectEntry>)
 225            .add_request_handler(forward_project_request::<proto::DeleteProjectEntry>)
 226            .add_message_handler(create_buffer_for_peer)
 227            .add_request_handler(update_buffer)
 228            .add_message_handler(update_buffer_file)
 229            .add_message_handler(buffer_reloaded)
 230            .add_message_handler(buffer_saved)
 231            .add_request_handler(forward_project_request::<proto::SaveBuffer>)
 232            .add_request_handler(get_users)
 233            .add_request_handler(fuzzy_search_users)
 234            .add_request_handler(request_contact)
 235            .add_request_handler(remove_contact)
 236            .add_request_handler(respond_to_contact_request)
 237            .add_request_handler(follow)
 238            .add_message_handler(unfollow)
 239            .add_message_handler(update_followers)
 240            .add_message_handler(update_diff_base)
 241            .add_request_handler(get_private_user_info);
 242
 243        Arc::new(server)
 244    }
 245
 246    pub async fn start(&self) -> Result<()> {
 247        let server_id = *self.id.lock();
 248        let app_state = self.app_state.clone();
 249        let peer = self.peer.clone();
 250        let timeout = self.executor.sleep(CLEANUP_TIMEOUT);
 251        let pool = self.connection_pool.clone();
 252        let live_kit_client = self.app_state.live_kit_client.clone();
 253
 254        let span = info_span!("start server");
 255        self.executor.spawn_detached(
 256            async move {
 257                tracing::info!("waiting for cleanup timeout");
 258                timeout.await;
 259                tracing::info!("cleanup timeout expired, retrieving stale rooms");
 260                if let Some(room_ids) = app_state
 261                    .db
 262                    .stale_room_ids(&app_state.config.zed_environment, server_id)
 263                    .await
 264                    .trace_err()
 265                {
 266                    tracing::info!(stale_room_count = room_ids.len(), "retrieved stale rooms");
 267                    for room_id in room_ids {
 268                        let mut contacts_to_update = HashSet::default();
 269                        let mut canceled_calls_to_user_ids = Vec::new();
 270                        let mut live_kit_room = String::new();
 271                        let mut delete_live_kit_room = false;
 272
 273                        if let Some(mut refreshed_room) = app_state
 274                            .db
 275                            .refresh_room(room_id, server_id)
 276                            .await
 277                            .trace_err()
 278                        {
 279                            tracing::info!(
 280                                room_id = room_id.0,
 281                                new_participant_count = refreshed_room.room.participants.len(),
 282                                "refreshed room"
 283                            );
 284                            room_updated(&refreshed_room.room, &peer);
 285                            contacts_to_update
 286                                .extend(refreshed_room.stale_participant_user_ids.iter().copied());
 287                            contacts_to_update
 288                                .extend(refreshed_room.canceled_calls_to_user_ids.iter().copied());
 289                            canceled_calls_to_user_ids =
 290                                mem::take(&mut refreshed_room.canceled_calls_to_user_ids);
 291                            live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room);
 292                            delete_live_kit_room = refreshed_room.room.participants.is_empty();
 293                        }
 294
 295                        {
 296                            let pool = pool.lock();
 297                            for canceled_user_id in canceled_calls_to_user_ids {
 298                                for connection_id in pool.user_connection_ids(canceled_user_id) {
 299                                    peer.send(
 300                                        connection_id,
 301                                        proto::CallCanceled {
 302                                            room_id: room_id.to_proto(),
 303                                        },
 304                                    )
 305                                    .trace_err();
 306                                }
 307                            }
 308                        }
 309
 310                        for user_id in contacts_to_update {
 311                            let busy = app_state.db.is_user_busy(user_id).await.trace_err();
 312                            let contacts = app_state.db.get_contacts(user_id).await.trace_err();
 313                            if let Some((busy, contacts)) = busy.zip(contacts) {
 314                                let pool = pool.lock();
 315                                let updated_contact = contact_for_user(user_id, false, busy, &pool);
 316                                for contact in contacts {
 317                                    if let db::Contact::Accepted {
 318                                        user_id: contact_user_id,
 319                                        ..
 320                                    } = contact
 321                                    {
 322                                        for contact_conn_id in
 323                                            pool.user_connection_ids(contact_user_id)
 324                                        {
 325                                            peer.send(
 326                                                contact_conn_id,
 327                                                proto::UpdateContacts {
 328                                                    contacts: vec![updated_contact.clone()],
 329                                                    remove_contacts: Default::default(),
 330                                                    incoming_requests: Default::default(),
 331                                                    remove_incoming_requests: Default::default(),
 332                                                    outgoing_requests: Default::default(),
 333                                                    remove_outgoing_requests: Default::default(),
 334                                                },
 335                                            )
 336                                            .trace_err();
 337                                        }
 338                                    }
 339                                }
 340                            }
 341                        }
 342
 343                        if let Some(live_kit) = live_kit_client.as_ref() {
 344                            if delete_live_kit_room {
 345                                live_kit.delete_room(live_kit_room).await.trace_err();
 346                            }
 347                        }
 348                    }
 349                }
 350
 351                app_state
 352                    .db
 353                    .delete_stale_servers(&app_state.config.zed_environment, server_id)
 354                    .await
 355                    .trace_err();
 356            }
 357            .instrument(span),
 358        );
 359        Ok(())
 360    }
 361
 362    pub fn teardown(&self) {
 363        self.peer.teardown();
 364        self.connection_pool.lock().reset();
 365        let _ = self.teardown.send(());
 366    }
 367
 368    #[cfg(test)]
 369    pub fn reset(&self, id: ServerId) {
 370        self.teardown();
 371        *self.id.lock() = id;
 372        self.peer.reset(id.0 as u32);
 373    }
 374
 375    #[cfg(test)]
 376    pub fn id(&self) -> ServerId {
 377        *self.id.lock()
 378    }
 379
 380    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 381    where
 382        F: 'static + Send + Sync + Fn(TypedEnvelope<M>, Session) -> Fut,
 383        Fut: 'static + Send + Future<Output = Result<()>>,
 384        M: EnvelopedMessage,
 385    {
 386        let prev_handler = self.handlers.insert(
 387            TypeId::of::<M>(),
 388            Box::new(move |envelope, session| {
 389                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 390                let span = info_span!(
 391                    "handle message",
 392                    payload_type = envelope.payload_type_name()
 393                );
 394                span.in_scope(|| {
 395                    tracing::info!(
 396                        payload_type = envelope.payload_type_name(),
 397                        "message received"
 398                    );
 399                });
 400                let future = (handler)(*envelope, session);
 401                async move {
 402                    if let Err(error) = future.await {
 403                        tracing::error!(%error, "error handling message");
 404                    }
 405                }
 406                .instrument(span)
 407                .boxed()
 408            }),
 409        );
 410        if prev_handler.is_some() {
 411            panic!("registered a handler for the same message twice");
 412        }
 413        self
 414    }
 415
 416    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 417    where
 418        F: 'static + Send + Sync + Fn(M, Session) -> Fut,
 419        Fut: 'static + Send + Future<Output = Result<()>>,
 420        M: EnvelopedMessage,
 421    {
 422        self.add_handler(move |envelope, session| handler(envelope.payload, session));
 423        self
 424    }
 425
 426    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 427    where
 428        F: 'static + Send + Sync + Fn(M, Response<M>, Session) -> Fut,
 429        Fut: Send + Future<Output = Result<()>>,
 430        M: RequestMessage,
 431    {
 432        let handler = Arc::new(handler);
 433        self.add_handler(move |envelope, session| {
 434            let receipt = envelope.receipt();
 435            let handler = handler.clone();
 436            async move {
 437                let peer = session.peer.clone();
 438                let responded = Arc::new(AtomicBool::default());
 439                let response = Response {
 440                    peer: peer.clone(),
 441                    responded: responded.clone(),
 442                    receipt,
 443                };
 444                match (handler)(envelope.payload, response, session).await {
 445                    Ok(()) => {
 446                        if responded.load(std::sync::atomic::Ordering::SeqCst) {
 447                            Ok(())
 448                        } else {
 449                            Err(anyhow!("handler did not send a response"))?
 450                        }
 451                    }
 452                    Err(error) => {
 453                        peer.respond_with_error(
 454                            receipt,
 455                            proto::Error {
 456                                message: error.to_string(),
 457                            },
 458                        )?;
 459                        Err(error)
 460                    }
 461                }
 462            }
 463        })
 464    }
 465
 466    pub fn handle_connection(
 467        self: &Arc<Self>,
 468        connection: Connection,
 469        address: String,
 470        user: User,
 471        mut send_connection_id: Option<oneshot::Sender<ConnectionId>>,
 472        executor: Executor,
 473    ) -> impl Future<Output = Result<()>> {
 474        let this = self.clone();
 475        let user_id = user.id;
 476        let login = user.github_login;
 477        let span = info_span!("handle connection", %user_id, %login, %address);
 478        let mut teardown = self.teardown.subscribe();
 479        async move {
 480            let (connection_id, handle_io, mut incoming_rx) = this
 481                .peer
 482                .add_connection(connection, {
 483                    let executor = executor.clone();
 484                    move |duration| executor.sleep(duration)
 485                });
 486
 487            tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
 488            this.peer.send(connection_id, proto::Hello { peer_id: Some(connection_id.into()) })?;
 489            tracing::info!(%user_id, %login, %connection_id, %address, "sent hello message");
 490
 491            if let Some(send_connection_id) = send_connection_id.take() {
 492                let _ = send_connection_id.send(connection_id);
 493            }
 494
 495            if !user.connected_once {
 496                this.peer.send(connection_id, proto::ShowContacts {})?;
 497                this.app_state.db.set_user_connected_once(user_id, true).await?;
 498            }
 499
 500            let (contacts, invite_code) = future::try_join(
 501                this.app_state.db.get_contacts(user_id),
 502                this.app_state.db.get_invite_code_for_user(user_id)
 503            ).await?;
 504
 505            {
 506                let mut pool = this.connection_pool.lock();
 507                pool.add_connection(connection_id, user_id, user.admin);
 508                this.peer.send(connection_id, build_initial_contacts_update(contacts, &pool))?;
 509
 510                if let Some((code, count)) = invite_code {
 511                    this.peer.send(connection_id, proto::UpdateInviteInfo {
 512                        url: format!("{}{}", this.app_state.config.invite_link_prefix, code),
 513                        count: count as u32,
 514                    })?;
 515                }
 516            }
 517
 518            if let Some(incoming_call) = this.app_state.db.incoming_call_for_user(user_id).await? {
 519                this.peer.send(connection_id, incoming_call)?;
 520            }
 521
 522            let session = Session {
 523                user_id,
 524                connection_id,
 525                db: Arc::new(tokio::sync::Mutex::new(DbHandle(this.app_state.db.clone()))),
 526                peer: this.peer.clone(),
 527                connection_pool: this.connection_pool.clone(),
 528                live_kit_client: this.app_state.live_kit_client.clone(),
 529                executor: executor.clone(),
 530            };
 531            update_user_contacts(user_id, &session).await?;
 532
 533            let handle_io = handle_io.fuse();
 534            futures::pin_mut!(handle_io);
 535
 536            // Handlers for foreground messages are pushed into the following `FuturesUnordered`.
 537            // This prevents deadlocks when e.g., client A performs a request to client B and
 538            // client B performs a request to client A. If both clients stop processing further
 539            // messages until their respective request completes, they won't have a chance to
 540            // respond to the other client's request and cause a deadlock.
 541            //
 542            // This arrangement ensures we will attempt to process earlier messages first, but fall
 543            // back to processing messages arrived later in the spirit of making progress.
 544            let mut foreground_message_handlers = FuturesUnordered::new();
 545            let concurrent_handlers = Arc::new(Semaphore::new(256));
 546            loop {
 547                let next_message = async {
 548                    let permit = concurrent_handlers.clone().acquire_owned().await.unwrap();
 549                    let message = incoming_rx.next().await;
 550                    (permit, message)
 551                }.fuse();
 552                futures::pin_mut!(next_message);
 553                futures::select_biased! {
 554                    _ = teardown.changed().fuse() => return Ok(()),
 555                    result = handle_io => {
 556                        if let Err(error) = result {
 557                            tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
 558                        }
 559                        break;
 560                    }
 561                    _ = foreground_message_handlers.next() => {}
 562                    next_message = next_message => {
 563                        let (permit, message) = next_message;
 564                        if let Some(message) = message {
 565                            let type_name = message.payload_type_name();
 566                            let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
 567                            let span_enter = span.enter();
 568                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 569                                let is_background = message.is_background();
 570                                let handle_message = (handler)(message, session.clone());
 571                                drop(span_enter);
 572
 573                                let handle_message = async move {
 574                                    handle_message.await;
 575                                    drop(permit);
 576                                }.instrument(span);
 577                                if is_background {
 578                                    executor.spawn_detached(handle_message);
 579                                } else {
 580                                    foreground_message_handlers.push(handle_message);
 581                                }
 582                            } else {
 583                                tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
 584                            }
 585                        } else {
 586                            tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
 587                            break;
 588                        }
 589                    }
 590                }
 591            }
 592
 593            drop(foreground_message_handlers);
 594            tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
 595            if let Err(error) = connection_lost(session, teardown, executor).await {
 596                tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
 597            }
 598
 599            Ok(())
 600        }.instrument(span)
 601    }
 602
 603    pub async fn invite_code_redeemed(
 604        self: &Arc<Self>,
 605        inviter_id: UserId,
 606        invitee_id: UserId,
 607    ) -> Result<()> {
 608        if let Some(user) = self.app_state.db.get_user_by_id(inviter_id).await? {
 609            if let Some(code) = &user.invite_code {
 610                let pool = self.connection_pool.lock();
 611                let invitee_contact = contact_for_user(invitee_id, true, false, &pool);
 612                for connection_id in pool.user_connection_ids(inviter_id) {
 613                    self.peer.send(
 614                        connection_id,
 615                        proto::UpdateContacts {
 616                            contacts: vec![invitee_contact.clone()],
 617                            ..Default::default()
 618                        },
 619                    )?;
 620                    self.peer.send(
 621                        connection_id,
 622                        proto::UpdateInviteInfo {
 623                            url: format!("{}{}", self.app_state.config.invite_link_prefix, &code),
 624                            count: user.invite_count as u32,
 625                        },
 626                    )?;
 627                }
 628            }
 629        }
 630        Ok(())
 631    }
 632
 633    pub async fn invite_count_updated(self: &Arc<Self>, user_id: UserId) -> Result<()> {
 634        if let Some(user) = self.app_state.db.get_user_by_id(user_id).await? {
 635            if let Some(invite_code) = &user.invite_code {
 636                let pool = self.connection_pool.lock();
 637                for connection_id in pool.user_connection_ids(user_id) {
 638                    self.peer.send(
 639                        connection_id,
 640                        proto::UpdateInviteInfo {
 641                            url: format!(
 642                                "{}{}",
 643                                self.app_state.config.invite_link_prefix, invite_code
 644                            ),
 645                            count: user.invite_count as u32,
 646                        },
 647                    )?;
 648                }
 649            }
 650        }
 651        Ok(())
 652    }
 653
 654    pub async fn snapshot<'a>(self: &'a Arc<Self>) -> ServerSnapshot<'a> {
 655        ServerSnapshot {
 656            connection_pool: ConnectionPoolGuard {
 657                guard: self.connection_pool.lock(),
 658                _not_send: PhantomData,
 659            },
 660            peer: &self.peer,
 661        }
 662    }
 663}
 664
 665impl<'a> Deref for ConnectionPoolGuard<'a> {
 666    type Target = ConnectionPool;
 667
 668    fn deref(&self) -> &Self::Target {
 669        &*self.guard
 670    }
 671}
 672
 673impl<'a> DerefMut for ConnectionPoolGuard<'a> {
 674    fn deref_mut(&mut self) -> &mut Self::Target {
 675        &mut *self.guard
 676    }
 677}
 678
 679impl<'a> Drop for ConnectionPoolGuard<'a> {
 680    fn drop(&mut self) {
 681        #[cfg(test)]
 682        self.check_invariants();
 683    }
 684}
 685
 686fn broadcast<F>(
 687    sender_id: Option<ConnectionId>,
 688    receiver_ids: impl IntoIterator<Item = ConnectionId>,
 689    mut f: F,
 690) where
 691    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 692{
 693    for receiver_id in receiver_ids {
 694        if Some(receiver_id) != sender_id {
 695            if let Err(error) = f(receiver_id) {
 696                tracing::error!("failed to send to {:?} {}", receiver_id, error);
 697            }
 698        }
 699    }
 700}
 701
 702lazy_static! {
 703    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
 704}
 705
 706pub struct ProtocolVersion(u32);
 707
 708impl Header for ProtocolVersion {
 709    fn name() -> &'static HeaderName {
 710        &ZED_PROTOCOL_VERSION
 711    }
 712
 713    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
 714    where
 715        Self: Sized,
 716        I: Iterator<Item = &'i axum::http::HeaderValue>,
 717    {
 718        let version = values
 719            .next()
 720            .ok_or_else(axum::headers::Error::invalid)?
 721            .to_str()
 722            .map_err(|_| axum::headers::Error::invalid())?
 723            .parse()
 724            .map_err(|_| axum::headers::Error::invalid())?;
 725        Ok(Self(version))
 726    }
 727
 728    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
 729        values.extend([self.0.to_string().parse().unwrap()]);
 730    }
 731}
 732
 733pub fn routes(server: Arc<Server>) -> Router<Body> {
 734    Router::new()
 735        .route("/rpc", get(handle_websocket_request))
 736        .layer(
 737            ServiceBuilder::new()
 738                .layer(Extension(server.app_state.clone()))
 739                .layer(middleware::from_fn(auth::validate_header)),
 740        )
 741        .route("/metrics", get(handle_metrics))
 742        .layer(Extension(server))
 743}
 744
 745pub async fn handle_websocket_request(
 746    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
 747    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
 748    Extension(server): Extension<Arc<Server>>,
 749    Extension(user): Extension<User>,
 750    ws: WebSocketUpgrade,
 751) -> axum::response::Response {
 752    if protocol_version != rpc::PROTOCOL_VERSION {
 753        return (
 754            StatusCode::UPGRADE_REQUIRED,
 755            "client must be upgraded".to_string(),
 756        )
 757            .into_response();
 758    }
 759    let socket_address = socket_address.to_string();
 760    ws.on_upgrade(move |socket| {
 761        use util::ResultExt;
 762        let socket = socket
 763            .map_ok(to_tungstenite_message)
 764            .err_into()
 765            .with(|message| async move { Ok(to_axum_message(message)) });
 766        let connection = Connection::new(Box::pin(socket));
 767        async move {
 768            server
 769                .handle_connection(connection, socket_address, user, None, Executor::Production)
 770                .await
 771                .log_err();
 772        }
 773    })
 774}
 775
 776pub async fn handle_metrics(Extension(server): Extension<Arc<Server>>) -> Result<String> {
 777    let connections = server
 778        .connection_pool
 779        .lock()
 780        .connections()
 781        .filter(|connection| !connection.admin)
 782        .count();
 783
 784    METRIC_CONNECTIONS.set(connections as _);
 785
 786    let shared_projects = server.app_state.db.project_count_excluding_admins().await?;
 787    METRIC_SHARED_PROJECTS.set(shared_projects as _);
 788
 789    let encoder = prometheus::TextEncoder::new();
 790    let metric_families = prometheus::gather();
 791    let encoded_metrics = encoder
 792        .encode_to_string(&metric_families)
 793        .map_err(|err| anyhow!("{}", err))?;
 794    Ok(encoded_metrics)
 795}
 796
 797#[instrument(err, skip(executor))]
 798async fn connection_lost(
 799    session: Session,
 800    mut teardown: watch::Receiver<()>,
 801    executor: Executor,
 802) -> Result<()> {
 803    session.peer.disconnect(session.connection_id);
 804    session
 805        .connection_pool()
 806        .await
 807        .remove_connection(session.connection_id)?;
 808
 809    session
 810        .db()
 811        .await
 812        .connection_lost(session.connection_id)
 813        .await
 814        .trace_err();
 815
 816    futures::select_biased! {
 817        _ = executor.sleep(RECONNECT_TIMEOUT).fuse() => {
 818            leave_room_for_session(&session).await.trace_err();
 819
 820            if !session
 821                .connection_pool()
 822                .await
 823                .is_user_online(session.user_id)
 824            {
 825                let db = session.db().await;
 826                if let Some(room) = db.decline_call(None, session.user_id).await.trace_err().flatten() {
 827                    room_updated(&room, &session.peer);
 828                }
 829            }
 830            update_user_contacts(session.user_id, &session).await?;
 831        }
 832        _ = teardown.changed().fuse() => {}
 833    }
 834
 835    Ok(())
 836}
 837
 838async fn ping(_: proto::Ping, response: Response<proto::Ping>, _session: Session) -> Result<()> {
 839    response.send(proto::Ack {})?;
 840    Ok(())
 841}
 842
 843async fn create_room(
 844    _request: proto::CreateRoom,
 845    response: Response<proto::CreateRoom>,
 846    session: Session,
 847) -> Result<()> {
 848    let live_kit_room = nanoid::nanoid!(30);
 849    let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
 850        if let Some(_) = live_kit
 851            .create_room(live_kit_room.clone())
 852            .await
 853            .trace_err()
 854        {
 855            if let Some(token) = live_kit
 856                .room_token(&live_kit_room, &session.user_id.to_string())
 857                .trace_err()
 858            {
 859                Some(proto::LiveKitConnectionInfo {
 860                    server_url: live_kit.url().into(),
 861                    token,
 862                })
 863            } else {
 864                None
 865            }
 866        } else {
 867            None
 868        }
 869    } else {
 870        None
 871    };
 872
 873    {
 874        let room = session
 875            .db()
 876            .await
 877            .create_room(session.user_id, session.connection_id, &live_kit_room)
 878            .await?;
 879
 880        response.send(proto::CreateRoomResponse {
 881            room: Some(room.clone()),
 882            live_kit_connection_info,
 883        })?;
 884    }
 885
 886    update_user_contacts(session.user_id, &session).await?;
 887    Ok(())
 888}
 889
 890async fn join_room(
 891    request: proto::JoinRoom,
 892    response: Response<proto::JoinRoom>,
 893    session: Session,
 894) -> Result<()> {
 895    let room_id = RoomId::from_proto(request.id);
 896    let room = {
 897        let room = session
 898            .db()
 899            .await
 900            .join_room(room_id, session.user_id, session.connection_id)
 901            .await?;
 902        room_updated(&room, &session.peer);
 903        room.clone()
 904    };
 905
 906    for connection_id in session
 907        .connection_pool()
 908        .await
 909        .user_connection_ids(session.user_id)
 910    {
 911        session
 912            .peer
 913            .send(
 914                connection_id,
 915                proto::CallCanceled {
 916                    room_id: room_id.to_proto(),
 917                },
 918            )
 919            .trace_err();
 920    }
 921
 922    let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
 923        if let Some(token) = live_kit
 924            .room_token(&room.live_kit_room, &session.user_id.to_string())
 925            .trace_err()
 926        {
 927            Some(proto::LiveKitConnectionInfo {
 928                server_url: live_kit.url().into(),
 929                token,
 930            })
 931        } else {
 932            None
 933        }
 934    } else {
 935        None
 936    };
 937
 938    response.send(proto::JoinRoomResponse {
 939        room: Some(room),
 940        live_kit_connection_info,
 941    })?;
 942
 943    update_user_contacts(session.user_id, &session).await?;
 944    Ok(())
 945}
 946
 947async fn rejoin_room(
 948    request: proto::RejoinRoom,
 949    response: Response<proto::RejoinRoom>,
 950    session: Session,
 951) -> Result<()> {
 952    {
 953        let mut rejoined_room = session
 954            .db()
 955            .await
 956            .rejoin_room(request, session.user_id, session.connection_id)
 957            .await?;
 958
 959        response.send(proto::RejoinRoomResponse {
 960            room: Some(rejoined_room.room.clone()),
 961            reshared_projects: rejoined_room
 962                .reshared_projects
 963                .iter()
 964                .map(|project| proto::ResharedProject {
 965                    id: project.id.to_proto(),
 966                    collaborators: project
 967                        .collaborators
 968                        .iter()
 969                        .map(|collaborator| collaborator.to_proto())
 970                        .collect(),
 971                })
 972                .collect(),
 973            rejoined_projects: rejoined_room
 974                .rejoined_projects
 975                .iter()
 976                .map(|rejoined_project| proto::RejoinedProject {
 977                    id: rejoined_project.id.to_proto(),
 978                    worktrees: rejoined_project
 979                        .worktrees
 980                        .iter()
 981                        .map(|worktree| proto::WorktreeMetadata {
 982                            id: worktree.id,
 983                            root_name: worktree.root_name.clone(),
 984                            visible: worktree.visible,
 985                            abs_path: worktree.abs_path.clone(),
 986                        })
 987                        .collect(),
 988                    collaborators: rejoined_project
 989                        .collaborators
 990                        .iter()
 991                        .map(|collaborator| collaborator.to_proto())
 992                        .collect(),
 993                    language_servers: rejoined_project.language_servers.clone(),
 994                })
 995                .collect(),
 996        })?;
 997        room_updated(&rejoined_room.room, &session.peer);
 998
 999        for project in &rejoined_room.reshared_projects {
1000            for collaborator in &project.collaborators {
1001                session
1002                    .peer
1003                    .send(
1004                        collaborator.connection_id,
1005                        proto::UpdateProjectCollaborator {
1006                            project_id: project.id.to_proto(),
1007                            old_peer_id: Some(project.old_connection_id.into()),
1008                            new_peer_id: Some(session.connection_id.into()),
1009                        },
1010                    )
1011                    .trace_err();
1012            }
1013
1014            broadcast(
1015                Some(session.connection_id),
1016                project
1017                    .collaborators
1018                    .iter()
1019                    .map(|collaborator| collaborator.connection_id),
1020                |connection_id| {
1021                    session.peer.forward_send(
1022                        session.connection_id,
1023                        connection_id,
1024                        proto::UpdateProject {
1025                            project_id: project.id.to_proto(),
1026                            worktrees: project.worktrees.clone(),
1027                        },
1028                    )
1029                },
1030            );
1031        }
1032
1033        for project in &rejoined_room.rejoined_projects {
1034            for collaborator in &project.collaborators {
1035                session
1036                    .peer
1037                    .send(
1038                        collaborator.connection_id,
1039                        proto::UpdateProjectCollaborator {
1040                            project_id: project.id.to_proto(),
1041                            old_peer_id: Some(project.old_connection_id.into()),
1042                            new_peer_id: Some(session.connection_id.into()),
1043                        },
1044                    )
1045                    .trace_err();
1046            }
1047        }
1048
1049        for project in &mut rejoined_room.rejoined_projects {
1050            for worktree in mem::take(&mut project.worktrees) {
1051                #[cfg(any(test, feature = "test-support"))]
1052                const MAX_CHUNK_SIZE: usize = 2;
1053                #[cfg(not(any(test, feature = "test-support")))]
1054                const MAX_CHUNK_SIZE: usize = 256;
1055
1056                // Stream this worktree's entries.
1057                let message = proto::UpdateWorktree {
1058                    project_id: project.id.to_proto(),
1059                    worktree_id: worktree.id,
1060                    abs_path: worktree.abs_path.clone(),
1061                    root_name: worktree.root_name,
1062                    updated_entries: worktree.updated_entries,
1063                    removed_entries: worktree.removed_entries,
1064                    scan_id: worktree.scan_id,
1065                    is_last_update: worktree.completed_scan_id == worktree.scan_id,
1066                    //TODO repo
1067                    updated_repositories: vec![],
1068                    removed_repositories: vec![],
1069                };
1070                for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1071                    session.peer.send(session.connection_id, update.clone())?;
1072                }
1073
1074                // Stream this worktree's diagnostics.
1075                for summary in worktree.diagnostic_summaries {
1076                    session.peer.send(
1077                        session.connection_id,
1078                        proto::UpdateDiagnosticSummary {
1079                            project_id: project.id.to_proto(),
1080                            worktree_id: worktree.id,
1081                            summary: Some(summary),
1082                        },
1083                    )?;
1084                }
1085            }
1086
1087            for language_server in &project.language_servers {
1088                session.peer.send(
1089                    session.connection_id,
1090                    proto::UpdateLanguageServer {
1091                        project_id: project.id.to_proto(),
1092                        language_server_id: language_server.id,
1093                        variant: Some(
1094                            proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1095                                proto::LspDiskBasedDiagnosticsUpdated {},
1096                            ),
1097                        ),
1098                    },
1099                )?;
1100            }
1101        }
1102    }
1103
1104    update_user_contacts(session.user_id, &session).await?;
1105    Ok(())
1106}
1107
1108async fn leave_room(
1109    _: proto::LeaveRoom,
1110    response: Response<proto::LeaveRoom>,
1111    session: Session,
1112) -> Result<()> {
1113    leave_room_for_session(&session).await?;
1114    response.send(proto::Ack {})?;
1115    Ok(())
1116}
1117
1118async fn call(
1119    request: proto::Call,
1120    response: Response<proto::Call>,
1121    session: Session,
1122) -> Result<()> {
1123    let room_id = RoomId::from_proto(request.room_id);
1124    let calling_user_id = session.user_id;
1125    let calling_connection_id = session.connection_id;
1126    let called_user_id = UserId::from_proto(request.called_user_id);
1127    let initial_project_id = request.initial_project_id.map(ProjectId::from_proto);
1128    if !session
1129        .db()
1130        .await
1131        .has_contact(calling_user_id, called_user_id)
1132        .await?
1133    {
1134        return Err(anyhow!("cannot call a user who isn't a contact"))?;
1135    }
1136
1137    let incoming_call = {
1138        let (room, incoming_call) = &mut *session
1139            .db()
1140            .await
1141            .call(
1142                room_id,
1143                calling_user_id,
1144                calling_connection_id,
1145                called_user_id,
1146                initial_project_id,
1147            )
1148            .await?;
1149        room_updated(&room, &session.peer);
1150        mem::take(incoming_call)
1151    };
1152    update_user_contacts(called_user_id, &session).await?;
1153
1154    let mut calls = session
1155        .connection_pool()
1156        .await
1157        .user_connection_ids(called_user_id)
1158        .map(|connection_id| session.peer.request(connection_id, incoming_call.clone()))
1159        .collect::<FuturesUnordered<_>>();
1160
1161    while let Some(call_response) = calls.next().await {
1162        match call_response.as_ref() {
1163            Ok(_) => {
1164                response.send(proto::Ack {})?;
1165                return Ok(());
1166            }
1167            Err(_) => {
1168                call_response.trace_err();
1169            }
1170        }
1171    }
1172
1173    {
1174        let room = session
1175            .db()
1176            .await
1177            .call_failed(room_id, called_user_id)
1178            .await?;
1179        room_updated(&room, &session.peer);
1180    }
1181    update_user_contacts(called_user_id, &session).await?;
1182
1183    Err(anyhow!("failed to ring user"))?
1184}
1185
1186async fn cancel_call(
1187    request: proto::CancelCall,
1188    response: Response<proto::CancelCall>,
1189    session: Session,
1190) -> Result<()> {
1191    let called_user_id = UserId::from_proto(request.called_user_id);
1192    let room_id = RoomId::from_proto(request.room_id);
1193    {
1194        let room = session
1195            .db()
1196            .await
1197            .cancel_call(room_id, session.connection_id, called_user_id)
1198            .await?;
1199        room_updated(&room, &session.peer);
1200    }
1201
1202    for connection_id in session
1203        .connection_pool()
1204        .await
1205        .user_connection_ids(called_user_id)
1206    {
1207        session
1208            .peer
1209            .send(
1210                connection_id,
1211                proto::CallCanceled {
1212                    room_id: room_id.to_proto(),
1213                },
1214            )
1215            .trace_err();
1216    }
1217    response.send(proto::Ack {})?;
1218
1219    update_user_contacts(called_user_id, &session).await?;
1220    Ok(())
1221}
1222
1223async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<()> {
1224    let room_id = RoomId::from_proto(message.room_id);
1225    {
1226        let room = session
1227            .db()
1228            .await
1229            .decline_call(Some(room_id), session.user_id)
1230            .await?
1231            .ok_or_else(|| anyhow!("failed to decline call"))?;
1232        room_updated(&room, &session.peer);
1233    }
1234
1235    for connection_id in session
1236        .connection_pool()
1237        .await
1238        .user_connection_ids(session.user_id)
1239    {
1240        session
1241            .peer
1242            .send(
1243                connection_id,
1244                proto::CallCanceled {
1245                    room_id: room_id.to_proto(),
1246                },
1247            )
1248            .trace_err();
1249    }
1250    update_user_contacts(session.user_id, &session).await?;
1251    Ok(())
1252}
1253
1254async fn update_participant_location(
1255    request: proto::UpdateParticipantLocation,
1256    response: Response<proto::UpdateParticipantLocation>,
1257    session: Session,
1258) -> Result<()> {
1259    let room_id = RoomId::from_proto(request.room_id);
1260    let location = request
1261        .location
1262        .ok_or_else(|| anyhow!("invalid location"))?;
1263    let room = session
1264        .db()
1265        .await
1266        .update_room_participant_location(room_id, session.connection_id, location)
1267        .await?;
1268    room_updated(&room, &session.peer);
1269    response.send(proto::Ack {})?;
1270    Ok(())
1271}
1272
1273async fn share_project(
1274    request: proto::ShareProject,
1275    response: Response<proto::ShareProject>,
1276    session: Session,
1277) -> Result<()> {
1278    let (project_id, room) = &*session
1279        .db()
1280        .await
1281        .share_project(
1282            RoomId::from_proto(request.room_id),
1283            session.connection_id,
1284            &request.worktrees,
1285        )
1286        .await?;
1287    response.send(proto::ShareProjectResponse {
1288        project_id: project_id.to_proto(),
1289    })?;
1290    room_updated(&room, &session.peer);
1291
1292    Ok(())
1293}
1294
1295async fn unshare_project(message: proto::UnshareProject, session: Session) -> Result<()> {
1296    let project_id = ProjectId::from_proto(message.project_id);
1297
1298    let (room, guest_connection_ids) = &*session
1299        .db()
1300        .await
1301        .unshare_project(project_id, session.connection_id)
1302        .await?;
1303
1304    broadcast(
1305        Some(session.connection_id),
1306        guest_connection_ids.iter().copied(),
1307        |conn_id| session.peer.send(conn_id, message.clone()),
1308    );
1309    room_updated(&room, &session.peer);
1310
1311    Ok(())
1312}
1313
1314async fn join_project(
1315    request: proto::JoinProject,
1316    response: Response<proto::JoinProject>,
1317    session: Session,
1318) -> Result<()> {
1319    let project_id = ProjectId::from_proto(request.project_id);
1320    let guest_user_id = session.user_id;
1321
1322    tracing::info!(%project_id, "join project");
1323
1324    let (project, replica_id) = &mut *session
1325        .db()
1326        .await
1327        .join_project(project_id, session.connection_id)
1328        .await?;
1329
1330    let collaborators = project
1331        .collaborators
1332        .iter()
1333        .filter(|collaborator| collaborator.connection_id != session.connection_id)
1334        .map(|collaborator| collaborator.to_proto())
1335        .collect::<Vec<_>>();
1336
1337    let worktrees = project
1338        .worktrees
1339        .iter()
1340        .map(|(id, worktree)| proto::WorktreeMetadata {
1341            id: *id,
1342            root_name: worktree.root_name.clone(),
1343            visible: worktree.visible,
1344            abs_path: worktree.abs_path.clone(),
1345        })
1346        .collect::<Vec<_>>();
1347
1348    for collaborator in &collaborators {
1349        session
1350            .peer
1351            .send(
1352                collaborator.peer_id.unwrap().into(),
1353                proto::AddProjectCollaborator {
1354                    project_id: project_id.to_proto(),
1355                    collaborator: Some(proto::Collaborator {
1356                        peer_id: Some(session.connection_id.into()),
1357                        replica_id: replica_id.0 as u32,
1358                        user_id: guest_user_id.to_proto(),
1359                    }),
1360                },
1361            )
1362            .trace_err();
1363    }
1364
1365    // First, we send the metadata associated with each worktree.
1366    response.send(proto::JoinProjectResponse {
1367        worktrees: worktrees.clone(),
1368        replica_id: replica_id.0 as u32,
1369        collaborators: collaborators.clone(),
1370        language_servers: project.language_servers.clone(),
1371    })?;
1372
1373    for (worktree_id, worktree) in mem::take(&mut project.worktrees) {
1374        #[cfg(any(test, feature = "test-support"))]
1375        const MAX_CHUNK_SIZE: usize = 2;
1376        #[cfg(not(any(test, feature = "test-support")))]
1377        const MAX_CHUNK_SIZE: usize = 256;
1378
1379        // Stream this worktree's entries.
1380        let message = proto::UpdateWorktree {
1381            project_id: project_id.to_proto(),
1382            worktree_id,
1383            abs_path: worktree.abs_path.clone(),
1384            root_name: worktree.root_name,
1385            updated_entries: worktree.entries,
1386            removed_entries: Default::default(),
1387            scan_id: worktree.scan_id,
1388            is_last_update: worktree.scan_id == worktree.completed_scan_id,
1389            updated_repositories: worktree.repository_entries,
1390            removed_repositories: Default::default(),
1391        };
1392        for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1393            session.peer.send(session.connection_id, update.clone())?;
1394        }
1395
1396        // Stream this worktree's diagnostics.
1397        for summary in worktree.diagnostic_summaries {
1398            session.peer.send(
1399                session.connection_id,
1400                proto::UpdateDiagnosticSummary {
1401                    project_id: project_id.to_proto(),
1402                    worktree_id: worktree.id,
1403                    summary: Some(summary),
1404                },
1405            )?;
1406        }
1407    }
1408
1409    for language_server in &project.language_servers {
1410        session.peer.send(
1411            session.connection_id,
1412            proto::UpdateLanguageServer {
1413                project_id: project_id.to_proto(),
1414                language_server_id: language_server.id,
1415                variant: Some(
1416                    proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1417                        proto::LspDiskBasedDiagnosticsUpdated {},
1418                    ),
1419                ),
1420            },
1421        )?;
1422    }
1423
1424    Ok(())
1425}
1426
1427async fn leave_project(request: proto::LeaveProject, session: Session) -> Result<()> {
1428    let sender_id = session.connection_id;
1429    let project_id = ProjectId::from_proto(request.project_id);
1430
1431    let (room, project) = &*session
1432        .db()
1433        .await
1434        .leave_project(project_id, sender_id)
1435        .await?;
1436    tracing::info!(
1437        %project_id,
1438        host_user_id = %project.host_user_id,
1439        host_connection_id = %project.host_connection_id,
1440        "leave project"
1441    );
1442
1443    project_left(&project, &session);
1444    room_updated(&room, &session.peer);
1445
1446    Ok(())
1447}
1448
1449async fn update_project(
1450    request: proto::UpdateProject,
1451    response: Response<proto::UpdateProject>,
1452    session: Session,
1453) -> Result<()> {
1454    let project_id = ProjectId::from_proto(request.project_id);
1455    let (room, guest_connection_ids) = &*session
1456        .db()
1457        .await
1458        .update_project(project_id, session.connection_id, &request.worktrees)
1459        .await?;
1460    broadcast(
1461        Some(session.connection_id),
1462        guest_connection_ids.iter().copied(),
1463        |connection_id| {
1464            session
1465                .peer
1466                .forward_send(session.connection_id, connection_id, request.clone())
1467        },
1468    );
1469    room_updated(&room, &session.peer);
1470    response.send(proto::Ack {})?;
1471
1472    Ok(())
1473}
1474
1475async fn update_worktree(
1476    request: proto::UpdateWorktree,
1477    response: Response<proto::UpdateWorktree>,
1478    session: Session,
1479) -> Result<()> {
1480    let guest_connection_ids = session
1481        .db()
1482        .await
1483        .update_worktree(&request, session.connection_id)
1484        .await?;
1485
1486    broadcast(
1487        Some(session.connection_id),
1488        guest_connection_ids.iter().copied(),
1489        |connection_id| {
1490            session
1491                .peer
1492                .forward_send(session.connection_id, connection_id, request.clone())
1493        },
1494    );
1495    response.send(proto::Ack {})?;
1496    Ok(())
1497}
1498
1499async fn update_diagnostic_summary(
1500    message: proto::UpdateDiagnosticSummary,
1501    session: Session,
1502) -> Result<()> {
1503    let guest_connection_ids = session
1504        .db()
1505        .await
1506        .update_diagnostic_summary(&message, session.connection_id)
1507        .await?;
1508
1509    broadcast(
1510        Some(session.connection_id),
1511        guest_connection_ids.iter().copied(),
1512        |connection_id| {
1513            session
1514                .peer
1515                .forward_send(session.connection_id, connection_id, message.clone())
1516        },
1517    );
1518
1519    Ok(())
1520}
1521
1522async fn start_language_server(
1523    request: proto::StartLanguageServer,
1524    session: Session,
1525) -> Result<()> {
1526    let guest_connection_ids = session
1527        .db()
1528        .await
1529        .start_language_server(&request, session.connection_id)
1530        .await?;
1531
1532    broadcast(
1533        Some(session.connection_id),
1534        guest_connection_ids.iter().copied(),
1535        |connection_id| {
1536            session
1537                .peer
1538                .forward_send(session.connection_id, connection_id, request.clone())
1539        },
1540    );
1541    Ok(())
1542}
1543
1544async fn update_language_server(
1545    request: proto::UpdateLanguageServer,
1546    session: Session,
1547) -> Result<()> {
1548    session.executor.record_backtrace();
1549    let project_id = ProjectId::from_proto(request.project_id);
1550    let project_connection_ids = session
1551        .db()
1552        .await
1553        .project_connection_ids(project_id, session.connection_id)
1554        .await?;
1555    broadcast(
1556        Some(session.connection_id),
1557        project_connection_ids.iter().copied(),
1558        |connection_id| {
1559            session
1560                .peer
1561                .forward_send(session.connection_id, connection_id, request.clone())
1562        },
1563    );
1564    Ok(())
1565}
1566
1567async fn forward_project_request<T>(
1568    request: T,
1569    response: Response<T>,
1570    session: Session,
1571) -> Result<()>
1572where
1573    T: EntityMessage + RequestMessage,
1574{
1575    session.executor.record_backtrace();
1576    let project_id = ProjectId::from_proto(request.remote_entity_id());
1577    let host_connection_id = {
1578        let collaborators = session
1579            .db()
1580            .await
1581            .project_collaborators(project_id, session.connection_id)
1582            .await?;
1583        collaborators
1584            .iter()
1585            .find(|collaborator| collaborator.is_host)
1586            .ok_or_else(|| anyhow!("host not found"))?
1587            .connection_id
1588    };
1589
1590    let payload = session
1591        .peer
1592        .forward_request(session.connection_id, host_connection_id, request)
1593        .await?;
1594
1595    response.send(payload)?;
1596    Ok(())
1597}
1598
1599async fn create_buffer_for_peer(
1600    request: proto::CreateBufferForPeer,
1601    session: Session,
1602) -> Result<()> {
1603    session.executor.record_backtrace();
1604    let peer_id = request.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?;
1605    session
1606        .peer
1607        .forward_send(session.connection_id, peer_id.into(), request)?;
1608    Ok(())
1609}
1610
1611async fn update_buffer(
1612    request: proto::UpdateBuffer,
1613    response: Response<proto::UpdateBuffer>,
1614    session: Session,
1615) -> Result<()> {
1616    session.executor.record_backtrace();
1617    let project_id = ProjectId::from_proto(request.project_id);
1618    let mut guest_connection_ids;
1619    let mut host_connection_id = None;
1620    {
1621        let collaborators = session
1622            .db()
1623            .await
1624            .project_collaborators(project_id, session.connection_id)
1625            .await?;
1626        guest_connection_ids = Vec::with_capacity(collaborators.len() - 1);
1627        for collaborator in collaborators.iter() {
1628            if collaborator.is_host {
1629                host_connection_id = Some(collaborator.connection_id);
1630            } else {
1631                guest_connection_ids.push(collaborator.connection_id);
1632            }
1633        }
1634    }
1635    let host_connection_id = host_connection_id.ok_or_else(|| anyhow!("host not found"))?;
1636
1637    session.executor.record_backtrace();
1638    broadcast(
1639        Some(session.connection_id),
1640        guest_connection_ids,
1641        |connection_id| {
1642            session
1643                .peer
1644                .forward_send(session.connection_id, connection_id, request.clone())
1645        },
1646    );
1647    if host_connection_id != session.connection_id {
1648        session
1649            .peer
1650            .forward_request(session.connection_id, host_connection_id, request.clone())
1651            .await?;
1652    }
1653
1654    response.send(proto::Ack {})?;
1655    Ok(())
1656}
1657
1658async fn update_buffer_file(request: proto::UpdateBufferFile, session: Session) -> Result<()> {
1659    let project_id = ProjectId::from_proto(request.project_id);
1660    let project_connection_ids = session
1661        .db()
1662        .await
1663        .project_connection_ids(project_id, session.connection_id)
1664        .await?;
1665
1666    broadcast(
1667        Some(session.connection_id),
1668        project_connection_ids.iter().copied(),
1669        |connection_id| {
1670            session
1671                .peer
1672                .forward_send(session.connection_id, connection_id, request.clone())
1673        },
1674    );
1675    Ok(())
1676}
1677
1678async fn buffer_reloaded(request: proto::BufferReloaded, session: Session) -> Result<()> {
1679    let project_id = ProjectId::from_proto(request.project_id);
1680    let project_connection_ids = session
1681        .db()
1682        .await
1683        .project_connection_ids(project_id, session.connection_id)
1684        .await?;
1685    broadcast(
1686        Some(session.connection_id),
1687        project_connection_ids.iter().copied(),
1688        |connection_id| {
1689            session
1690                .peer
1691                .forward_send(session.connection_id, connection_id, request.clone())
1692        },
1693    );
1694    Ok(())
1695}
1696
1697async fn buffer_saved(request: proto::BufferSaved, session: Session) -> Result<()> {
1698    let project_id = ProjectId::from_proto(request.project_id);
1699    let project_connection_ids = session
1700        .db()
1701        .await
1702        .project_connection_ids(project_id, session.connection_id)
1703        .await?;
1704    broadcast(
1705        Some(session.connection_id),
1706        project_connection_ids.iter().copied(),
1707        |connection_id| {
1708            session
1709                .peer
1710                .forward_send(session.connection_id, connection_id, request.clone())
1711        },
1712    );
1713    Ok(())
1714}
1715
1716async fn follow(
1717    request: proto::Follow,
1718    response: Response<proto::Follow>,
1719    session: Session,
1720) -> Result<()> {
1721    let project_id = ProjectId::from_proto(request.project_id);
1722    let leader_id = request
1723        .leader_id
1724        .ok_or_else(|| anyhow!("invalid leader id"))?
1725        .into();
1726    let follower_id = session.connection_id;
1727
1728    {
1729        let project_connection_ids = session
1730            .db()
1731            .await
1732            .project_connection_ids(project_id, session.connection_id)
1733            .await?;
1734
1735        if !project_connection_ids.contains(&leader_id) {
1736            Err(anyhow!("no such peer"))?;
1737        }
1738    }
1739
1740    let mut response_payload = session
1741        .peer
1742        .forward_request(session.connection_id, leader_id, request)
1743        .await?;
1744    response_payload
1745        .views
1746        .retain(|view| view.leader_id != Some(follower_id.into()));
1747    response.send(response_payload)?;
1748
1749    let room = session
1750        .db()
1751        .await
1752        .follow(project_id, leader_id, follower_id)
1753        .await?;
1754    room_updated(&room, &session.peer);
1755
1756    Ok(())
1757}
1758
1759async fn unfollow(request: proto::Unfollow, session: Session) -> Result<()> {
1760    let project_id = ProjectId::from_proto(request.project_id);
1761    let leader_id = request
1762        .leader_id
1763        .ok_or_else(|| anyhow!("invalid leader id"))?
1764        .into();
1765    let follower_id = session.connection_id;
1766
1767    if !session
1768        .db()
1769        .await
1770        .project_connection_ids(project_id, session.connection_id)
1771        .await?
1772        .contains(&leader_id)
1773    {
1774        Err(anyhow!("no such peer"))?;
1775    }
1776
1777    session
1778        .peer
1779        .forward_send(session.connection_id, leader_id, request)?;
1780
1781    let room = session
1782        .db()
1783        .await
1784        .unfollow(project_id, leader_id, follower_id)
1785        .await?;
1786    room_updated(&room, &session.peer);
1787
1788    Ok(())
1789}
1790
1791async fn update_followers(request: proto::UpdateFollowers, session: Session) -> Result<()> {
1792    let project_id = ProjectId::from_proto(request.project_id);
1793    let project_connection_ids = session
1794        .db
1795        .lock()
1796        .await
1797        .project_connection_ids(project_id, session.connection_id)
1798        .await?;
1799
1800    let leader_id = request.variant.as_ref().and_then(|variant| match variant {
1801        proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
1802        proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
1803        proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
1804    });
1805    for follower_peer_id in request.follower_ids.iter().copied() {
1806        let follower_connection_id = follower_peer_id.into();
1807        if project_connection_ids.contains(&follower_connection_id)
1808            && Some(follower_peer_id) != leader_id
1809        {
1810            session.peer.forward_send(
1811                session.connection_id,
1812                follower_connection_id,
1813                request.clone(),
1814            )?;
1815        }
1816    }
1817    Ok(())
1818}
1819
1820async fn get_users(
1821    request: proto::GetUsers,
1822    response: Response<proto::GetUsers>,
1823    session: Session,
1824) -> Result<()> {
1825    let user_ids = request
1826        .user_ids
1827        .into_iter()
1828        .map(UserId::from_proto)
1829        .collect();
1830    let users = session
1831        .db()
1832        .await
1833        .get_users_by_ids(user_ids)
1834        .await?
1835        .into_iter()
1836        .map(|user| proto::User {
1837            id: user.id.to_proto(),
1838            avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1839            github_login: user.github_login,
1840        })
1841        .collect();
1842    response.send(proto::UsersResponse { users })?;
1843    Ok(())
1844}
1845
1846async fn fuzzy_search_users(
1847    request: proto::FuzzySearchUsers,
1848    response: Response<proto::FuzzySearchUsers>,
1849    session: Session,
1850) -> Result<()> {
1851    let query = request.query;
1852    let users = match query.len() {
1853        0 => vec![],
1854        1 | 2 => session
1855            .db()
1856            .await
1857            .get_user_by_github_login(&query)
1858            .await?
1859            .into_iter()
1860            .collect(),
1861        _ => session.db().await.fuzzy_search_users(&query, 10).await?,
1862    };
1863    let users = users
1864        .into_iter()
1865        .filter(|user| user.id != session.user_id)
1866        .map(|user| proto::User {
1867            id: user.id.to_proto(),
1868            avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1869            github_login: user.github_login,
1870        })
1871        .collect();
1872    response.send(proto::UsersResponse { users })?;
1873    Ok(())
1874}
1875
1876async fn request_contact(
1877    request: proto::RequestContact,
1878    response: Response<proto::RequestContact>,
1879    session: Session,
1880) -> Result<()> {
1881    let requester_id = session.user_id;
1882    let responder_id = UserId::from_proto(request.responder_id);
1883    if requester_id == responder_id {
1884        return Err(anyhow!("cannot add yourself as a contact"))?;
1885    }
1886
1887    session
1888        .db()
1889        .await
1890        .send_contact_request(requester_id, responder_id)
1891        .await?;
1892
1893    // Update outgoing contact requests of requester
1894    let mut update = proto::UpdateContacts::default();
1895    update.outgoing_requests.push(responder_id.to_proto());
1896    for connection_id in session
1897        .connection_pool()
1898        .await
1899        .user_connection_ids(requester_id)
1900    {
1901        session.peer.send(connection_id, update.clone())?;
1902    }
1903
1904    // Update incoming contact requests of responder
1905    let mut update = proto::UpdateContacts::default();
1906    update
1907        .incoming_requests
1908        .push(proto::IncomingContactRequest {
1909            requester_id: requester_id.to_proto(),
1910            should_notify: true,
1911        });
1912    for connection_id in session
1913        .connection_pool()
1914        .await
1915        .user_connection_ids(responder_id)
1916    {
1917        session.peer.send(connection_id, update.clone())?;
1918    }
1919
1920    response.send(proto::Ack {})?;
1921    Ok(())
1922}
1923
1924async fn respond_to_contact_request(
1925    request: proto::RespondToContactRequest,
1926    response: Response<proto::RespondToContactRequest>,
1927    session: Session,
1928) -> Result<()> {
1929    let responder_id = session.user_id;
1930    let requester_id = UserId::from_proto(request.requester_id);
1931    let db = session.db().await;
1932    if request.response == proto::ContactRequestResponse::Dismiss as i32 {
1933        db.dismiss_contact_notification(responder_id, requester_id)
1934            .await?;
1935    } else {
1936        let accept = request.response == proto::ContactRequestResponse::Accept as i32;
1937
1938        db.respond_to_contact_request(responder_id, requester_id, accept)
1939            .await?;
1940        let requester_busy = db.is_user_busy(requester_id).await?;
1941        let responder_busy = db.is_user_busy(responder_id).await?;
1942
1943        let pool = session.connection_pool().await;
1944        // Update responder with new contact
1945        let mut update = proto::UpdateContacts::default();
1946        if accept {
1947            update
1948                .contacts
1949                .push(contact_for_user(requester_id, false, requester_busy, &pool));
1950        }
1951        update
1952            .remove_incoming_requests
1953            .push(requester_id.to_proto());
1954        for connection_id in pool.user_connection_ids(responder_id) {
1955            session.peer.send(connection_id, update.clone())?;
1956        }
1957
1958        // Update requester with new contact
1959        let mut update = proto::UpdateContacts::default();
1960        if accept {
1961            update
1962                .contacts
1963                .push(contact_for_user(responder_id, true, responder_busy, &pool));
1964        }
1965        update
1966            .remove_outgoing_requests
1967            .push(responder_id.to_proto());
1968        for connection_id in pool.user_connection_ids(requester_id) {
1969            session.peer.send(connection_id, update.clone())?;
1970        }
1971    }
1972
1973    response.send(proto::Ack {})?;
1974    Ok(())
1975}
1976
1977async fn remove_contact(
1978    request: proto::RemoveContact,
1979    response: Response<proto::RemoveContact>,
1980    session: Session,
1981) -> Result<()> {
1982    let requester_id = session.user_id;
1983    let responder_id = UserId::from_proto(request.user_id);
1984    let db = session.db().await;
1985    let contact_accepted = db.remove_contact(requester_id, responder_id).await?;
1986
1987    let pool = session.connection_pool().await;
1988    // Update outgoing contact requests of requester
1989    let mut update = proto::UpdateContacts::default();
1990    if contact_accepted {
1991        update.remove_contacts.push(responder_id.to_proto());
1992    } else {
1993        update
1994            .remove_outgoing_requests
1995            .push(responder_id.to_proto());
1996    }
1997    for connection_id in pool.user_connection_ids(requester_id) {
1998        session.peer.send(connection_id, update.clone())?;
1999    }
2000
2001    // Update incoming contact requests of responder
2002    let mut update = proto::UpdateContacts::default();
2003    if contact_accepted {
2004        update.remove_contacts.push(requester_id.to_proto());
2005    } else {
2006        update
2007            .remove_incoming_requests
2008            .push(requester_id.to_proto());
2009    }
2010    for connection_id in pool.user_connection_ids(responder_id) {
2011        session.peer.send(connection_id, update.clone())?;
2012    }
2013
2014    response.send(proto::Ack {})?;
2015    Ok(())
2016}
2017
2018async fn update_diff_base(request: proto::UpdateDiffBase, session: Session) -> Result<()> {
2019    let project_id = ProjectId::from_proto(request.project_id);
2020    let project_connection_ids = session
2021        .db()
2022        .await
2023        .project_connection_ids(project_id, session.connection_id)
2024        .await?;
2025    broadcast(
2026        Some(session.connection_id),
2027        project_connection_ids.iter().copied(),
2028        |connection_id| {
2029            session
2030                .peer
2031                .forward_send(session.connection_id, connection_id, request.clone())
2032        },
2033    );
2034    Ok(())
2035}
2036
2037async fn get_private_user_info(
2038    _request: proto::GetPrivateUserInfo,
2039    response: Response<proto::GetPrivateUserInfo>,
2040    session: Session,
2041) -> Result<()> {
2042    let metrics_id = session
2043        .db()
2044        .await
2045        .get_user_metrics_id(session.user_id)
2046        .await?;
2047    let user = session
2048        .db()
2049        .await
2050        .get_user_by_id(session.user_id)
2051        .await?
2052        .ok_or_else(|| anyhow!("user not found"))?;
2053    response.send(proto::GetPrivateUserInfoResponse {
2054        metrics_id,
2055        staff: user.admin,
2056    })?;
2057    Ok(())
2058}
2059
2060fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
2061    match message {
2062        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
2063        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
2064        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
2065        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
2066        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
2067            code: frame.code.into(),
2068            reason: frame.reason,
2069        })),
2070    }
2071}
2072
2073fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
2074    match message {
2075        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
2076        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
2077        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
2078        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
2079        AxumMessage::Close(frame) => {
2080            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
2081                code: frame.code.into(),
2082                reason: frame.reason,
2083            }))
2084        }
2085    }
2086}
2087
2088fn build_initial_contacts_update(
2089    contacts: Vec<db::Contact>,
2090    pool: &ConnectionPool,
2091) -> proto::UpdateContacts {
2092    let mut update = proto::UpdateContacts::default();
2093
2094    for contact in contacts {
2095        match contact {
2096            db::Contact::Accepted {
2097                user_id,
2098                should_notify,
2099                busy,
2100            } => {
2101                update
2102                    .contacts
2103                    .push(contact_for_user(user_id, should_notify, busy, &pool));
2104            }
2105            db::Contact::Outgoing { user_id } => update.outgoing_requests.push(user_id.to_proto()),
2106            db::Contact::Incoming {
2107                user_id,
2108                should_notify,
2109            } => update
2110                .incoming_requests
2111                .push(proto::IncomingContactRequest {
2112                    requester_id: user_id.to_proto(),
2113                    should_notify,
2114                }),
2115        }
2116    }
2117
2118    update
2119}
2120
2121fn contact_for_user(
2122    user_id: UserId,
2123    should_notify: bool,
2124    busy: bool,
2125    pool: &ConnectionPool,
2126) -> proto::Contact {
2127    proto::Contact {
2128        user_id: user_id.to_proto(),
2129        online: pool.is_user_online(user_id),
2130        busy,
2131        should_notify,
2132    }
2133}
2134
2135fn room_updated(room: &proto::Room, peer: &Peer) {
2136    broadcast(
2137        None,
2138        room.participants
2139            .iter()
2140            .filter_map(|participant| Some(participant.peer_id?.into())),
2141        |peer_id| {
2142            peer.send(
2143                peer_id.into(),
2144                proto::RoomUpdated {
2145                    room: Some(room.clone()),
2146                },
2147            )
2148        },
2149    );
2150}
2151
2152async fn update_user_contacts(user_id: UserId, session: &Session) -> Result<()> {
2153    let db = session.db().await;
2154    let contacts = db.get_contacts(user_id).await?;
2155    let busy = db.is_user_busy(user_id).await?;
2156
2157    let pool = session.connection_pool().await;
2158    let updated_contact = contact_for_user(user_id, false, busy, &pool);
2159    for contact in contacts {
2160        if let db::Contact::Accepted {
2161            user_id: contact_user_id,
2162            ..
2163        } = contact
2164        {
2165            for contact_conn_id in pool.user_connection_ids(contact_user_id) {
2166                session
2167                    .peer
2168                    .send(
2169                        contact_conn_id,
2170                        proto::UpdateContacts {
2171                            contacts: vec![updated_contact.clone()],
2172                            remove_contacts: Default::default(),
2173                            incoming_requests: Default::default(),
2174                            remove_incoming_requests: Default::default(),
2175                            outgoing_requests: Default::default(),
2176                            remove_outgoing_requests: Default::default(),
2177                        },
2178                    )
2179                    .trace_err();
2180            }
2181        }
2182    }
2183    Ok(())
2184}
2185
2186async fn leave_room_for_session(session: &Session) -> Result<()> {
2187    let mut contacts_to_update = HashSet::default();
2188
2189    let room_id;
2190    let canceled_calls_to_user_ids;
2191    let live_kit_room;
2192    let delete_live_kit_room;
2193    if let Some(mut left_room) = session.db().await.leave_room(session.connection_id).await? {
2194        contacts_to_update.insert(session.user_id);
2195
2196        for project in left_room.left_projects.values() {
2197            project_left(project, session);
2198        }
2199
2200        room_updated(&left_room.room, &session.peer);
2201        room_id = RoomId::from_proto(left_room.room.id);
2202        canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids);
2203        live_kit_room = mem::take(&mut left_room.room.live_kit_room);
2204        delete_live_kit_room = left_room.room.participants.is_empty();
2205    } else {
2206        return Ok(());
2207    }
2208
2209    {
2210        let pool = session.connection_pool().await;
2211        for canceled_user_id in canceled_calls_to_user_ids {
2212            for connection_id in pool.user_connection_ids(canceled_user_id) {
2213                session
2214                    .peer
2215                    .send(
2216                        connection_id,
2217                        proto::CallCanceled {
2218                            room_id: room_id.to_proto(),
2219                        },
2220                    )
2221                    .trace_err();
2222            }
2223            contacts_to_update.insert(canceled_user_id);
2224        }
2225    }
2226
2227    for contact_user_id in contacts_to_update {
2228        update_user_contacts(contact_user_id, &session).await?;
2229    }
2230
2231    if let Some(live_kit) = session.live_kit_client.as_ref() {
2232        live_kit
2233            .remove_participant(live_kit_room.clone(), session.user_id.to_string())
2234            .await
2235            .trace_err();
2236
2237        if delete_live_kit_room {
2238            live_kit.delete_room(live_kit_room).await.trace_err();
2239        }
2240    }
2241
2242    Ok(())
2243}
2244
2245fn project_left(project: &db::LeftProject, session: &Session) {
2246    for connection_id in &project.connection_ids {
2247        if project.host_user_id == session.user_id {
2248            session
2249                .peer
2250                .send(
2251                    *connection_id,
2252                    proto::UnshareProject {
2253                        project_id: project.id.to_proto(),
2254                    },
2255                )
2256                .trace_err();
2257        } else {
2258            session
2259                .peer
2260                .send(
2261                    *connection_id,
2262                    proto::RemoveProjectCollaborator {
2263                        project_id: project.id.to_proto(),
2264                        peer_id: Some(session.connection_id.into()),
2265                    },
2266                )
2267                .trace_err();
2268        }
2269    }
2270}
2271
/// Extension trait for result-like values whose failure should be turned into `None`
/// instead of being propagated to the caller.
pub trait ResultExt {
    /// The success value yielded by [`ResultExt::trace_err`].
    type Ok;

    /// Consumes `self`, returning the success value if there is one.
    fn trace_err(self) -> Option<Self::Ok>;
}
2277
2278impl<T, E> ResultExt for Result<T, E>
2279where
2280    E: std::fmt::Debug,
2281{
2282    type Ok = T;
2283
2284    fn trace_err(self) -> Option<T> {
2285        match self {
2286            Ok(value) => Some(value),
2287            Err(error) => {
2288                tracing::error!("{:?}", error);
2289                None
2290            }
2291        }
2292    }
2293}