rpc.rs

   1mod connection_pool;
   2
   3use crate::{
   4    auth,
   5    db::{self, Database, ProjectId, RoomId, ServerId, User, UserId},
   6    executor::Executor,
   7    AppState, Result,
   8};
   9use anyhow::anyhow;
  10use async_tungstenite::tungstenite::{
  11    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  12};
  13use axum::{
  14    body::Body,
  15    extract::{
  16        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  17        ConnectInfo, WebSocketUpgrade,
  18    },
  19    headers::{Header, HeaderName},
  20    http::StatusCode,
  21    middleware,
  22    response::IntoResponse,
  23    routing::get,
  24    Extension, Router, TypedHeader,
  25};
  26use collections::{HashMap, HashSet};
  27pub use connection_pool::ConnectionPool;
  28use futures::{
  29    channel::oneshot,
  30    future::{self, BoxFuture},
  31    stream::FuturesUnordered,
  32    FutureExt, SinkExt, StreamExt, TryStreamExt,
  33};
  34use lazy_static::lazy_static;
  35use prometheus::{register_int_gauge, IntGauge};
  36use rpc::{
  37    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  38    Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
  39};
  40use serde::{Serialize, Serializer};
  41use std::{
  42    any::TypeId,
  43    fmt,
  44    future::Future,
  45    marker::PhantomData,
  46    mem,
  47    net::SocketAddr,
  48    ops::{Deref, DerefMut},
  49    rc::Rc,
  50    sync::{
  51        atomic::{AtomicBool, Ordering::SeqCst},
  52        Arc,
  53    },
  54    time::Duration,
  55};
  56use tokio::sync::{watch, Semaphore};
  57use tower::ServiceBuilder;
  58use tracing::{info_span, instrument, Instrument};
  59
/// Not referenced in the visible part of this file.
/// NOTE(review): presumably the grace period a disconnected client is given to
/// reconnect before its state is cleaned up — confirm at the use site.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
/// Delay `Server::start` sleeps before it looks for stale rooms and servers to clean up.
pub const CLEANUP_TIMEOUT: Duration = Duration::from_secs(10);
  62
// Process-wide Prometheus gauges. They are initialized lazily on first access;
// a registration failure panics at that point because of the `unwrap`.
lazy_static! {
    // Set by `handle_metrics` to the number of non-admin connections in the pool.
    static ref METRIC_CONNECTIONS: IntGauge =
        register_int_gauge!("connections", "number of connections").unwrap();
    // Set by `handle_metrics` from the database's `project_count_excluding_admins`.
    static ref METRIC_SHARED_PROJECTS: IntGauge = register_int_gauge!(
        "shared_projects",
        "number of open projects with one or more guests"
    )
    .unwrap();
}
  72
/// Type-erased message handler stored in `Server::handlers`.
///
/// It receives the incoming envelope (still type-erased) together with the
/// `Session` of the connection it arrived on, and returns a boxed future that
/// performs the handling. The future yields `()`: `Server::add_handler` wraps
/// the typed handler so that errors are logged rather than returned.
type MessageHandler =
    Box<dyn Send + Sync + Fn(Box<dyn AnyTypedEnvelope>, Session) -> BoxFuture<'static, ()>>;
  75
  76struct Response<R> {
  77    peer: Arc<Peer>,
  78    receipt: Receipt<R>,
  79    responded: Arc<AtomicBool>,
  80}
  81
  82impl<R: RequestMessage> Response<R> {
  83    fn send(self, payload: R::Response) -> Result<()> {
  84        self.responded.store(true, SeqCst);
  85        self.peer.respond(self.receipt, payload)?;
  86        Ok(())
  87    }
  88}
  89
/// Per-connection context handed to every message handler.
///
/// `Server::handle_connection` builds one `Session` per connection and clones
/// it for each incoming message, so all fields are cheap-to-clone handles.
#[derive(Clone)]
struct Session {
    /// User that authenticated this connection.
    user_id: UserId,
    /// Identifier the `Peer` assigned to this connection.
    connection_id: ConnectionId,
    /// Database handle behind an async mutex. The mutex is created per
    /// connection in `handle_connection`, so it is shared only between
    /// handlers running for the same connection.
    db: Arc<tokio::sync::Mutex<DbHandle>>,
    /// Shared RPC peer used to send messages and responses.
    peer: Arc<Peer>,
    /// Server-wide connection pool (same instance as `Server::connection_pool`).
    connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
    /// LiveKit API client taken from the app state; `None` when it has none.
    live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
    /// Executor used for timers and detached tasks.
    executor: Executor,
}
 100
impl Session {
    /// Locks this connection's database handle and returns the guard.
    ///
    /// In test builds the task yields to the scheduler before and after taking
    /// the lock. NOTE(review): presumably this is there to shake out
    /// interleaving bugs between handlers — confirm.
    async fn db(&self) -> tokio::sync::MutexGuard<DbHandle> {
        #[cfg(test)]
        tokio::task::yield_now().await;
        let guard = self.db.lock().await;
        #[cfg(test)]
        tokio::task::yield_now().await;
        guard
    }

    /// Locks the shared connection pool and returns it wrapped in a
    /// `ConnectionPoolGuard`.
    ///
    /// The lock itself is a blocking `parking_lot` mutex; the function is
    /// `async` only so that test builds can yield before acquiring it. The
    /// returned guard is `!Send` because of its `PhantomData<Rc<()>>` marker.
    async fn connection_pool(&self) -> ConnectionPoolGuard<'_> {
        #[cfg(test)]
        tokio::task::yield_now().await;
        let guard = self.connection_pool.lock();
        ConnectionPoolGuard {
            guard,
            _not_send: PhantomData,
        }
    }
}
 121
 122impl fmt::Debug for Session {
 123    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 124        f.debug_struct("Session")
 125            .field("user_id", &self.user_id)
 126            .field("connection_id", &self.connection_id)
 127            .finish()
 128    }
 129}
 130
 131struct DbHandle(Arc<Database>);
 132
 133impl Deref for DbHandle {
 134    type Target = Database;
 135
 136    fn deref(&self) -> &Self::Target {
 137        self.0.as_ref()
 138    }
 139}
 140
/// The RPC server: owns the peer, the pool of live connections and the table
/// of message handlers that incoming messages are dispatched through.
pub struct Server {
    /// This server's id. Behind a mutex because the test-only `reset` replaces it.
    id: parking_lot::Mutex<ServerId>,
    /// RPC peer shared with every `Session`.
    peer: Arc<Peer>,
    /// All currently registered connections.
    pub(crate) connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
    /// Application state (database, config, optional LiveKit client).
    app_state: Arc<AppState>,
    /// Executor used for timers and detached tasks.
    executor: Executor,
    /// Handlers keyed by the `TypeId` of the message type they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Notified by `teardown`; every connection loop subscribes to it and
    /// exits when it changes.
    teardown: watch::Sender<()>,
}
 150
/// Lock guard over the shared `ConnectionPool`.
///
/// It dereferences to the pool and, in test builds, checks the pool's
/// invariants when dropped.
pub(crate) struct ConnectionPoolGuard<'a> {
    /// The underlying `parking_lot` mutex guard.
    guard: parking_lot::MutexGuard<'a, ConnectionPool>,
    /// `Rc` is `!Send`, so this marker makes the whole guard `!Send`.
    /// NOTE(review): presumably this keeps the guard from being held across
    /// an `.await` in a `Send` future — confirm.
    _not_send: PhantomData<Rc<()>>,
}
 155
/// Serializable view of the server produced by `Server::snapshot`.
///
/// It holds the connection-pool lock for as long as it is alive.
#[derive(Serialize)]
pub struct ServerSnapshot<'a> {
    /// The server's RPC peer.
    peer: &'a Peer,
    /// The locked pool; serialized through `serialize_deref`, i.e. as the
    /// `ConnectionPool` the guard dereferences to.
    #[serde(serialize_with = "serialize_deref")]
    connection_pool: ConnectionPoolGuard<'a>,
}
 162
 163pub fn serialize_deref<S, T, U>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
 164where
 165    S: Serializer,
 166    T: Deref<Target = U>,
 167    U: Serialize,
 168{
 169    Serialize::serialize(value.deref(), serializer)
 170}
 171
 172impl Server {
 173    pub fn new(id: ServerId, app_state: Arc<AppState>, executor: Executor) -> Arc<Self> {
 174        let mut server = Self {
 175            id: parking_lot::Mutex::new(id),
 176            peer: Peer::new(id.0 as u32),
 177            app_state,
 178            executor,
 179            connection_pool: Default::default(),
 180            handlers: Default::default(),
 181            teardown: watch::channel(()).0,
 182        };
 183
 184        server
 185            .add_request_handler(ping)
 186            .add_request_handler(create_room)
 187            .add_request_handler(join_room)
 188            .add_request_handler(rejoin_room)
 189            .add_request_handler(leave_room)
 190            .add_request_handler(call)
 191            .add_request_handler(cancel_call)
 192            .add_message_handler(decline_call)
 193            .add_request_handler(update_participant_location)
 194            .add_request_handler(share_project)
 195            .add_message_handler(unshare_project)
 196            .add_request_handler(join_project)
 197            .add_message_handler(leave_project)
 198            .add_request_handler(update_project)
 199            .add_request_handler(update_worktree)
 200            .add_message_handler(start_language_server)
 201            .add_message_handler(update_language_server)
 202            .add_message_handler(update_diagnostic_summary)
 203            .add_request_handler(forward_project_request::<proto::GetHover>)
 204            .add_request_handler(forward_project_request::<proto::GetDefinition>)
 205            .add_request_handler(forward_project_request::<proto::GetTypeDefinition>)
 206            .add_request_handler(forward_project_request::<proto::GetReferences>)
 207            .add_request_handler(forward_project_request::<proto::SearchProject>)
 208            .add_request_handler(forward_project_request::<proto::GetDocumentHighlights>)
 209            .add_request_handler(forward_project_request::<proto::GetProjectSymbols>)
 210            .add_request_handler(forward_project_request::<proto::OpenBufferForSymbol>)
 211            .add_request_handler(forward_project_request::<proto::OpenBufferById>)
 212            .add_request_handler(forward_project_request::<proto::OpenBufferByPath>)
 213            .add_request_handler(forward_project_request::<proto::GetCompletions>)
 214            .add_request_handler(forward_project_request::<proto::ApplyCompletionAdditionalEdits>)
 215            .add_request_handler(forward_project_request::<proto::GetCodeActions>)
 216            .add_request_handler(forward_project_request::<proto::ApplyCodeAction>)
 217            .add_request_handler(forward_project_request::<proto::PrepareRename>)
 218            .add_request_handler(forward_project_request::<proto::PerformRename>)
 219            .add_request_handler(forward_project_request::<proto::ReloadBuffers>)
 220            .add_request_handler(forward_project_request::<proto::SynchronizeBuffers>)
 221            .add_request_handler(forward_project_request::<proto::FormatBuffers>)
 222            .add_request_handler(forward_project_request::<proto::CreateProjectEntry>)
 223            .add_request_handler(forward_project_request::<proto::RenameProjectEntry>)
 224            .add_request_handler(forward_project_request::<proto::CopyProjectEntry>)
 225            .add_request_handler(forward_project_request::<proto::DeleteProjectEntry>)
 226            .add_message_handler(create_buffer_for_peer)
 227            .add_request_handler(update_buffer)
 228            .add_message_handler(update_buffer_file)
 229            .add_message_handler(buffer_reloaded)
 230            .add_message_handler(buffer_saved)
 231            .add_request_handler(forward_project_request::<proto::SaveBuffer>)
 232            .add_request_handler(get_users)
 233            .add_request_handler(fuzzy_search_users)
 234            .add_request_handler(request_contact)
 235            .add_request_handler(remove_contact)
 236            .add_request_handler(respond_to_contact_request)
 237            .add_request_handler(follow)
 238            .add_message_handler(unfollow)
 239            .add_message_handler(update_followers)
 240            .add_message_handler(update_diff_base)
 241            .add_request_handler(get_private_user_info);
 242
 243        Arc::new(server)
 244    }
 245
    /// Schedules the one-off stale-state cleanup and returns immediately.
    ///
    /// A detached task sleeps for `CLEANUP_TIMEOUT`, then for every stale room
    /// reported by the database it refreshes the room, notifies affected
    /// users, and deletes the LiveKit room when no participants remain.
    /// Finally it deletes stale server records. Failures inside the task go
    /// through `trace_err` and never reach the caller, so this function
    /// always returns `Ok(())`.
    pub async fn start(&self) -> Result<()> {
        // Capture everything the detached task needs; it must not borrow `self`.
        let server_id = *self.id.lock();
        let app_state = self.app_state.clone();
        let peer = self.peer.clone();
        // The sleep future is created here, before the task is spawned.
        let timeout = self.executor.sleep(CLEANUP_TIMEOUT);
        let pool = self.connection_pool.clone();
        let live_kit_client = self.app_state.live_kit_client.clone();

        let span = info_span!("start server");
        self.executor.spawn_detached(
            async move {
                tracing::info!("waiting for cleanup timeout");
                timeout.await;
                tracing::info!("cleanup timeout expired, retrieving stale rooms");
                // NOTE(review): `trace_err` presumably logs the error and yields
                // `None`, so a failed query simply skips the room cleanup — confirm.
                if let Some(room_ids) = app_state
                    .db
                    .stale_room_ids(&app_state.config.zed_environment, server_id)
                    .await
                    .trace_err()
                {
                    tracing::info!(stale_room_count = room_ids.len(), "retrieved stale rooms");
                    for room_id in room_ids {
                        // Defaults used when refreshing the room fails: nobody is
                        // notified and the LiveKit room is left alone.
                        let mut contacts_to_update = HashSet::default();
                        let mut canceled_calls_to_user_ids = Vec::new();
                        let mut live_kit_room = String::new();
                        let mut delete_live_kit_room = false;

                        if let Some(mut refreshed_room) = app_state
                            .db
                            .refresh_room(room_id, server_id)
                            .await
                            .trace_err()
                        {
                            tracing::info!(
                                room_id = room_id.0,
                                new_participant_count = refreshed_room.room.participants.len(),
                                "refreshed room"
                            );
                            room_updated(&refreshed_room.room, &peer);
                            // Both stale participants and users whose calls were
                            // canceled get a contact update below.
                            contacts_to_update
                                .extend(refreshed_room.stale_participant_user_ids.iter().copied());
                            contacts_to_update
                                .extend(refreshed_room.canceled_calls_to_user_ids.iter().copied());
                            canceled_calls_to_user_ids =
                                mem::take(&mut refreshed_room.canceled_calls_to_user_ids);
                            live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room);
                            delete_live_kit_room = refreshed_room.room.participants.is_empty();
                        }

                        // Scoped so the pool lock is released before the next `.await`.
                        {
                            let pool = pool.lock();
                            for canceled_user_id in canceled_calls_to_user_ids {
                                for connection_id in pool.user_connection_ids(canceled_user_id) {
                                    peer.send(
                                        connection_id,
                                        proto::CallCanceled {
                                            room_id: room_id.to_proto(),
                                        },
                                    )
                                    .trace_err();
                                }
                            }
                        }

                        for user_id in contacts_to_update {
                            let busy = app_state.db.is_user_busy(user_id).await.trace_err();
                            let contacts = app_state.db.get_contacts(user_id).await.trace_err();
                            // Only notify when both lookups succeeded.
                            if let Some((busy, contacts)) = busy.zip(contacts) {
                                let pool = pool.lock();
                                let updated_contact = contact_for_user(user_id, false, busy, &pool);
                                for contact in contacts {
                                    // Only accepted contacts receive the update.
                                    if let db::Contact::Accepted {
                                        user_id: contact_user_id,
                                        ..
                                    } = contact
                                    {
                                        for contact_conn_id in
                                            pool.user_connection_ids(contact_user_id)
                                        {
                                            peer.send(
                                                contact_conn_id,
                                                proto::UpdateContacts {
                                                    contacts: vec![updated_contact.clone()],
                                                    remove_contacts: Default::default(),
                                                    incoming_requests: Default::default(),
                                                    remove_incoming_requests: Default::default(),
                                                    outgoing_requests: Default::default(),
                                                    remove_outgoing_requests: Default::default(),
                                                },
                                            )
                                            .trace_err();
                                        }
                                    }
                                }
                            }
                        }

                        // The LiveKit room is deleted only when the refreshed room
                        // has no participants left.
                        if let Some(live_kit) = live_kit_client.as_ref() {
                            if delete_live_kit_room {
                                live_kit.delete_room(live_kit_room).await.trace_err();
                            }
                        }
                    }
                }

                // Runs even when the stale-room query above failed.
                app_state
                    .db
                    .delete_stale_servers(&app_state.config.zed_environment, server_id)
                    .await
                    .trace_err();
            }
            .instrument(span),
        );
        Ok(())
    }
 361
    /// Shuts down all connection handling for this server.
    ///
    /// Tears down the peer, resets the connection pool, and then notifies the
    /// `teardown` watch channel; every `handle_connection` loop subscribed to
    /// it returns when it observes the change. The result of the send is
    /// deliberately ignored.
    pub fn teardown(&self) {
        self.peer.teardown();
        self.connection_pool.lock().reset();
        let _ = self.teardown.send(());
    }
 367
    /// Test-only: tears the server down and restarts it under a new id.
    ///
    /// After `teardown`, the stored id is replaced and the peer is reset with
    /// `id.0` cast to `u32`.
    #[cfg(test)]
    pub fn reset(&self, id: ServerId) {
        self.teardown();
        *self.id.lock() = id;
        self.peer.reset(id.0 as u32);
    }
 374
 375    #[cfg(test)]
 376    pub fn id(&self) -> ServerId {
 377        *self.id.lock()
 378    }
 379
 380    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 381    where
 382        F: 'static + Send + Sync + Fn(TypedEnvelope<M>, Session) -> Fut,
 383        Fut: 'static + Send + Future<Output = Result<()>>,
 384        M: EnvelopedMessage,
 385    {
 386        let prev_handler = self.handlers.insert(
 387            TypeId::of::<M>(),
 388            Box::new(move |envelope, session| {
 389                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 390                let span = info_span!(
 391                    "handle message",
 392                    payload_type = envelope.payload_type_name()
 393                );
 394                span.in_scope(|| {
 395                    tracing::info!(
 396                        payload_type = envelope.payload_type_name(),
 397                        "message received"
 398                    );
 399                });
 400                let future = (handler)(*envelope, session);
 401                async move {
 402                    if let Err(error) = future.await {
 403                        tracing::error!(%error, "error handling message");
 404                    }
 405                }
 406                .instrument(span)
 407                .boxed()
 408            }),
 409        );
 410        if prev_handler.is_some() {
 411            panic!("registered a handler for the same message twice");
 412        }
 413        self
 414    }
 415
 416    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 417    where
 418        F: 'static + Send + Sync + Fn(M, Session) -> Fut,
 419        Fut: 'static + Send + Future<Output = Result<()>>,
 420        M: EnvelopedMessage,
 421    {
 422        self.add_handler(move |envelope, session| handler(envelope.payload, session));
 423        self
 424    }
 425
 426    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 427    where
 428        F: 'static + Send + Sync + Fn(M, Response<M>, Session) -> Fut,
 429        Fut: Send + Future<Output = Result<()>>,
 430        M: RequestMessage,
 431    {
 432        let handler = Arc::new(handler);
 433        self.add_handler(move |envelope, session| {
 434            let receipt = envelope.receipt();
 435            let handler = handler.clone();
 436            async move {
 437                let peer = session.peer.clone();
 438                let responded = Arc::new(AtomicBool::default());
 439                let response = Response {
 440                    peer: peer.clone(),
 441                    responded: responded.clone(),
 442                    receipt,
 443                };
 444                match (handler)(envelope.payload, response, session).await {
 445                    Ok(()) => {
 446                        if responded.load(std::sync::atomic::Ordering::SeqCst) {
 447                            Ok(())
 448                        } else {
 449                            Err(anyhow!("handler did not send a response"))?
 450                        }
 451                    }
 452                    Err(error) => {
 453                        peer.respond_with_error(
 454                            receipt,
 455                            proto::Error {
 456                                message: error.to_string(),
 457                            },
 458                        )?;
 459                        Err(error)
 460                    }
 461                }
 462            }
 463        })
 464    }
 465
    /// Returns a future that services one client connection until it closes.
    ///
    /// The future registers the connection with the peer, sends the initial
    /// state (hello, contacts, invite info, pending incoming call), then
    /// dispatches incoming messages to the registered handlers until the
    /// connection closes, I/O handling finishes, or the server is torn down.
    ///
    /// * `address` and `user` identify the client in log output.
    /// * `send_connection_id`, when provided, receives the new connection id
    ///   right after the hello message has been sent.
    /// * `executor` provides timers for the peer and runs background handlers.
    ///
    /// NOTE(review): once the connection has been added to the pool, the `?`
    /// early returns below (initial sends, `incoming_call_for_user`,
    /// `update_user_contacts`) skip `connection_lost`. Unless a caller cleans
    /// up, the connection would stay in the pool — confirm.
    pub fn handle_connection(
        self: &Arc<Self>,
        connection: Connection,
        address: String,
        user: User,
        mut send_connection_id: Option<oneshot::Sender<ConnectionId>>,
        executor: Executor,
    ) -> impl Future<Output = Result<()>> {
        let this = self.clone();
        let user_id = user.id;
        let login = user.github_login;
        let span = info_span!("handle connection", %user_id, %login, %address);
        // Subscribe before the future starts running so that a teardown issued
        // in between is still observed.
        let mut teardown = self.teardown.subscribe();
        async move {
            let (connection_id, handle_io, mut incoming_rx) = this
                .peer
                .add_connection(connection, {
                    let executor = executor.clone();
                    move |duration| executor.sleep(duration)
                });

            tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
            this.peer.send(connection_id, proto::Hello { peer_id: Some(connection_id.into()) })?;
            tracing::info!(%user_id, %login, %connection_id, %address, "sent hello message");

            // The receiver may already be gone; that is not an error.
            if let Some(send_connection_id) = send_connection_id.take() {
                let _ = send_connection_id.send(connection_id);
            }

            // First connection ever for this user: show contacts and record it.
            if !user.connected_once {
                this.peer.send(connection_id, proto::ShowContacts {})?;
                this.app_state.db.set_user_connected_once(user_id, true).await?;
            }

            let (contacts, invite_code) = future::try_join(
                this.app_state.db.get_contacts(user_id),
                this.app_state.db.get_invite_code_for_user(user_id)
            ).await?;

            // Scoped so the pool lock is released before the next `.await`.
            {
                let mut pool = this.connection_pool.lock();
                pool.add_connection(connection_id, user_id, user.admin);
                this.peer.send(connection_id, build_initial_contacts_update(contacts, &pool))?;

                if let Some((code, count)) = invite_code {
                    this.peer.send(connection_id, proto::UpdateInviteInfo {
                        url: format!("{}{}", this.app_state.config.invite_link_prefix, code),
                        count: count as u32,
                    })?;
                }
            }

            if let Some(incoming_call) = this.app_state.db.incoming_call_for_user(user_id).await? {
                this.peer.send(connection_id, incoming_call)?;
            }

            // One session (and one database mutex) per connection; it is cloned
            // for every message handler below.
            let session = Session {
                user_id,
                connection_id,
                db: Arc::new(tokio::sync::Mutex::new(DbHandle(this.app_state.db.clone()))),
                peer: this.peer.clone(),
                connection_pool: this.connection_pool.clone(),
                live_kit_client: this.app_state.live_kit_client.clone(),
                executor: executor.clone(),
            };
            update_user_contacts(user_id, &session).await?;

            let handle_io = handle_io.fuse();
            futures::pin_mut!(handle_io);

            // Handlers for foreground messages are pushed into the following `FuturesUnordered`.
            // This prevents deadlocks when e.g., client A performs a request to client B and
            // client B performs a request to client A. If both clients stop processing further
            // messages until their respective request completes, they won't have a chance to
            // respond to the other client's request and cause a deadlock.
            //
            // This arrangement ensures we will attempt to process earlier messages first, but fall
            // back to processing messages that arrived later in the spirit of making progress.
            let mut foreground_message_handlers = FuturesUnordered::new();
            // At most 256 handlers (foreground and background) run at once for
            // this connection; each one holds a permit until it finishes.
            let concurrent_handlers = Arc::new(Semaphore::new(256));
            loop {
                // A permit is acquired *before* the next message is read, so no
                // new message is taken while all permits are in use.
                let next_message = async {
                    let permit = concurrent_handlers.clone().acquire_owned().await.unwrap();
                    let message = incoming_rx.next().await;
                    (permit, message)
                }.fuse();
                futures::pin_mut!(next_message);
                // Biased: branches are polled in the order written, so teardown
                // and I/O completion win over message processing.
                futures::select_biased! {
                    // Server teardown: return without calling `connection_lost`.
                    _ = teardown.changed().fuse() => return Ok(()),
                    result = handle_io => {
                        if let Err(error) = result {
                            tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
                        }
                        break;
                    }
                    // Drives in-flight foreground handlers forward.
                    _ = foreground_message_handlers.next() => {}
                    next_message = next_message => {
                        let (permit, message) = next_message;
                        if let Some(message) = message {
                            let type_name = message.payload_type_name();
                            let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
                            let span_enter = span.enter();
                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                let is_background = message.is_background();
                                let handle_message = (handler)(message, session.clone());
                                // Leave the span before the handler future is moved
                                // elsewhere; the future is instrumented with it instead.
                                drop(span_enter);

                                let handle_message = async move {
                                    handle_message.await;
                                    drop(permit);
                                }.instrument(span);
                                if is_background {
                                    executor.spawn_detached(handle_message);
                                } else {
                                    foreground_message_handlers.push(handle_message);
                                }
                            } else {
                                tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
                            }
                        } else {
                            tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
                            break;
                        }
                    }
                }
            }

            // Drop (cancel) any foreground handlers that are still pending before signing out.
            drop(foreground_message_handlers);
            tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
            if let Err(error) = connection_lost(session, teardown, executor).await {
                tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
            }

            Ok(())
        }.instrument(span)
    }
 602
 603    pub async fn invite_code_redeemed(
 604        self: &Arc<Self>,
 605        inviter_id: UserId,
 606        invitee_id: UserId,
 607    ) -> Result<()> {
 608        if let Some(user) = self.app_state.db.get_user_by_id(inviter_id).await? {
 609            if let Some(code) = &user.invite_code {
 610                let pool = self.connection_pool.lock();
 611                let invitee_contact = contact_for_user(invitee_id, true, false, &pool);
 612                for connection_id in pool.user_connection_ids(inviter_id) {
 613                    self.peer.send(
 614                        connection_id,
 615                        proto::UpdateContacts {
 616                            contacts: vec![invitee_contact.clone()],
 617                            ..Default::default()
 618                        },
 619                    )?;
 620                    self.peer.send(
 621                        connection_id,
 622                        proto::UpdateInviteInfo {
 623                            url: format!("{}{}", self.app_state.config.invite_link_prefix, &code),
 624                            count: user.invite_count as u32,
 625                        },
 626                    )?;
 627                }
 628            }
 629        }
 630        Ok(())
 631    }
 632
 633    pub async fn invite_count_updated(self: &Arc<Self>, user_id: UserId) -> Result<()> {
 634        if let Some(user) = self.app_state.db.get_user_by_id(user_id).await? {
 635            if let Some(invite_code) = &user.invite_code {
 636                let pool = self.connection_pool.lock();
 637                for connection_id in pool.user_connection_ids(user_id) {
 638                    self.peer.send(
 639                        connection_id,
 640                        proto::UpdateInviteInfo {
 641                            url: format!(
 642                                "{}{}",
 643                                self.app_state.config.invite_link_prefix, invite_code
 644                            ),
 645                            count: user.invite_count as u32,
 646                        },
 647                    )?;
 648                }
 649            }
 650        }
 651        Ok(())
 652    }
 653
 654    pub async fn snapshot<'a>(self: &'a Arc<Self>) -> ServerSnapshot<'a> {
 655        ServerSnapshot {
 656            connection_pool: ConnectionPoolGuard {
 657                guard: self.connection_pool.lock(),
 658                _not_send: PhantomData,
 659            },
 660            peer: &self.peer,
 661        }
 662    }
 663}
 664
 665impl<'a> Deref for ConnectionPoolGuard<'a> {
 666    type Target = ConnectionPool;
 667
 668    fn deref(&self) -> &Self::Target {
 669        &*self.guard
 670    }
 671}
 672
 673impl<'a> DerefMut for ConnectionPoolGuard<'a> {
 674    fn deref_mut(&mut self) -> &mut Self::Target {
 675        &mut *self.guard
 676    }
 677}
 678
/// In test builds, every release of the pool lock runs `check_invariants`
/// (reached through this guard's `Deref` to `ConnectionPool`). In non-test
/// builds the body is empty and dropping the guard just releases the lock.
impl<'a> Drop for ConnectionPoolGuard<'a> {
    fn drop(&mut self) {
        #[cfg(test)]
        self.check_invariants();
    }
}
 685
 686fn broadcast<F>(
 687    sender_id: Option<ConnectionId>,
 688    receiver_ids: impl IntoIterator<Item = ConnectionId>,
 689    mut f: F,
 690) where
 691    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 692{
 693    for receiver_id in receiver_ids {
 694        if Some(receiver_id) != sender_id {
 695            if let Err(error) = f(receiver_id) {
 696                tracing::error!("failed to send to {:?} {}", receiver_id, error);
 697            }
 698        }
 699    }
 700}
 701
lazy_static! {
    // Name of the header carrying the client's protocol version; returned by
    // `ProtocolVersion::name`.
    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
}
 705
 706pub struct ProtocolVersion(u32);
 707
 708impl Header for ProtocolVersion {
 709    fn name() -> &'static HeaderName {
 710        &ZED_PROTOCOL_VERSION
 711    }
 712
 713    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
 714    where
 715        Self: Sized,
 716        I: Iterator<Item = &'i axum::http::HeaderValue>,
 717    {
 718        let version = values
 719            .next()
 720            .ok_or_else(axum::headers::Error::invalid)?
 721            .to_str()
 722            .map_err(|_| axum::headers::Error::invalid())?
 723            .parse()
 724            .map_err(|_| axum::headers::Error::invalid())?;
 725        Ok(Self(version))
 726    }
 727
 728    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
 729        values.extend([self.0.to_string().parse().unwrap()]);
 730    }
 731}
 732
 733pub fn routes(server: Arc<Server>) -> Router<Body> {
 734    Router::new()
 735        .route("/rpc", get(handle_websocket_request))
 736        .layer(
 737            ServiceBuilder::new()
 738                .layer(Extension(server.app_state.clone()))
 739                .layer(middleware::from_fn(auth::validate_header)),
 740        )
 741        .route("/metrics", get(handle_metrics))
 742        .layer(Extension(server))
 743}
 744
 745pub async fn handle_websocket_request(
 746    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
 747    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
 748    Extension(server): Extension<Arc<Server>>,
 749    Extension(user): Extension<User>,
 750    ws: WebSocketUpgrade,
 751) -> axum::response::Response {
 752    if protocol_version != rpc::PROTOCOL_VERSION {
 753        return (
 754            StatusCode::UPGRADE_REQUIRED,
 755            "client must be upgraded".to_string(),
 756        )
 757            .into_response();
 758    }
 759    let socket_address = socket_address.to_string();
 760    ws.on_upgrade(move |socket| {
 761        use util::ResultExt;
 762        let socket = socket
 763            .map_ok(to_tungstenite_message)
 764            .err_into()
 765            .with(|message| async move { Ok(to_axum_message(message)) });
 766        let connection = Connection::new(Box::pin(socket));
 767        async move {
 768            server
 769                .handle_connection(connection, socket_address, user, None, Executor::Production)
 770                .await
 771                .log_err();
 772        }
 773    })
 774}
 775
 776pub async fn handle_metrics(Extension(server): Extension<Arc<Server>>) -> Result<String> {
 777    let connections = server
 778        .connection_pool
 779        .lock()
 780        .connections()
 781        .filter(|connection| !connection.admin)
 782        .count();
 783
 784    METRIC_CONNECTIONS.set(connections as _);
 785
 786    let shared_projects = server.app_state.db.project_count_excluding_admins().await?;
 787    METRIC_SHARED_PROJECTS.set(shared_projects as _);
 788
 789    let encoder = prometheus::TextEncoder::new();
 790    let metric_families = prometheus::gather();
 791    let encoded_metrics = encoder
 792        .encode_to_string(&metric_families)
 793        .map_err(|err| anyhow!("{}", err))?;
 794    Ok(encoded_metrics)
 795}
 796
 797#[instrument(err, skip(executor))]
 798async fn connection_lost(
 799    session: Session,
 800    mut teardown: watch::Receiver<()>,
 801    executor: Executor,
 802) -> Result<()> {
 803    session.peer.disconnect(session.connection_id);
 804    session
 805        .connection_pool()
 806        .await
 807        .remove_connection(session.connection_id)?;
 808
 809    session
 810        .db()
 811        .await
 812        .connection_lost(session.connection_id)
 813        .await
 814        .trace_err();
 815
 816    futures::select_biased! {
 817        _ = executor.sleep(RECONNECT_TIMEOUT).fuse() => {
 818            leave_room_for_session(&session).await.trace_err();
 819
 820            if !session
 821                .connection_pool()
 822                .await
 823                .is_user_online(session.user_id)
 824            {
 825                let db = session.db().await;
 826                if let Some(room) = db.decline_call(None, session.user_id).await.trace_err().flatten() {
 827                    room_updated(&room, &session.peer);
 828                }
 829            }
 830            update_user_contacts(session.user_id, &session).await?;
 831        }
 832        _ = teardown.changed().fuse() => {}
 833    }
 834
 835    Ok(())
 836}
 837
 838async fn ping(_: proto::Ping, response: Response<proto::Ping>, _session: Session) -> Result<()> {
 839    response.send(proto::Ack {})?;
 840    Ok(())
 841}
 842
 843async fn create_room(
 844    _request: proto::CreateRoom,
 845    response: Response<proto::CreateRoom>,
 846    session: Session,
 847) -> Result<()> {
 848    let live_kit_room = nanoid::nanoid!(30);
 849    let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
 850        if let Some(_) = live_kit
 851            .create_room(live_kit_room.clone())
 852            .await
 853            .trace_err()
 854        {
 855            if let Some(token) = live_kit
 856                .room_token(&live_kit_room, &session.user_id.to_string())
 857                .trace_err()
 858            {
 859                Some(proto::LiveKitConnectionInfo {
 860                    server_url: live_kit.url().into(),
 861                    token,
 862                })
 863            } else {
 864                None
 865            }
 866        } else {
 867            None
 868        }
 869    } else {
 870        None
 871    };
 872
 873    {
 874        let room = session
 875            .db()
 876            .await
 877            .create_room(session.user_id, session.connection_id, &live_kit_room)
 878            .await?;
 879
 880        response.send(proto::CreateRoomResponse {
 881            room: Some(room.clone()),
 882            live_kit_connection_info,
 883        })?;
 884    }
 885
 886    update_user_contacts(session.user_id, &session).await?;
 887    Ok(())
 888}
 889
 890async fn join_room(
 891    request: proto::JoinRoom,
 892    response: Response<proto::JoinRoom>,
 893    session: Session,
 894) -> Result<()> {
 895    let room_id = RoomId::from_proto(request.id);
 896    let room = {
 897        let room = session
 898            .db()
 899            .await
 900            .join_room(room_id, session.user_id, session.connection_id)
 901            .await?;
 902        room_updated(&room, &session.peer);
 903        room.clone()
 904    };
 905
 906    for connection_id in session
 907        .connection_pool()
 908        .await
 909        .user_connection_ids(session.user_id)
 910    {
 911        session
 912            .peer
 913            .send(
 914                connection_id,
 915                proto::CallCanceled {
 916                    room_id: room_id.to_proto(),
 917                },
 918            )
 919            .trace_err();
 920    }
 921
 922    let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
 923        if let Some(token) = live_kit
 924            .room_token(&room.live_kit_room, &session.user_id.to_string())
 925            .trace_err()
 926        {
 927            Some(proto::LiveKitConnectionInfo {
 928                server_url: live_kit.url().into(),
 929                token,
 930            })
 931        } else {
 932            None
 933        }
 934    } else {
 935        None
 936    };
 937
 938    response.send(proto::JoinRoomResponse {
 939        room: Some(room),
 940        live_kit_connection_info,
 941    })?;
 942
 943    update_user_contacts(session.user_id, &session).await?;
 944    Ok(())
 945}
 946
 947async fn rejoin_room(
 948    request: proto::RejoinRoom,
 949    response: Response<proto::RejoinRoom>,
 950    session: Session,
 951) -> Result<()> {
 952    {
 953        let mut rejoined_room = session
 954            .db()
 955            .await
 956            .rejoin_room(request, session.user_id, session.connection_id)
 957            .await?;
 958
 959        response.send(proto::RejoinRoomResponse {
 960            room: Some(rejoined_room.room.clone()),
 961            reshared_projects: rejoined_room
 962                .reshared_projects
 963                .iter()
 964                .map(|project| proto::ResharedProject {
 965                    id: project.id.to_proto(),
 966                    collaborators: project
 967                        .collaborators
 968                        .iter()
 969                        .map(|collaborator| collaborator.to_proto())
 970                        .collect(),
 971                })
 972                .collect(),
 973            rejoined_projects: rejoined_room
 974                .rejoined_projects
 975                .iter()
 976                .map(|rejoined_project| proto::RejoinedProject {
 977                    id: rejoined_project.id.to_proto(),
 978                    worktrees: rejoined_project
 979                        .worktrees
 980                        .iter()
 981                        .map(|worktree| proto::WorktreeMetadata {
 982                            id: worktree.id,
 983                            root_name: worktree.root_name.clone(),
 984                            visible: worktree.visible,
 985                            abs_path: worktree.abs_path.clone(),
 986                        })
 987                        .collect(),
 988                    collaborators: rejoined_project
 989                        .collaborators
 990                        .iter()
 991                        .map(|collaborator| collaborator.to_proto())
 992                        .collect(),
 993                    language_servers: rejoined_project.language_servers.clone(),
 994                })
 995                .collect(),
 996        })?;
 997        room_updated(&rejoined_room.room, &session.peer);
 998
 999        for project in &rejoined_room.reshared_projects {
1000            for collaborator in &project.collaborators {
1001                session
1002                    .peer
1003                    .send(
1004                        collaborator.connection_id,
1005                        proto::UpdateProjectCollaborator {
1006                            project_id: project.id.to_proto(),
1007                            old_peer_id: Some(project.old_connection_id.into()),
1008                            new_peer_id: Some(session.connection_id.into()),
1009                        },
1010                    )
1011                    .trace_err();
1012            }
1013
1014            broadcast(
1015                Some(session.connection_id),
1016                project
1017                    .collaborators
1018                    .iter()
1019                    .map(|collaborator| collaborator.connection_id),
1020                |connection_id| {
1021                    session.peer.forward_send(
1022                        session.connection_id,
1023                        connection_id,
1024                        proto::UpdateProject {
1025                            project_id: project.id.to_proto(),
1026                            worktrees: project.worktrees.clone(),
1027                        },
1028                    )
1029                },
1030            );
1031        }
1032
1033        for project in &rejoined_room.rejoined_projects {
1034            for collaborator in &project.collaborators {
1035                session
1036                    .peer
1037                    .send(
1038                        collaborator.connection_id,
1039                        proto::UpdateProjectCollaborator {
1040                            project_id: project.id.to_proto(),
1041                            old_peer_id: Some(project.old_connection_id.into()),
1042                            new_peer_id: Some(session.connection_id.into()),
1043                        },
1044                    )
1045                    .trace_err();
1046            }
1047        }
1048
1049        for project in &mut rejoined_room.rejoined_projects {
1050            for worktree in mem::take(&mut project.worktrees) {
1051                #[cfg(any(test, feature = "test-support"))]
1052                const MAX_CHUNK_SIZE: usize = 2;
1053                #[cfg(not(any(test, feature = "test-support")))]
1054                const MAX_CHUNK_SIZE: usize = 256;
1055
1056                // Stream this worktree's entries.
1057                let message = proto::UpdateWorktree {
1058                    project_id: project.id.to_proto(),
1059                    worktree_id: worktree.id,
1060                    abs_path: worktree.abs_path.clone(),
1061                    root_name: worktree.root_name,
1062                    updated_entries: worktree.updated_entries,
1063                    removed_entries: worktree.removed_entries,
1064                    scan_id: worktree.scan_id,
1065                    is_last_update: worktree.completed_scan_id == worktree.scan_id,
1066                };
1067                for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1068                    session.peer.send(session.connection_id, update.clone())?;
1069                }
1070
1071                // Stream this worktree's diagnostics.
1072                for summary in worktree.diagnostic_summaries {
1073                    session.peer.send(
1074                        session.connection_id,
1075                        proto::UpdateDiagnosticSummary {
1076                            project_id: project.id.to_proto(),
1077                            worktree_id: worktree.id,
1078                            summary: Some(summary),
1079                        },
1080                    )?;
1081                }
1082            }
1083
1084            for language_server in &project.language_servers {
1085                session.peer.send(
1086                    session.connection_id,
1087                    proto::UpdateLanguageServer {
1088                        project_id: project.id.to_proto(),
1089                        language_server_id: language_server.id,
1090                        variant: Some(
1091                            proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1092                                proto::LspDiskBasedDiagnosticsUpdated {},
1093                            ),
1094                        ),
1095                    },
1096                )?;
1097            }
1098        }
1099    }
1100
1101    update_user_contacts(session.user_id, &session).await?;
1102    Ok(())
1103}
1104
1105async fn leave_room(
1106    _: proto::LeaveRoom,
1107    response: Response<proto::LeaveRoom>,
1108    session: Session,
1109) -> Result<()> {
1110    leave_room_for_session(&session).await?;
1111    response.send(proto::Ack {})?;
1112    Ok(())
1113}
1114
1115async fn call(
1116    request: proto::Call,
1117    response: Response<proto::Call>,
1118    session: Session,
1119) -> Result<()> {
1120    let room_id = RoomId::from_proto(request.room_id);
1121    let calling_user_id = session.user_id;
1122    let calling_connection_id = session.connection_id;
1123    let called_user_id = UserId::from_proto(request.called_user_id);
1124    let initial_project_id = request.initial_project_id.map(ProjectId::from_proto);
1125    if !session
1126        .db()
1127        .await
1128        .has_contact(calling_user_id, called_user_id)
1129        .await?
1130    {
1131        return Err(anyhow!("cannot call a user who isn't a contact"))?;
1132    }
1133
1134    let incoming_call = {
1135        let (room, incoming_call) = &mut *session
1136            .db()
1137            .await
1138            .call(
1139                room_id,
1140                calling_user_id,
1141                calling_connection_id,
1142                called_user_id,
1143                initial_project_id,
1144            )
1145            .await?;
1146        room_updated(&room, &session.peer);
1147        mem::take(incoming_call)
1148    };
1149    update_user_contacts(called_user_id, &session).await?;
1150
1151    let mut calls = session
1152        .connection_pool()
1153        .await
1154        .user_connection_ids(called_user_id)
1155        .map(|connection_id| session.peer.request(connection_id, incoming_call.clone()))
1156        .collect::<FuturesUnordered<_>>();
1157
1158    while let Some(call_response) = calls.next().await {
1159        match call_response.as_ref() {
1160            Ok(_) => {
1161                response.send(proto::Ack {})?;
1162                return Ok(());
1163            }
1164            Err(_) => {
1165                call_response.trace_err();
1166            }
1167        }
1168    }
1169
1170    {
1171        let room = session
1172            .db()
1173            .await
1174            .call_failed(room_id, called_user_id)
1175            .await?;
1176        room_updated(&room, &session.peer);
1177    }
1178    update_user_contacts(called_user_id, &session).await?;
1179
1180    Err(anyhow!("failed to ring user"))?
1181}
1182
1183async fn cancel_call(
1184    request: proto::CancelCall,
1185    response: Response<proto::CancelCall>,
1186    session: Session,
1187) -> Result<()> {
1188    let called_user_id = UserId::from_proto(request.called_user_id);
1189    let room_id = RoomId::from_proto(request.room_id);
1190    {
1191        let room = session
1192            .db()
1193            .await
1194            .cancel_call(room_id, session.connection_id, called_user_id)
1195            .await?;
1196        room_updated(&room, &session.peer);
1197    }
1198
1199    for connection_id in session
1200        .connection_pool()
1201        .await
1202        .user_connection_ids(called_user_id)
1203    {
1204        session
1205            .peer
1206            .send(
1207                connection_id,
1208                proto::CallCanceled {
1209                    room_id: room_id.to_proto(),
1210                },
1211            )
1212            .trace_err();
1213    }
1214    response.send(proto::Ack {})?;
1215
1216    update_user_contacts(called_user_id, &session).await?;
1217    Ok(())
1218}
1219
1220async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<()> {
1221    let room_id = RoomId::from_proto(message.room_id);
1222    {
1223        let room = session
1224            .db()
1225            .await
1226            .decline_call(Some(room_id), session.user_id)
1227            .await?
1228            .ok_or_else(|| anyhow!("failed to decline call"))?;
1229        room_updated(&room, &session.peer);
1230    }
1231
1232    for connection_id in session
1233        .connection_pool()
1234        .await
1235        .user_connection_ids(session.user_id)
1236    {
1237        session
1238            .peer
1239            .send(
1240                connection_id,
1241                proto::CallCanceled {
1242                    room_id: room_id.to_proto(),
1243                },
1244            )
1245            .trace_err();
1246    }
1247    update_user_contacts(session.user_id, &session).await?;
1248    Ok(())
1249}
1250
1251async fn update_participant_location(
1252    request: proto::UpdateParticipantLocation,
1253    response: Response<proto::UpdateParticipantLocation>,
1254    session: Session,
1255) -> Result<()> {
1256    let room_id = RoomId::from_proto(request.room_id);
1257    let location = request
1258        .location
1259        .ok_or_else(|| anyhow!("invalid location"))?;
1260    let room = session
1261        .db()
1262        .await
1263        .update_room_participant_location(room_id, session.connection_id, location)
1264        .await?;
1265    room_updated(&room, &session.peer);
1266    response.send(proto::Ack {})?;
1267    Ok(())
1268}
1269
1270async fn share_project(
1271    request: proto::ShareProject,
1272    response: Response<proto::ShareProject>,
1273    session: Session,
1274) -> Result<()> {
1275    let (project_id, room) = &*session
1276        .db()
1277        .await
1278        .share_project(
1279            RoomId::from_proto(request.room_id),
1280            session.connection_id,
1281            &request.worktrees,
1282        )
1283        .await?;
1284    response.send(proto::ShareProjectResponse {
1285        project_id: project_id.to_proto(),
1286    })?;
1287    room_updated(&room, &session.peer);
1288
1289    Ok(())
1290}
1291
1292async fn unshare_project(message: proto::UnshareProject, session: Session) -> Result<()> {
1293    let project_id = ProjectId::from_proto(message.project_id);
1294
1295    let (room, guest_connection_ids) = &*session
1296        .db()
1297        .await
1298        .unshare_project(project_id, session.connection_id)
1299        .await?;
1300
1301    broadcast(
1302        Some(session.connection_id),
1303        guest_connection_ids.iter().copied(),
1304        |conn_id| session.peer.send(conn_id, message.clone()),
1305    );
1306    room_updated(&room, &session.peer);
1307
1308    Ok(())
1309}
1310
1311async fn join_project(
1312    request: proto::JoinProject,
1313    response: Response<proto::JoinProject>,
1314    session: Session,
1315) -> Result<()> {
1316    let project_id = ProjectId::from_proto(request.project_id);
1317    let guest_user_id = session.user_id;
1318
1319    tracing::info!(%project_id, "join project");
1320
1321    let (project, replica_id) = &mut *session
1322        .db()
1323        .await
1324        .join_project(project_id, session.connection_id)
1325        .await?;
1326
1327    let collaborators = project
1328        .collaborators
1329        .iter()
1330        .filter(|collaborator| collaborator.connection_id != session.connection_id)
1331        .map(|collaborator| collaborator.to_proto())
1332        .collect::<Vec<_>>();
1333
1334    let worktrees = project
1335        .worktrees
1336        .iter()
1337        .map(|(id, worktree)| proto::WorktreeMetadata {
1338            id: *id,
1339            root_name: worktree.root_name.clone(),
1340            visible: worktree.visible,
1341            abs_path: worktree.abs_path.clone(),
1342        })
1343        .collect::<Vec<_>>();
1344
1345    for collaborator in &collaborators {
1346        session
1347            .peer
1348            .send(
1349                collaborator.peer_id.unwrap().into(),
1350                proto::AddProjectCollaborator {
1351                    project_id: project_id.to_proto(),
1352                    collaborator: Some(proto::Collaborator {
1353                        peer_id: Some(session.connection_id.into()),
1354                        replica_id: replica_id.0 as u32,
1355                        user_id: guest_user_id.to_proto(),
1356                    }),
1357                },
1358            )
1359            .trace_err();
1360    }
1361
1362    // First, we send the metadata associated with each worktree.
1363    response.send(proto::JoinProjectResponse {
1364        worktrees: worktrees.clone(),
1365        replica_id: replica_id.0 as u32,
1366        collaborators: collaborators.clone(),
1367        language_servers: project.language_servers.clone(),
1368    })?;
1369
1370    for (worktree_id, worktree) in mem::take(&mut project.worktrees) {
1371        #[cfg(any(test, feature = "test-support"))]
1372        const MAX_CHUNK_SIZE: usize = 2;
1373        #[cfg(not(any(test, feature = "test-support")))]
1374        const MAX_CHUNK_SIZE: usize = 256;
1375
1376        // Stream this worktree's entries.
1377        let message = proto::UpdateWorktree {
1378            project_id: project_id.to_proto(),
1379            worktree_id,
1380            abs_path: worktree.abs_path.clone(),
1381            root_name: worktree.root_name,
1382            updated_entries: worktree.entries,
1383            removed_entries: Default::default(),
1384            scan_id: worktree.scan_id,
1385            is_last_update: worktree.scan_id == worktree.completed_scan_id,
1386        };
1387        for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1388            session.peer.send(session.connection_id, update.clone())?;
1389        }
1390
1391        // Stream this worktree's diagnostics.
1392        for summary in worktree.diagnostic_summaries {
1393            session.peer.send(
1394                session.connection_id,
1395                proto::UpdateDiagnosticSummary {
1396                    project_id: project_id.to_proto(),
1397                    worktree_id: worktree.id,
1398                    summary: Some(summary),
1399                },
1400            )?;
1401        }
1402    }
1403
1404    for language_server in &project.language_servers {
1405        session.peer.send(
1406            session.connection_id,
1407            proto::UpdateLanguageServer {
1408                project_id: project_id.to_proto(),
1409                language_server_id: language_server.id,
1410                variant: Some(
1411                    proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1412                        proto::LspDiskBasedDiagnosticsUpdated {},
1413                    ),
1414                ),
1415            },
1416        )?;
1417    }
1418
1419    Ok(())
1420}
1421
1422async fn leave_project(request: proto::LeaveProject, session: Session) -> Result<()> {
1423    let sender_id = session.connection_id;
1424    let project_id = ProjectId::from_proto(request.project_id);
1425
1426    let (room, project) = &*session
1427        .db()
1428        .await
1429        .leave_project(project_id, sender_id)
1430        .await?;
1431    tracing::info!(
1432        %project_id,
1433        host_user_id = %project.host_user_id,
1434        host_connection_id = %project.host_connection_id,
1435        "leave project"
1436    );
1437
1438    project_left(&project, &session);
1439    room_updated(&room, &session.peer);
1440
1441    Ok(())
1442}
1443
1444async fn update_project(
1445    request: proto::UpdateProject,
1446    response: Response<proto::UpdateProject>,
1447    session: Session,
1448) -> Result<()> {
1449    let project_id = ProjectId::from_proto(request.project_id);
1450    let (room, guest_connection_ids) = &*session
1451        .db()
1452        .await
1453        .update_project(project_id, session.connection_id, &request.worktrees)
1454        .await?;
1455    broadcast(
1456        Some(session.connection_id),
1457        guest_connection_ids.iter().copied(),
1458        |connection_id| {
1459            session
1460                .peer
1461                .forward_send(session.connection_id, connection_id, request.clone())
1462        },
1463    );
1464    room_updated(&room, &session.peer);
1465    response.send(proto::Ack {})?;
1466
1467    Ok(())
1468}
1469
1470async fn update_worktree(
1471    request: proto::UpdateWorktree,
1472    response: Response<proto::UpdateWorktree>,
1473    session: Session,
1474) -> Result<()> {
1475    let guest_connection_ids = session
1476        .db()
1477        .await
1478        .update_worktree(&request, session.connection_id)
1479        .await?;
1480
1481    broadcast(
1482        Some(session.connection_id),
1483        guest_connection_ids.iter().copied(),
1484        |connection_id| {
1485            session
1486                .peer
1487                .forward_send(session.connection_id, connection_id, request.clone())
1488        },
1489    );
1490    response.send(proto::Ack {})?;
1491    Ok(())
1492}
1493
1494async fn update_diagnostic_summary(
1495    message: proto::UpdateDiagnosticSummary,
1496    session: Session,
1497) -> Result<()> {
1498    let guest_connection_ids = session
1499        .db()
1500        .await
1501        .update_diagnostic_summary(&message, session.connection_id)
1502        .await?;
1503
1504    broadcast(
1505        Some(session.connection_id),
1506        guest_connection_ids.iter().copied(),
1507        |connection_id| {
1508            session
1509                .peer
1510                .forward_send(session.connection_id, connection_id, message.clone())
1511        },
1512    );
1513
1514    Ok(())
1515}
1516
1517async fn start_language_server(
1518    request: proto::StartLanguageServer,
1519    session: Session,
1520) -> Result<()> {
1521    let guest_connection_ids = session
1522        .db()
1523        .await
1524        .start_language_server(&request, session.connection_id)
1525        .await?;
1526
1527    broadcast(
1528        Some(session.connection_id),
1529        guest_connection_ids.iter().copied(),
1530        |connection_id| {
1531            session
1532                .peer
1533                .forward_send(session.connection_id, connection_id, request.clone())
1534        },
1535    );
1536    Ok(())
1537}
1538
1539async fn update_language_server(
1540    request: proto::UpdateLanguageServer,
1541    session: Session,
1542) -> Result<()> {
1543    session.executor.record_backtrace();
1544    let project_id = ProjectId::from_proto(request.project_id);
1545    let project_connection_ids = session
1546        .db()
1547        .await
1548        .project_connection_ids(project_id, session.connection_id)
1549        .await?;
1550    broadcast(
1551        Some(session.connection_id),
1552        project_connection_ids.iter().copied(),
1553        |connection_id| {
1554            session
1555                .peer
1556                .forward_send(session.connection_id, connection_id, request.clone())
1557        },
1558    );
1559    Ok(())
1560}
1561
1562async fn forward_project_request<T>(
1563    request: T,
1564    response: Response<T>,
1565    session: Session,
1566) -> Result<()>
1567where
1568    T: EntityMessage + RequestMessage,
1569{
1570    session.executor.record_backtrace();
1571    let project_id = ProjectId::from_proto(request.remote_entity_id());
1572    let host_connection_id = {
1573        let collaborators = session
1574            .db()
1575            .await
1576            .project_collaborators(project_id, session.connection_id)
1577            .await?;
1578        collaborators
1579            .iter()
1580            .find(|collaborator| collaborator.is_host)
1581            .ok_or_else(|| anyhow!("host not found"))?
1582            .connection_id
1583    };
1584
1585    let payload = session
1586        .peer
1587        .forward_request(session.connection_id, host_connection_id, request)
1588        .await?;
1589
1590    response.send(payload)?;
1591    Ok(())
1592}
1593
1594async fn create_buffer_for_peer(
1595    request: proto::CreateBufferForPeer,
1596    session: Session,
1597) -> Result<()> {
1598    session.executor.record_backtrace();
1599    let peer_id = request.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?;
1600    session
1601        .peer
1602        .forward_send(session.connection_id, peer_id.into(), request)?;
1603    Ok(())
1604}
1605
1606async fn update_buffer(
1607    request: proto::UpdateBuffer,
1608    response: Response<proto::UpdateBuffer>,
1609    session: Session,
1610) -> Result<()> {
1611    session.executor.record_backtrace();
1612    let project_id = ProjectId::from_proto(request.project_id);
1613    let mut guest_connection_ids;
1614    let mut host_connection_id = None;
1615    {
1616        let collaborators = session
1617            .db()
1618            .await
1619            .project_collaborators(project_id, session.connection_id)
1620            .await?;
1621        guest_connection_ids = Vec::with_capacity(collaborators.len() - 1);
1622        for collaborator in collaborators.iter() {
1623            if collaborator.is_host {
1624                host_connection_id = Some(collaborator.connection_id);
1625            } else {
1626                guest_connection_ids.push(collaborator.connection_id);
1627            }
1628        }
1629    }
1630    let host_connection_id = host_connection_id.ok_or_else(|| anyhow!("host not found"))?;
1631
1632    session.executor.record_backtrace();
1633    broadcast(
1634        Some(session.connection_id),
1635        guest_connection_ids,
1636        |connection_id| {
1637            session
1638                .peer
1639                .forward_send(session.connection_id, connection_id, request.clone())
1640        },
1641    );
1642    if host_connection_id != session.connection_id {
1643        session
1644            .peer
1645            .forward_request(session.connection_id, host_connection_id, request.clone())
1646            .await?;
1647    }
1648
1649    response.send(proto::Ack {})?;
1650    Ok(())
1651}
1652
1653async fn update_buffer_file(request: proto::UpdateBufferFile, session: Session) -> Result<()> {
1654    let project_id = ProjectId::from_proto(request.project_id);
1655    let project_connection_ids = session
1656        .db()
1657        .await
1658        .project_connection_ids(project_id, session.connection_id)
1659        .await?;
1660
1661    broadcast(
1662        Some(session.connection_id),
1663        project_connection_ids.iter().copied(),
1664        |connection_id| {
1665            session
1666                .peer
1667                .forward_send(session.connection_id, connection_id, request.clone())
1668        },
1669    );
1670    Ok(())
1671}
1672
1673async fn buffer_reloaded(request: proto::BufferReloaded, session: Session) -> Result<()> {
1674    let project_id = ProjectId::from_proto(request.project_id);
1675    let project_connection_ids = session
1676        .db()
1677        .await
1678        .project_connection_ids(project_id, session.connection_id)
1679        .await?;
1680    broadcast(
1681        Some(session.connection_id),
1682        project_connection_ids.iter().copied(),
1683        |connection_id| {
1684            session
1685                .peer
1686                .forward_send(session.connection_id, connection_id, request.clone())
1687        },
1688    );
1689    Ok(())
1690}
1691
1692async fn buffer_saved(request: proto::BufferSaved, session: Session) -> Result<()> {
1693    let project_id = ProjectId::from_proto(request.project_id);
1694    let project_connection_ids = session
1695        .db()
1696        .await
1697        .project_connection_ids(project_id, session.connection_id)
1698        .await?;
1699    broadcast(
1700        Some(session.connection_id),
1701        project_connection_ids.iter().copied(),
1702        |connection_id| {
1703            session
1704                .peer
1705                .forward_send(session.connection_id, connection_id, request.clone())
1706        },
1707    );
1708    Ok(())
1709}
1710
1711async fn follow(
1712    request: proto::Follow,
1713    response: Response<proto::Follow>,
1714    session: Session,
1715) -> Result<()> {
1716    let project_id = ProjectId::from_proto(request.project_id);
1717    let leader_id = request
1718        .leader_id
1719        .ok_or_else(|| anyhow!("invalid leader id"))?
1720        .into();
1721    let follower_id = session.connection_id;
1722
1723    {
1724        let project_connection_ids = session
1725            .db()
1726            .await
1727            .project_connection_ids(project_id, session.connection_id)
1728            .await?;
1729
1730        if !project_connection_ids.contains(&leader_id) {
1731            Err(anyhow!("no such peer"))?;
1732        }
1733    }
1734
1735    let mut response_payload = session
1736        .peer
1737        .forward_request(session.connection_id, leader_id, request)
1738        .await?;
1739    response_payload
1740        .views
1741        .retain(|view| view.leader_id != Some(follower_id.into()));
1742    response.send(response_payload)?;
1743
1744    let room = session
1745        .db()
1746        .await
1747        .follow(project_id, leader_id, follower_id)
1748        .await?;
1749    room_updated(&room, &session.peer);
1750
1751    Ok(())
1752}
1753
1754async fn unfollow(request: proto::Unfollow, session: Session) -> Result<()> {
1755    let project_id = ProjectId::from_proto(request.project_id);
1756    let leader_id = request
1757        .leader_id
1758        .ok_or_else(|| anyhow!("invalid leader id"))?
1759        .into();
1760    let follower_id = session.connection_id;
1761
1762    if !session
1763        .db()
1764        .await
1765        .project_connection_ids(project_id, session.connection_id)
1766        .await?
1767        .contains(&leader_id)
1768    {
1769        Err(anyhow!("no such peer"))?;
1770    }
1771
1772    session
1773        .peer
1774        .forward_send(session.connection_id, leader_id, request)?;
1775
1776    let room = session
1777        .db()
1778        .await
1779        .unfollow(project_id, leader_id, follower_id)
1780        .await?;
1781    room_updated(&room, &session.peer);
1782
1783    Ok(())
1784}
1785
1786async fn update_followers(request: proto::UpdateFollowers, session: Session) -> Result<()> {
1787    let project_id = ProjectId::from_proto(request.project_id);
1788    let project_connection_ids = session
1789        .db
1790        .lock()
1791        .await
1792        .project_connection_ids(project_id, session.connection_id)
1793        .await?;
1794
1795    let leader_id = request.variant.as_ref().and_then(|variant| match variant {
1796        proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
1797        proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
1798        proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
1799    });
1800    for follower_peer_id in request.follower_ids.iter().copied() {
1801        let follower_connection_id = follower_peer_id.into();
1802        if project_connection_ids.contains(&follower_connection_id)
1803            && Some(follower_peer_id) != leader_id
1804        {
1805            session.peer.forward_send(
1806                session.connection_id,
1807                follower_connection_id,
1808                request.clone(),
1809            )?;
1810        }
1811    }
1812    Ok(())
1813}
1814
1815async fn get_users(
1816    request: proto::GetUsers,
1817    response: Response<proto::GetUsers>,
1818    session: Session,
1819) -> Result<()> {
1820    let user_ids = request
1821        .user_ids
1822        .into_iter()
1823        .map(UserId::from_proto)
1824        .collect();
1825    let users = session
1826        .db()
1827        .await
1828        .get_users_by_ids(user_ids)
1829        .await?
1830        .into_iter()
1831        .map(|user| proto::User {
1832            id: user.id.to_proto(),
1833            avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1834            github_login: user.github_login,
1835        })
1836        .collect();
1837    response.send(proto::UsersResponse { users })?;
1838    Ok(())
1839}
1840
1841async fn fuzzy_search_users(
1842    request: proto::FuzzySearchUsers,
1843    response: Response<proto::FuzzySearchUsers>,
1844    session: Session,
1845) -> Result<()> {
1846    let query = request.query;
1847    let users = match query.len() {
1848        0 => vec![],
1849        1 | 2 => session
1850            .db()
1851            .await
1852            .get_user_by_github_login(&query)
1853            .await?
1854            .into_iter()
1855            .collect(),
1856        _ => session.db().await.fuzzy_search_users(&query, 10).await?,
1857    };
1858    let users = users
1859        .into_iter()
1860        .filter(|user| user.id != session.user_id)
1861        .map(|user| proto::User {
1862            id: user.id.to_proto(),
1863            avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1864            github_login: user.github_login,
1865        })
1866        .collect();
1867    response.send(proto::UsersResponse { users })?;
1868    Ok(())
1869}
1870
1871async fn request_contact(
1872    request: proto::RequestContact,
1873    response: Response<proto::RequestContact>,
1874    session: Session,
1875) -> Result<()> {
1876    let requester_id = session.user_id;
1877    let responder_id = UserId::from_proto(request.responder_id);
1878    if requester_id == responder_id {
1879        return Err(anyhow!("cannot add yourself as a contact"))?;
1880    }
1881
1882    session
1883        .db()
1884        .await
1885        .send_contact_request(requester_id, responder_id)
1886        .await?;
1887
1888    // Update outgoing contact requests of requester
1889    let mut update = proto::UpdateContacts::default();
1890    update.outgoing_requests.push(responder_id.to_proto());
1891    for connection_id in session
1892        .connection_pool()
1893        .await
1894        .user_connection_ids(requester_id)
1895    {
1896        session.peer.send(connection_id, update.clone())?;
1897    }
1898
1899    // Update incoming contact requests of responder
1900    let mut update = proto::UpdateContacts::default();
1901    update
1902        .incoming_requests
1903        .push(proto::IncomingContactRequest {
1904            requester_id: requester_id.to_proto(),
1905            should_notify: true,
1906        });
1907    for connection_id in session
1908        .connection_pool()
1909        .await
1910        .user_connection_ids(responder_id)
1911    {
1912        session.peer.send(connection_id, update.clone())?;
1913    }
1914
1915    response.send(proto::Ack {})?;
1916    Ok(())
1917}
1918
1919async fn respond_to_contact_request(
1920    request: proto::RespondToContactRequest,
1921    response: Response<proto::RespondToContactRequest>,
1922    session: Session,
1923) -> Result<()> {
1924    let responder_id = session.user_id;
1925    let requester_id = UserId::from_proto(request.requester_id);
1926    let db = session.db().await;
1927    if request.response == proto::ContactRequestResponse::Dismiss as i32 {
1928        db.dismiss_contact_notification(responder_id, requester_id)
1929            .await?;
1930    } else {
1931        let accept = request.response == proto::ContactRequestResponse::Accept as i32;
1932
1933        db.respond_to_contact_request(responder_id, requester_id, accept)
1934            .await?;
1935        let requester_busy = db.is_user_busy(requester_id).await?;
1936        let responder_busy = db.is_user_busy(responder_id).await?;
1937
1938        let pool = session.connection_pool().await;
1939        // Update responder with new contact
1940        let mut update = proto::UpdateContacts::default();
1941        if accept {
1942            update
1943                .contacts
1944                .push(contact_for_user(requester_id, false, requester_busy, &pool));
1945        }
1946        update
1947            .remove_incoming_requests
1948            .push(requester_id.to_proto());
1949        for connection_id in pool.user_connection_ids(responder_id) {
1950            session.peer.send(connection_id, update.clone())?;
1951        }
1952
1953        // Update requester with new contact
1954        let mut update = proto::UpdateContacts::default();
1955        if accept {
1956            update
1957                .contacts
1958                .push(contact_for_user(responder_id, true, responder_busy, &pool));
1959        }
1960        update
1961            .remove_outgoing_requests
1962            .push(responder_id.to_proto());
1963        for connection_id in pool.user_connection_ids(requester_id) {
1964            session.peer.send(connection_id, update.clone())?;
1965        }
1966    }
1967
1968    response.send(proto::Ack {})?;
1969    Ok(())
1970}
1971
1972async fn remove_contact(
1973    request: proto::RemoveContact,
1974    response: Response<proto::RemoveContact>,
1975    session: Session,
1976) -> Result<()> {
1977    let requester_id = session.user_id;
1978    let responder_id = UserId::from_proto(request.user_id);
1979    let db = session.db().await;
1980    let contact_accepted = db.remove_contact(requester_id, responder_id).await?;
1981
1982    let pool = session.connection_pool().await;
1983    // Update outgoing contact requests of requester
1984    let mut update = proto::UpdateContacts::default();
1985    if contact_accepted {
1986        update.remove_contacts.push(responder_id.to_proto());
1987    } else {
1988        update
1989            .remove_outgoing_requests
1990            .push(responder_id.to_proto());
1991    }
1992    for connection_id in pool.user_connection_ids(requester_id) {
1993        session.peer.send(connection_id, update.clone())?;
1994    }
1995
1996    // Update incoming contact requests of responder
1997    let mut update = proto::UpdateContacts::default();
1998    if contact_accepted {
1999        update.remove_contacts.push(requester_id.to_proto());
2000    } else {
2001        update
2002            .remove_incoming_requests
2003            .push(requester_id.to_proto());
2004    }
2005    for connection_id in pool.user_connection_ids(responder_id) {
2006        session.peer.send(connection_id, update.clone())?;
2007    }
2008
2009    response.send(proto::Ack {})?;
2010    Ok(())
2011}
2012
2013async fn update_diff_base(request: proto::UpdateDiffBase, session: Session) -> Result<()> {
2014    let project_id = ProjectId::from_proto(request.project_id);
2015    let project_connection_ids = session
2016        .db()
2017        .await
2018        .project_connection_ids(project_id, session.connection_id)
2019        .await?;
2020    broadcast(
2021        Some(session.connection_id),
2022        project_connection_ids.iter().copied(),
2023        |connection_id| {
2024            session
2025                .peer
2026                .forward_send(session.connection_id, connection_id, request.clone())
2027        },
2028    );
2029    Ok(())
2030}
2031
2032async fn get_private_user_info(
2033    _request: proto::GetPrivateUserInfo,
2034    response: Response<proto::GetPrivateUserInfo>,
2035    session: Session,
2036) -> Result<()> {
2037    let metrics_id = session
2038        .db()
2039        .await
2040        .get_user_metrics_id(session.user_id)
2041        .await?;
2042    let user = session
2043        .db()
2044        .await
2045        .get_user_by_id(session.user_id)
2046        .await?
2047        .ok_or_else(|| anyhow!("user not found"))?;
2048    response.send(proto::GetPrivateUserInfoResponse {
2049        metrics_id,
2050        staff: user.admin,
2051    })?;
2052    Ok(())
2053}
2054
2055fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
2056    match message {
2057        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
2058        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
2059        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
2060        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
2061        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
2062            code: frame.code.into(),
2063            reason: frame.reason,
2064        })),
2065    }
2066}
2067
2068fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
2069    match message {
2070        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
2071        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
2072        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
2073        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
2074        AxumMessage::Close(frame) => {
2075            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
2076                code: frame.code.into(),
2077                reason: frame.reason,
2078            }))
2079        }
2080    }
2081}
2082
2083fn build_initial_contacts_update(
2084    contacts: Vec<db::Contact>,
2085    pool: &ConnectionPool,
2086) -> proto::UpdateContacts {
2087    let mut update = proto::UpdateContacts::default();
2088
2089    for contact in contacts {
2090        match contact {
2091            db::Contact::Accepted {
2092                user_id,
2093                should_notify,
2094                busy,
2095            } => {
2096                update
2097                    .contacts
2098                    .push(contact_for_user(user_id, should_notify, busy, &pool));
2099            }
2100            db::Contact::Outgoing { user_id } => update.outgoing_requests.push(user_id.to_proto()),
2101            db::Contact::Incoming {
2102                user_id,
2103                should_notify,
2104            } => update
2105                .incoming_requests
2106                .push(proto::IncomingContactRequest {
2107                    requester_id: user_id.to_proto(),
2108                    should_notify,
2109                }),
2110        }
2111    }
2112
2113    update
2114}
2115
2116fn contact_for_user(
2117    user_id: UserId,
2118    should_notify: bool,
2119    busy: bool,
2120    pool: &ConnectionPool,
2121) -> proto::Contact {
2122    proto::Contact {
2123        user_id: user_id.to_proto(),
2124        online: pool.is_user_online(user_id),
2125        busy,
2126        should_notify,
2127    }
2128}
2129
2130fn room_updated(room: &proto::Room, peer: &Peer) {
2131    broadcast(
2132        None,
2133        room.participants
2134            .iter()
2135            .filter_map(|participant| Some(participant.peer_id?.into())),
2136        |peer_id| {
2137            peer.send(
2138                peer_id.into(),
2139                proto::RoomUpdated {
2140                    room: Some(room.clone()),
2141                },
2142            )
2143        },
2144    );
2145}
2146
2147async fn update_user_contacts(user_id: UserId, session: &Session) -> Result<()> {
2148    let db = session.db().await;
2149    let contacts = db.get_contacts(user_id).await?;
2150    let busy = db.is_user_busy(user_id).await?;
2151
2152    let pool = session.connection_pool().await;
2153    let updated_contact = contact_for_user(user_id, false, busy, &pool);
2154    for contact in contacts {
2155        if let db::Contact::Accepted {
2156            user_id: contact_user_id,
2157            ..
2158        } = contact
2159        {
2160            for contact_conn_id in pool.user_connection_ids(contact_user_id) {
2161                session
2162                    .peer
2163                    .send(
2164                        contact_conn_id,
2165                        proto::UpdateContacts {
2166                            contacts: vec![updated_contact.clone()],
2167                            remove_contacts: Default::default(),
2168                            incoming_requests: Default::default(),
2169                            remove_incoming_requests: Default::default(),
2170                            outgoing_requests: Default::default(),
2171                            remove_outgoing_requests: Default::default(),
2172                        },
2173                    )
2174                    .trace_err();
2175            }
2176        }
2177    }
2178    Ok(())
2179}
2180
2181async fn leave_room_for_session(session: &Session) -> Result<()> {
2182    let mut contacts_to_update = HashSet::default();
2183
2184    let room_id;
2185    let canceled_calls_to_user_ids;
2186    let live_kit_room;
2187    let delete_live_kit_room;
2188    if let Some(mut left_room) = session.db().await.leave_room(session.connection_id).await? {
2189        contacts_to_update.insert(session.user_id);
2190
2191        for project in left_room.left_projects.values() {
2192            project_left(project, session);
2193        }
2194
2195        room_updated(&left_room.room, &session.peer);
2196        room_id = RoomId::from_proto(left_room.room.id);
2197        canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids);
2198        live_kit_room = mem::take(&mut left_room.room.live_kit_room);
2199        delete_live_kit_room = left_room.room.participants.is_empty();
2200    } else {
2201        return Ok(());
2202    }
2203
2204    {
2205        let pool = session.connection_pool().await;
2206        for canceled_user_id in canceled_calls_to_user_ids {
2207            for connection_id in pool.user_connection_ids(canceled_user_id) {
2208                session
2209                    .peer
2210                    .send(
2211                        connection_id,
2212                        proto::CallCanceled {
2213                            room_id: room_id.to_proto(),
2214                        },
2215                    )
2216                    .trace_err();
2217            }
2218            contacts_to_update.insert(canceled_user_id);
2219        }
2220    }
2221
2222    for contact_user_id in contacts_to_update {
2223        update_user_contacts(contact_user_id, &session).await?;
2224    }
2225
2226    if let Some(live_kit) = session.live_kit_client.as_ref() {
2227        live_kit
2228            .remove_participant(live_kit_room.clone(), session.user_id.to_string())
2229            .await
2230            .trace_err();
2231
2232        if delete_live_kit_room {
2233            live_kit.delete_room(live_kit_room).await.trace_err();
2234        }
2235    }
2236
2237    Ok(())
2238}
2239
2240fn project_left(project: &db::LeftProject, session: &Session) {
2241    for connection_id in &project.connection_ids {
2242        if project.host_user_id == session.user_id {
2243            session
2244                .peer
2245                .send(
2246                    *connection_id,
2247                    proto::UnshareProject {
2248                        project_id: project.id.to_proto(),
2249                    },
2250                )
2251                .trace_err();
2252        } else {
2253            session
2254                .peer
2255                .send(
2256                    *connection_id,
2257                    proto::RemoveProjectCollaborator {
2258                        project_id: project.id.to_proto(),
2259                        peer_id: Some(session.connection_id.into()),
2260                    },
2261                )
2262                .trace_err();
2263        }
2264    }
2265}
2266
/// Extension trait for result-like values whose error should be reported rather than
/// propagated.
pub trait ResultExt {
    /// The success value produced when there is no error.
    type Ok;

    /// Consumes `self` and returns its success value as an `Option`.
    ///
    /// What happens to an error is up to the implementation.
    fn trace_err(self) -> Option<Self::Ok>;
}
2272
2273impl<T, E> ResultExt for Result<T, E>
2274where
2275    E: std::fmt::Debug,
2276{
2277    type Ok = T;
2278
2279    fn trace_err(self) -> Option<T> {
2280        match self {
2281            Ok(value) => Some(value),
2282            Err(error) => {
2283                tracing::error!("{:?}", error);
2284                None
2285            }
2286        }
2287    }
2288}