rpc.rs

   1mod connection_pool;
   2
   3use crate::{
   4    auth,
   5    db::{
   6        self, ChannelId, ChannelsForUser, Database, MessageId, ProjectId, RoomId, ServerId, User,
   7        UserId,
   8    },
   9    executor::Executor,
  10    AppState, Result,
  11};
  12use anyhow::anyhow;
  13use async_tungstenite::tungstenite::{
  14    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  15};
  16use axum::{
  17    body::Body,
  18    extract::{
  19        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  20        ConnectInfo, WebSocketUpgrade,
  21    },
  22    headers::{Header, HeaderName},
  23    http::StatusCode,
  24    middleware,
  25    response::IntoResponse,
  26    routing::get,
  27    Extension, Router, TypedHeader,
  28};
  29use collections::{HashMap, HashSet};
  30pub use connection_pool::ConnectionPool;
  31use futures::{
  32    channel::oneshot,
  33    future::{self, BoxFuture},
  34    stream::FuturesUnordered,
  35    FutureExt, SinkExt, StreamExt, TryStreamExt,
  36};
  37use lazy_static::lazy_static;
  38use prometheus::{register_int_gauge, IntGauge};
  39use rpc::{
  40    proto::{
  41        self, Ack, AddChannelBufferCollaborator, AnyTypedEnvelope, EntityMessage, EnvelopedMessage,
  42        LiveKitConnectionInfo, RequestMessage,
  43    },
  44    Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
  45};
  46use serde::{Serialize, Serializer};
  47use std::{
  48    any::TypeId,
  49    fmt,
  50    future::Future,
  51    marker::PhantomData,
  52    mem,
  53    net::SocketAddr,
  54    ops::{Deref, DerefMut},
  55    rc::Rc,
  56    sync::{
  57        atomic::{AtomicBool, Ordering::SeqCst},
  58        Arc,
  59    },
  60    time::{Duration, Instant},
  61};
  62use time::OffsetDateTime;
  63use tokio::sync::{watch, Semaphore};
  64use tower::ServiceBuilder;
  65use tracing::{info_span, instrument, Instrument};
  66
  67pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  68pub const CLEANUP_TIMEOUT: Duration = Duration::from_secs(10);
  69
  70const MESSAGE_COUNT_PER_PAGE: usize = 100;
  71const MAX_MESSAGE_LEN: usize = 1024;
  72
  73lazy_static! {
  74    static ref METRIC_CONNECTIONS: IntGauge =
  75        register_int_gauge!("connections", "number of connections").unwrap();
  76    static ref METRIC_SHARED_PROJECTS: IntGauge = register_int_gauge!(
  77        "shared_projects",
  78        "number of open projects with one or more guests"
  79    )
  80    .unwrap();
  81}
  82
  83type MessageHandler =
  84    Box<dyn Send + Sync + Fn(Box<dyn AnyTypedEnvelope>, Session) -> BoxFuture<'static, ()>>;
  85
  86struct Response<R> {
  87    peer: Arc<Peer>,
  88    receipt: Receipt<R>,
  89    responded: Arc<AtomicBool>,
  90}
  91
  92impl<R: RequestMessage> Response<R> {
  93    fn send(self, payload: R::Response) -> Result<()> {
  94        self.responded.store(true, SeqCst);
  95        self.peer.respond(self.receipt, payload)?;
  96        Ok(())
  97    }
  98}
  99
 100#[derive(Clone)]
 101struct Session {
 102    user_id: UserId,
 103    connection_id: ConnectionId,
 104    db: Arc<tokio::sync::Mutex<DbHandle>>,
 105    peer: Arc<Peer>,
 106    connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
 107    live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
 108    executor: Executor,
 109}
 110
 111impl Session {
 112    async fn db(&self) -> tokio::sync::MutexGuard<DbHandle> {
 113        #[cfg(test)]
 114        tokio::task::yield_now().await;
 115        let guard = self.db.lock().await;
 116        #[cfg(test)]
 117        tokio::task::yield_now().await;
 118        guard
 119    }
 120
 121    async fn connection_pool(&self) -> ConnectionPoolGuard<'_> {
 122        #[cfg(test)]
 123        tokio::task::yield_now().await;
 124        let guard = self.connection_pool.lock();
 125        ConnectionPoolGuard {
 126            guard,
 127            _not_send: PhantomData,
 128        }
 129    }
 130}
 131
 132impl fmt::Debug for Session {
 133    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 134        f.debug_struct("Session")
 135            .field("user_id", &self.user_id)
 136            .field("connection_id", &self.connection_id)
 137            .finish()
 138    }
 139}
 140
 141struct DbHandle(Arc<Database>);
 142
 143impl Deref for DbHandle {
 144    type Target = Database;
 145
 146    fn deref(&self) -> &Self::Target {
 147        self.0.as_ref()
 148    }
 149}
 150
 151pub struct Server {
 152    id: parking_lot::Mutex<ServerId>,
 153    peer: Arc<Peer>,
 154    pub(crate) connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
 155    app_state: Arc<AppState>,
 156    executor: Executor,
 157    handlers: HashMap<TypeId, MessageHandler>,
 158    teardown: watch::Sender<()>,
 159}
 160
 161pub(crate) struct ConnectionPoolGuard<'a> {
 162    guard: parking_lot::MutexGuard<'a, ConnectionPool>,
 163    _not_send: PhantomData<Rc<()>>,
 164}
 165
 166#[derive(Serialize)]
 167pub struct ServerSnapshot<'a> {
 168    peer: &'a Peer,
 169    #[serde(serialize_with = "serialize_deref")]
 170    connection_pool: ConnectionPoolGuard<'a>,
 171}
 172
 173pub fn serialize_deref<S, T, U>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
 174where
 175    S: Serializer,
 176    T: Deref<Target = U>,
 177    U: Serialize,
 178{
 179    Serialize::serialize(value.deref(), serializer)
 180}
 181
 182impl Server {
 183    pub fn new(id: ServerId, app_state: Arc<AppState>, executor: Executor) -> Arc<Self> {
 184        let mut server = Self {
 185            id: parking_lot::Mutex::new(id),
 186            peer: Peer::new(id.0 as u32),
 187            app_state,
 188            executor,
 189            connection_pool: Default::default(),
 190            handlers: Default::default(),
 191            teardown: watch::channel(()).0,
 192        };
 193
 194        server
 195            .add_request_handler(ping)
 196            .add_request_handler(create_room)
 197            .add_request_handler(join_room)
 198            .add_request_handler(rejoin_room)
 199            .add_request_handler(leave_room)
 200            .add_request_handler(call)
 201            .add_request_handler(cancel_call)
 202            .add_message_handler(decline_call)
 203            .add_request_handler(update_participant_location)
 204            .add_request_handler(share_project)
 205            .add_message_handler(unshare_project)
 206            .add_request_handler(join_project)
 207            .add_message_handler(leave_project)
 208            .add_request_handler(update_project)
 209            .add_request_handler(update_worktree)
 210            .add_message_handler(start_language_server)
 211            .add_message_handler(update_language_server)
 212            .add_message_handler(update_diagnostic_summary)
 213            .add_message_handler(update_worktree_settings)
 214            .add_message_handler(refresh_inlay_hints)
 215            .add_request_handler(forward_project_request::<proto::GetHover>)
 216            .add_request_handler(forward_project_request::<proto::GetDefinition>)
 217            .add_request_handler(forward_project_request::<proto::GetTypeDefinition>)
 218            .add_request_handler(forward_project_request::<proto::GetReferences>)
 219            .add_request_handler(forward_project_request::<proto::SearchProject>)
 220            .add_request_handler(forward_project_request::<proto::GetDocumentHighlights>)
 221            .add_request_handler(forward_project_request::<proto::GetProjectSymbols>)
 222            .add_request_handler(forward_project_request::<proto::OpenBufferForSymbol>)
 223            .add_request_handler(forward_project_request::<proto::OpenBufferById>)
 224            .add_request_handler(forward_project_request::<proto::OpenBufferByPath>)
 225            .add_request_handler(forward_project_request::<proto::GetCompletions>)
 226            .add_request_handler(forward_project_request::<proto::ApplyCompletionAdditionalEdits>)
 227            .add_request_handler(forward_project_request::<proto::GetCodeActions>)
 228            .add_request_handler(forward_project_request::<proto::ApplyCodeAction>)
 229            .add_request_handler(forward_project_request::<proto::PrepareRename>)
 230            .add_request_handler(forward_project_request::<proto::PerformRename>)
 231            .add_request_handler(forward_project_request::<proto::ReloadBuffers>)
 232            .add_request_handler(forward_project_request::<proto::SynchronizeBuffers>)
 233            .add_request_handler(forward_project_request::<proto::FormatBuffers>)
 234            .add_request_handler(forward_project_request::<proto::CreateProjectEntry>)
 235            .add_request_handler(forward_project_request::<proto::RenameProjectEntry>)
 236            .add_request_handler(forward_project_request::<proto::CopyProjectEntry>)
 237            .add_request_handler(forward_project_request::<proto::DeleteProjectEntry>)
 238            .add_request_handler(forward_project_request::<proto::ExpandProjectEntry>)
 239            .add_request_handler(forward_project_request::<proto::OnTypeFormatting>)
 240            .add_request_handler(forward_project_request::<proto::InlayHints>)
 241            .add_message_handler(create_buffer_for_peer)
 242            .add_request_handler(update_buffer)
 243            .add_message_handler(update_buffer_file)
 244            .add_message_handler(buffer_reloaded)
 245            .add_message_handler(buffer_saved)
 246            .add_request_handler(forward_project_request::<proto::SaveBuffer>)
 247            .add_request_handler(get_users)
 248            .add_request_handler(fuzzy_search_users)
 249            .add_request_handler(request_contact)
 250            .add_request_handler(remove_contact)
 251            .add_request_handler(respond_to_contact_request)
 252            .add_request_handler(create_channel)
 253            .add_request_handler(remove_channel)
 254            .add_request_handler(invite_channel_member)
 255            .add_request_handler(remove_channel_member)
 256            .add_request_handler(set_channel_member_admin)
 257            .add_request_handler(rename_channel)
 258            .add_request_handler(join_channel_buffer)
 259            .add_request_handler(leave_channel_buffer)
 260            .add_message_handler(update_channel_buffer)
 261            .add_request_handler(rejoin_channel_buffers)
 262            .add_request_handler(get_channel_members)
 263            .add_request_handler(respond_to_channel_invite)
 264            .add_request_handler(join_channel)
 265            .add_request_handler(join_channel_chat)
 266            .add_message_handler(leave_channel_chat)
 267            .add_request_handler(send_channel_message)
 268            .add_request_handler(remove_channel_message)
 269            .add_request_handler(get_channel_messages)
 270            .add_request_handler(follow)
 271            .add_message_handler(unfollow)
 272            .add_message_handler(update_followers)
 273            .add_message_handler(update_diff_base)
 274            .add_request_handler(get_private_user_info);
 275
 276        Arc::new(server)
 277    }
 278
 279    pub async fn start(&self) -> Result<()> {
 280        let server_id = *self.id.lock();
 281        let app_state = self.app_state.clone();
 282        let peer = self.peer.clone();
 283        let timeout = self.executor.sleep(CLEANUP_TIMEOUT);
 284        let pool = self.connection_pool.clone();
 285        let live_kit_client = self.app_state.live_kit_client.clone();
 286
 287        let span = info_span!("start server");
 288        self.executor.spawn_detached(
 289            async move {
 290                tracing::info!("waiting for cleanup timeout");
 291                timeout.await;
 292                tracing::info!("cleanup timeout expired, retrieving stale rooms");
 293                if let Some((room_ids, channel_ids)) = app_state
 294                    .db
 295                    .stale_server_resource_ids(&app_state.config.zed_environment, server_id)
 296                    .await
 297                    .trace_err()
 298                {
 299                    tracing::info!(stale_room_count = room_ids.len(), "retrieved stale rooms");
 300                    tracing::info!(
 301                        stale_channel_buffer_count = channel_ids.len(),
 302                        "retrieved stale channel buffers"
 303                    );
 304
 305                    for channel_id in channel_ids {
 306                        if let Some(refreshed_channel_buffer) = app_state
 307                            .db
 308                            .clear_stale_channel_buffer_collaborators(channel_id, server_id)
 309                            .await
 310                            .trace_err()
 311                        {
 312                            for connection_id in refreshed_channel_buffer.connection_ids {
 313                                for message in &refreshed_channel_buffer.removed_collaborators {
 314                                    peer.send(connection_id, message.clone()).trace_err();
 315                                }
 316                            }
 317                        }
 318                    }
 319
 320                    for room_id in room_ids {
 321                        let mut contacts_to_update = HashSet::default();
 322                        let mut canceled_calls_to_user_ids = Vec::new();
 323                        let mut live_kit_room = String::new();
 324                        let mut delete_live_kit_room = false;
 325
 326                        if let Some(mut refreshed_room) = app_state
 327                            .db
 328                            .clear_stale_room_participants(room_id, server_id)
 329                            .await
 330                            .trace_err()
 331                        {
 332                            tracing::info!(
 333                                room_id = room_id.0,
 334                                new_participant_count = refreshed_room.room.participants.len(),
 335                                "refreshed room"
 336                            );
 337                            room_updated(&refreshed_room.room, &peer);
 338                            if let Some(channel_id) = refreshed_room.channel_id {
 339                                channel_updated(
 340                                    channel_id,
 341                                    &refreshed_room.room,
 342                                    &refreshed_room.channel_members,
 343                                    &peer,
 344                                    &*pool.lock(),
 345                                );
 346                            }
 347                            contacts_to_update
 348                                .extend(refreshed_room.stale_participant_user_ids.iter().copied());
 349                            contacts_to_update
 350                                .extend(refreshed_room.canceled_calls_to_user_ids.iter().copied());
 351                            canceled_calls_to_user_ids =
 352                                mem::take(&mut refreshed_room.canceled_calls_to_user_ids);
 353                            live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room);
 354                            delete_live_kit_room = refreshed_room.room.participants.is_empty();
 355                        }
 356
 357                        {
 358                            let pool = pool.lock();
 359                            for canceled_user_id in canceled_calls_to_user_ids {
 360                                for connection_id in pool.user_connection_ids(canceled_user_id) {
 361                                    peer.send(
 362                                        connection_id,
 363                                        proto::CallCanceled {
 364                                            room_id: room_id.to_proto(),
 365                                        },
 366                                    )
 367                                    .trace_err();
 368                                }
 369                            }
 370                        }
 371
 372                        for user_id in contacts_to_update {
 373                            let busy = app_state.db.is_user_busy(user_id).await.trace_err();
 374                            let contacts = app_state.db.get_contacts(user_id).await.trace_err();
 375                            if let Some((busy, contacts)) = busy.zip(contacts) {
 376                                let pool = pool.lock();
 377                                let updated_contact = contact_for_user(user_id, false, busy, &pool);
 378                                for contact in contacts {
 379                                    if let db::Contact::Accepted {
 380                                        user_id: contact_user_id,
 381                                        ..
 382                                    } = contact
 383                                    {
 384                                        for contact_conn_id in
 385                                            pool.user_connection_ids(contact_user_id)
 386                                        {
 387                                            peer.send(
 388                                                contact_conn_id,
 389                                                proto::UpdateContacts {
 390                                                    contacts: vec![updated_contact.clone()],
 391                                                    remove_contacts: Default::default(),
 392                                                    incoming_requests: Default::default(),
 393                                                    remove_incoming_requests: Default::default(),
 394                                                    outgoing_requests: Default::default(),
 395                                                    remove_outgoing_requests: Default::default(),
 396                                                },
 397                                            )
 398                                            .trace_err();
 399                                        }
 400                                    }
 401                                }
 402                            }
 403                        }
 404
 405                        if let Some(live_kit) = live_kit_client.as_ref() {
 406                            if delete_live_kit_room {
 407                                live_kit.delete_room(live_kit_room).await.trace_err();
 408                            }
 409                        }
 410                    }
 411                }
 412
 413                app_state
 414                    .db
 415                    .delete_stale_servers(&app_state.config.zed_environment, server_id)
 416                    .await
 417                    .trace_err();
 418            }
 419            .instrument(span),
 420        );
 421        Ok(())
 422    }
 423
 424    pub fn teardown(&self) {
 425        self.peer.teardown();
 426        self.connection_pool.lock().reset();
 427        let _ = self.teardown.send(());
 428    }
 429
 430    #[cfg(test)]
 431    pub fn reset(&self, id: ServerId) {
 432        self.teardown();
 433        *self.id.lock() = id;
 434        self.peer.reset(id.0 as u32);
 435    }
 436
 437    #[cfg(test)]
 438    pub fn id(&self) -> ServerId {
 439        *self.id.lock()
 440    }
 441
 442    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 443    where
 444        F: 'static + Send + Sync + Fn(TypedEnvelope<M>, Session) -> Fut,
 445        Fut: 'static + Send + Future<Output = Result<()>>,
 446        M: EnvelopedMessage,
 447    {
 448        let prev_handler = self.handlers.insert(
 449            TypeId::of::<M>(),
 450            Box::new(move |envelope, session| {
 451                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 452                let span = info_span!(
 453                    "handle message",
 454                    payload_type = envelope.payload_type_name()
 455                );
 456                span.in_scope(|| {
 457                    tracing::info!(
 458                        payload_type = envelope.payload_type_name(),
 459                        "message received"
 460                    );
 461                });
 462                let start_time = Instant::now();
 463                let future = (handler)(*envelope, session);
 464                async move {
 465                    let result = future.await;
 466                    let duration_ms = start_time.elapsed().as_micros() as f64 / 1000.0;
 467                    match result {
 468                        Err(error) => {
 469                            tracing::error!(%error, ?duration_ms, "error handling message")
 470                        }
 471                        Ok(()) => tracing::info!(?duration_ms, "finished handling message"),
 472                    }
 473                }
 474                .instrument(span)
 475                .boxed()
 476            }),
 477        );
 478        if prev_handler.is_some() {
 479            panic!("registered a handler for the same message twice");
 480        }
 481        self
 482    }
 483
 484    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 485    where
 486        F: 'static + Send + Sync + Fn(M, Session) -> Fut,
 487        Fut: 'static + Send + Future<Output = Result<()>>,
 488        M: EnvelopedMessage,
 489    {
 490        self.add_handler(move |envelope, session| handler(envelope.payload, session));
 491        self
 492    }
 493
 494    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 495    where
 496        F: 'static + Send + Sync + Fn(M, Response<M>, Session) -> Fut,
 497        Fut: Send + Future<Output = Result<()>>,
 498        M: RequestMessage,
 499    {
 500        let handler = Arc::new(handler);
 501        self.add_handler(move |envelope, session| {
 502            let receipt = envelope.receipt();
 503            let handler = handler.clone();
 504            async move {
 505                let peer = session.peer.clone();
 506                let responded = Arc::new(AtomicBool::default());
 507                let response = Response {
 508                    peer: peer.clone(),
 509                    responded: responded.clone(),
 510                    receipt,
 511                };
 512                match (handler)(envelope.payload, response, session).await {
 513                    Ok(()) => {
 514                        if responded.load(std::sync::atomic::Ordering::SeqCst) {
 515                            Ok(())
 516                        } else {
 517                            Err(anyhow!("handler did not send a response"))?
 518                        }
 519                    }
 520                    Err(error) => {
 521                        peer.respond_with_error(
 522                            receipt,
 523                            proto::Error {
 524                                message: error.to_string(),
 525                            },
 526                        )?;
 527                        Err(error)
 528                    }
 529                }
 530            }
 531        })
 532    }
 533
 534    pub fn handle_connection(
 535        self: &Arc<Self>,
 536        connection: Connection,
 537        address: String,
 538        user: User,
 539        mut send_connection_id: Option<oneshot::Sender<ConnectionId>>,
 540        executor: Executor,
 541    ) -> impl Future<Output = Result<()>> {
 542        let this = self.clone();
 543        let user_id = user.id;
 544        let login = user.github_login;
 545        let span = info_span!("handle connection", %user_id, %login, %address);
 546        let mut teardown = self.teardown.subscribe();
 547        async move {
 548            let (connection_id, handle_io, mut incoming_rx) = this
 549                .peer
 550                .add_connection(connection, {
 551                    let executor = executor.clone();
 552                    move |duration| executor.sleep(duration)
 553                });
 554
 555            tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
 556            this.peer.send(connection_id, proto::Hello { peer_id: Some(connection_id.into()) })?;
 557            tracing::info!(%user_id, %login, %connection_id, %address, "sent hello message");
 558
 559            if let Some(send_connection_id) = send_connection_id.take() {
 560                let _ = send_connection_id.send(connection_id);
 561            }
 562
 563            if !user.connected_once {
 564                this.peer.send(connection_id, proto::ShowContacts {})?;
 565                this.app_state.db.set_user_connected_once(user_id, true).await?;
 566            }
 567
 568            let (contacts, channels_for_user, channel_invites) = future::try_join3(
 569                this.app_state.db.get_contacts(user_id),
 570                this.app_state.db.get_channels_for_user(user_id),
 571                this.app_state.db.get_channel_invites_for_user(user_id)
 572            ).await?;
 573
 574            {
 575                let mut pool = this.connection_pool.lock();
 576                pool.add_connection(connection_id, user_id, user.admin);
 577                this.peer.send(connection_id, build_initial_contacts_update(contacts, &pool))?;
 578                this.peer.send(connection_id, build_initial_channels_update(
 579                    channels_for_user,
 580                    channel_invites
 581                ))?;
 582            }
 583
 584            if let Some(incoming_call) = this.app_state.db.incoming_call_for_user(user_id).await? {
 585                this.peer.send(connection_id, incoming_call)?;
 586            }
 587
 588            let session = Session {
 589                user_id,
 590                connection_id,
 591                db: Arc::new(tokio::sync::Mutex::new(DbHandle(this.app_state.db.clone()))),
 592                peer: this.peer.clone(),
 593                connection_pool: this.connection_pool.clone(),
 594                live_kit_client: this.app_state.live_kit_client.clone(),
 595                executor: executor.clone(),
 596            };
 597            update_user_contacts(user_id, &session).await?;
 598
 599            let handle_io = handle_io.fuse();
 600            futures::pin_mut!(handle_io);
 601
 602            // Handlers for foreground messages are pushed into the following `FuturesUnordered`.
 603            // This prevents deadlocks when e.g., client A performs a request to client B and
 604            // client B performs a request to client A. If both clients stop processing further
 605            // messages until their respective request completes, they won't have a chance to
 606            // respond to the other client's request and cause a deadlock.
 607            //
 608            // This arrangement ensures we will attempt to process earlier messages first, but fall
 609            // back to processing messages arrived later in the spirit of making progress.
 610            let mut foreground_message_handlers = FuturesUnordered::new();
 611            let concurrent_handlers = Arc::new(Semaphore::new(256));
 612            loop {
 613                let next_message = async {
 614                    let permit = concurrent_handlers.clone().acquire_owned().await.unwrap();
 615                    let message = incoming_rx.next().await;
 616                    (permit, message)
 617                }.fuse();
 618                futures::pin_mut!(next_message);
 619                futures::select_biased! {
 620                    _ = teardown.changed().fuse() => return Ok(()),
 621                    result = handle_io => {
 622                        if let Err(error) = result {
 623                            tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
 624                        }
 625                        break;
 626                    }
 627                    _ = foreground_message_handlers.next() => {}
 628                    next_message = next_message => {
 629                        let (permit, message) = next_message;
 630                        if let Some(message) = message {
 631                            let type_name = message.payload_type_name();
 632                            let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
 633                            let span_enter = span.enter();
 634                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 635                                let is_background = message.is_background();
 636                                let handle_message = (handler)(message, session.clone());
 637                                drop(span_enter);
 638
 639                                let handle_message = async move {
 640                                    handle_message.await;
 641                                    drop(permit);
 642                                }.instrument(span);
 643                                if is_background {
 644                                    executor.spawn_detached(handle_message);
 645                                } else {
 646                                    foreground_message_handlers.push(handle_message);
 647                                }
 648                            } else {
 649                                tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
 650                            }
 651                        } else {
 652                            tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
 653                            break;
 654                        }
 655                    }
 656                }
 657            }
 658
 659            drop(foreground_message_handlers);
 660            tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
 661            if let Err(error) = connection_lost(session, teardown, executor).await {
 662                tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
 663            }
 664
 665            Ok(())
 666        }.instrument(span)
 667    }
 668
 669    pub async fn invite_code_redeemed(
 670        self: &Arc<Self>,
 671        inviter_id: UserId,
 672        invitee_id: UserId,
 673    ) -> Result<()> {
 674        if let Some(user) = self.app_state.db.get_user_by_id(inviter_id).await? {
 675            if let Some(code) = &user.invite_code {
 676                let pool = self.connection_pool.lock();
 677                let invitee_contact = contact_for_user(invitee_id, true, false, &pool);
 678                for connection_id in pool.user_connection_ids(inviter_id) {
 679                    self.peer.send(
 680                        connection_id,
 681                        proto::UpdateContacts {
 682                            contacts: vec![invitee_contact.clone()],
 683                            ..Default::default()
 684                        },
 685                    )?;
 686                    self.peer.send(
 687                        connection_id,
 688                        proto::UpdateInviteInfo {
 689                            url: format!("{}{}", self.app_state.config.invite_link_prefix, &code),
 690                            count: user.invite_count as u32,
 691                        },
 692                    )?;
 693                }
 694            }
 695        }
 696        Ok(())
 697    }
 698
 699    pub async fn invite_count_updated(self: &Arc<Self>, user_id: UserId) -> Result<()> {
 700        if let Some(user) = self.app_state.db.get_user_by_id(user_id).await? {
 701            if let Some(invite_code) = &user.invite_code {
 702                let pool = self.connection_pool.lock();
 703                for connection_id in pool.user_connection_ids(user_id) {
 704                    self.peer.send(
 705                        connection_id,
 706                        proto::UpdateInviteInfo {
 707                            url: format!(
 708                                "{}{}",
 709                                self.app_state.config.invite_link_prefix, invite_code
 710                            ),
 711                            count: user.invite_count as u32,
 712                        },
 713                    )?;
 714                }
 715            }
 716        }
 717        Ok(())
 718    }
 719
 720    pub async fn snapshot<'a>(self: &'a Arc<Self>) -> ServerSnapshot<'a> {
 721        ServerSnapshot {
 722            connection_pool: ConnectionPoolGuard {
 723                guard: self.connection_pool.lock(),
 724                _not_send: PhantomData,
 725            },
 726            peer: &self.peer,
 727        }
 728    }
 729}
 730
 731impl<'a> Deref for ConnectionPoolGuard<'a> {
 732    type Target = ConnectionPool;
 733
 734    fn deref(&self) -> &Self::Target {
 735        &*self.guard
 736    }
 737}
 738
 739impl<'a> DerefMut for ConnectionPoolGuard<'a> {
 740    fn deref_mut(&mut self) -> &mut Self::Target {
 741        &mut *self.guard
 742    }
 743}
 744
 745impl<'a> Drop for ConnectionPoolGuard<'a> {
 746    fn drop(&mut self) {
 747        #[cfg(test)]
 748        self.check_invariants();
 749    }
 750}
 751
 752fn broadcast<F>(
 753    sender_id: Option<ConnectionId>,
 754    receiver_ids: impl IntoIterator<Item = ConnectionId>,
 755    mut f: F,
 756) where
 757    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 758{
 759    for receiver_id in receiver_ids {
 760        if Some(receiver_id) != sender_id {
 761            if let Err(error) = f(receiver_id) {
 762                tracing::error!("failed to send to {:?} {}", receiver_id, error);
 763            }
 764        }
 765    }
 766}
 767
 768lazy_static! {
 769    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
 770}
 771
 772pub struct ProtocolVersion(u32);
 773
 774impl Header for ProtocolVersion {
 775    fn name() -> &'static HeaderName {
 776        &ZED_PROTOCOL_VERSION
 777    }
 778
 779    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
 780    where
 781        Self: Sized,
 782        I: Iterator<Item = &'i axum::http::HeaderValue>,
 783    {
 784        let version = values
 785            .next()
 786            .ok_or_else(axum::headers::Error::invalid)?
 787            .to_str()
 788            .map_err(|_| axum::headers::Error::invalid())?
 789            .parse()
 790            .map_err(|_| axum::headers::Error::invalid())?;
 791        Ok(Self(version))
 792    }
 793
 794    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
 795        values.extend([self.0.to_string().parse().unwrap()]);
 796    }
 797}
 798
 799pub fn routes(server: Arc<Server>) -> Router<Body> {
 800    Router::new()
 801        .route("/rpc", get(handle_websocket_request))
 802        .layer(
 803            ServiceBuilder::new()
 804                .layer(Extension(server.app_state.clone()))
 805                .layer(middleware::from_fn(auth::validate_header)),
 806        )
 807        .route("/metrics", get(handle_metrics))
 808        .layer(Extension(server))
 809}
 810
 811pub async fn handle_websocket_request(
 812    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
 813    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
 814    Extension(server): Extension<Arc<Server>>,
 815    Extension(user): Extension<User>,
 816    ws: WebSocketUpgrade,
 817) -> axum::response::Response {
 818    if protocol_version != rpc::PROTOCOL_VERSION {
 819        return (
 820            StatusCode::UPGRADE_REQUIRED,
 821            "client must be upgraded".to_string(),
 822        )
 823            .into_response();
 824    }
 825    let socket_address = socket_address.to_string();
 826    ws.on_upgrade(move |socket| {
 827        use util::ResultExt;
 828        let socket = socket
 829            .map_ok(to_tungstenite_message)
 830            .err_into()
 831            .with(|message| async move { Ok(to_axum_message(message)) });
 832        let connection = Connection::new(Box::pin(socket));
 833        async move {
 834            server
 835                .handle_connection(connection, socket_address, user, None, Executor::Production)
 836                .await
 837                .log_err();
 838        }
 839    })
 840}
 841
 842pub async fn handle_metrics(Extension(server): Extension<Arc<Server>>) -> Result<String> {
 843    let connections = server
 844        .connection_pool
 845        .lock()
 846        .connections()
 847        .filter(|connection| !connection.admin)
 848        .count();
 849
 850    METRIC_CONNECTIONS.set(connections as _);
 851
 852    let shared_projects = server.app_state.db.project_count_excluding_admins().await?;
 853    METRIC_SHARED_PROJECTS.set(shared_projects as _);
 854
 855    let encoder = prometheus::TextEncoder::new();
 856    let metric_families = prometheus::gather();
 857    let encoded_metrics = encoder
 858        .encode_to_string(&metric_families)
 859        .map_err(|err| anyhow!("{}", err))?;
 860    Ok(encoded_metrics)
 861}
 862
 863#[instrument(err, skip(executor))]
 864async fn connection_lost(
 865    session: Session,
 866    mut teardown: watch::Receiver<()>,
 867    executor: Executor,
 868) -> Result<()> {
 869    session.peer.disconnect(session.connection_id);
 870    session
 871        .connection_pool()
 872        .await
 873        .remove_connection(session.connection_id)?;
 874
 875    session
 876        .db()
 877        .await
 878        .connection_lost(session.connection_id)
 879        .await
 880        .trace_err();
 881
 882    futures::select_biased! {
 883        _ = executor.sleep(RECONNECT_TIMEOUT).fuse() => {
 884            log::info!("connection lost, removing all resources for user:{}, connection:{:?}", session.user_id, session.connection_id);
 885            leave_room_for_session(&session).await.trace_err();
 886            leave_channel_buffers_for_session(&session)
 887                .await
 888                .trace_err();
 889
 890            if !session
 891                .connection_pool()
 892                .await
 893                .is_user_online(session.user_id)
 894            {
 895                let db = session.db().await;
 896                if let Some(room) = db.decline_call(None, session.user_id).await.trace_err().flatten() {
 897                    room_updated(&room, &session.peer);
 898                }
 899            }
 900
 901            update_user_contacts(session.user_id, &session).await?;
 902        }
 903        _ = teardown.changed().fuse() => {}
 904    }
 905
 906    Ok(())
 907}
 908
 909async fn ping(_: proto::Ping, response: Response<proto::Ping>, _session: Session) -> Result<()> {
 910    response.send(proto::Ack {})?;
 911    Ok(())
 912}
 913
 914async fn create_room(
 915    _request: proto::CreateRoom,
 916    response: Response<proto::CreateRoom>,
 917    session: Session,
 918) -> Result<()> {
 919    let live_kit_room = nanoid::nanoid!(30);
 920
 921    let live_kit_connection_info = {
 922        let live_kit_room = live_kit_room.clone();
 923        let live_kit = session.live_kit_client.as_ref();
 924
 925        util::async_iife!({
 926            let live_kit = live_kit?;
 927
 928            live_kit
 929                .create_room(live_kit_room.clone())
 930                .await
 931                .trace_err()?;
 932
 933            let token = live_kit
 934                .room_token(&live_kit_room, &session.user_id.to_string())
 935                .trace_err()?;
 936
 937            Some(proto::LiveKitConnectionInfo {
 938                server_url: live_kit.url().into(),
 939                token,
 940            })
 941        })
 942    }
 943    .await;
 944
 945    let room = session
 946        .db()
 947        .await
 948        .create_room(session.user_id, session.connection_id, &live_kit_room)
 949        .await?;
 950
 951    response.send(proto::CreateRoomResponse {
 952        room: Some(room.clone()),
 953        live_kit_connection_info,
 954    })?;
 955
 956    update_user_contacts(session.user_id, &session).await?;
 957    Ok(())
 958}
 959
 960async fn join_room(
 961    request: proto::JoinRoom,
 962    response: Response<proto::JoinRoom>,
 963    session: Session,
 964) -> Result<()> {
 965    let room_id = RoomId::from_proto(request.id);
 966    let joined_room = {
 967        let room = session
 968            .db()
 969            .await
 970            .join_room(room_id, session.user_id, session.connection_id)
 971            .await?;
 972        room_updated(&room.room, &session.peer);
 973        room.into_inner()
 974    };
 975
 976    if let Some(channel_id) = joined_room.channel_id {
 977        channel_updated(
 978            channel_id,
 979            &joined_room.room,
 980            &joined_room.channel_members,
 981            &session.peer,
 982            &*session.connection_pool().await,
 983        )
 984    }
 985
 986    for connection_id in session
 987        .connection_pool()
 988        .await
 989        .user_connection_ids(session.user_id)
 990    {
 991        session
 992            .peer
 993            .send(
 994                connection_id,
 995                proto::CallCanceled {
 996                    room_id: room_id.to_proto(),
 997                },
 998            )
 999            .trace_err();
1000    }
1001
1002    let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
1003        if let Some(token) = live_kit
1004            .room_token(
1005                &joined_room.room.live_kit_room,
1006                &session.user_id.to_string(),
1007            )
1008            .trace_err()
1009        {
1010            Some(proto::LiveKitConnectionInfo {
1011                server_url: live_kit.url().into(),
1012                token,
1013            })
1014        } else {
1015            None
1016        }
1017    } else {
1018        None
1019    };
1020
1021    response.send(proto::JoinRoomResponse {
1022        room: Some(joined_room.room),
1023        channel_id: joined_room.channel_id.map(|id| id.to_proto()),
1024        live_kit_connection_info,
1025    })?;
1026
1027    update_user_contacts(session.user_id, &session).await?;
1028    Ok(())
1029}
1030
1031async fn rejoin_room(
1032    request: proto::RejoinRoom,
1033    response: Response<proto::RejoinRoom>,
1034    session: Session,
1035) -> Result<()> {
1036    let room;
1037    let channel_id;
1038    let channel_members;
1039    {
1040        let mut rejoined_room = session
1041            .db()
1042            .await
1043            .rejoin_room(request, session.user_id, session.connection_id)
1044            .await?;
1045
1046        response.send(proto::RejoinRoomResponse {
1047            room: Some(rejoined_room.room.clone()),
1048            reshared_projects: rejoined_room
1049                .reshared_projects
1050                .iter()
1051                .map(|project| proto::ResharedProject {
1052                    id: project.id.to_proto(),
1053                    collaborators: project
1054                        .collaborators
1055                        .iter()
1056                        .map(|collaborator| collaborator.to_proto())
1057                        .collect(),
1058                })
1059                .collect(),
1060            rejoined_projects: rejoined_room
1061                .rejoined_projects
1062                .iter()
1063                .map(|rejoined_project| proto::RejoinedProject {
1064                    id: rejoined_project.id.to_proto(),
1065                    worktrees: rejoined_project
1066                        .worktrees
1067                        .iter()
1068                        .map(|worktree| proto::WorktreeMetadata {
1069                            id: worktree.id,
1070                            root_name: worktree.root_name.clone(),
1071                            visible: worktree.visible,
1072                            abs_path: worktree.abs_path.clone(),
1073                        })
1074                        .collect(),
1075                    collaborators: rejoined_project
1076                        .collaborators
1077                        .iter()
1078                        .map(|collaborator| collaborator.to_proto())
1079                        .collect(),
1080                    language_servers: rejoined_project.language_servers.clone(),
1081                })
1082                .collect(),
1083        })?;
1084        room_updated(&rejoined_room.room, &session.peer);
1085
1086        for project in &rejoined_room.reshared_projects {
1087            for collaborator in &project.collaborators {
1088                session
1089                    .peer
1090                    .send(
1091                        collaborator.connection_id,
1092                        proto::UpdateProjectCollaborator {
1093                            project_id: project.id.to_proto(),
1094                            old_peer_id: Some(project.old_connection_id.into()),
1095                            new_peer_id: Some(session.connection_id.into()),
1096                        },
1097                    )
1098                    .trace_err();
1099            }
1100
1101            broadcast(
1102                Some(session.connection_id),
1103                project
1104                    .collaborators
1105                    .iter()
1106                    .map(|collaborator| collaborator.connection_id),
1107                |connection_id| {
1108                    session.peer.forward_send(
1109                        session.connection_id,
1110                        connection_id,
1111                        proto::UpdateProject {
1112                            project_id: project.id.to_proto(),
1113                            worktrees: project.worktrees.clone(),
1114                        },
1115                    )
1116                },
1117            );
1118        }
1119
1120        for project in &rejoined_room.rejoined_projects {
1121            for collaborator in &project.collaborators {
1122                session
1123                    .peer
1124                    .send(
1125                        collaborator.connection_id,
1126                        proto::UpdateProjectCollaborator {
1127                            project_id: project.id.to_proto(),
1128                            old_peer_id: Some(project.old_connection_id.into()),
1129                            new_peer_id: Some(session.connection_id.into()),
1130                        },
1131                    )
1132                    .trace_err();
1133            }
1134        }
1135
1136        for project in &mut rejoined_room.rejoined_projects {
1137            for worktree in mem::take(&mut project.worktrees) {
1138                #[cfg(any(test, feature = "test-support"))]
1139                const MAX_CHUNK_SIZE: usize = 2;
1140                #[cfg(not(any(test, feature = "test-support")))]
1141                const MAX_CHUNK_SIZE: usize = 256;
1142
1143                // Stream this worktree's entries.
1144                let message = proto::UpdateWorktree {
1145                    project_id: project.id.to_proto(),
1146                    worktree_id: worktree.id,
1147                    abs_path: worktree.abs_path.clone(),
1148                    root_name: worktree.root_name,
1149                    updated_entries: worktree.updated_entries,
1150                    removed_entries: worktree.removed_entries,
1151                    scan_id: worktree.scan_id,
1152                    is_last_update: worktree.completed_scan_id == worktree.scan_id,
1153                    updated_repositories: worktree.updated_repositories,
1154                    removed_repositories: worktree.removed_repositories,
1155                };
1156                for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1157                    session.peer.send(session.connection_id, update.clone())?;
1158                }
1159
1160                // Stream this worktree's diagnostics.
1161                for summary in worktree.diagnostic_summaries {
1162                    session.peer.send(
1163                        session.connection_id,
1164                        proto::UpdateDiagnosticSummary {
1165                            project_id: project.id.to_proto(),
1166                            worktree_id: worktree.id,
1167                            summary: Some(summary),
1168                        },
1169                    )?;
1170                }
1171
1172                for settings_file in worktree.settings_files {
1173                    session.peer.send(
1174                        session.connection_id,
1175                        proto::UpdateWorktreeSettings {
1176                            project_id: project.id.to_proto(),
1177                            worktree_id: worktree.id,
1178                            path: settings_file.path,
1179                            content: Some(settings_file.content),
1180                        },
1181                    )?;
1182                }
1183            }
1184
1185            for language_server in &project.language_servers {
1186                session.peer.send(
1187                    session.connection_id,
1188                    proto::UpdateLanguageServer {
1189                        project_id: project.id.to_proto(),
1190                        language_server_id: language_server.id,
1191                        variant: Some(
1192                            proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1193                                proto::LspDiskBasedDiagnosticsUpdated {},
1194                            ),
1195                        ),
1196                    },
1197                )?;
1198            }
1199        }
1200
1201        let rejoined_room = rejoined_room.into_inner();
1202
1203        room = rejoined_room.room;
1204        channel_id = rejoined_room.channel_id;
1205        channel_members = rejoined_room.channel_members;
1206    }
1207
1208    if let Some(channel_id) = channel_id {
1209        channel_updated(
1210            channel_id,
1211            &room,
1212            &channel_members,
1213            &session.peer,
1214            &*session.connection_pool().await,
1215        );
1216    }
1217
1218    update_user_contacts(session.user_id, &session).await?;
1219    Ok(())
1220}
1221
1222async fn leave_room(
1223    _: proto::LeaveRoom,
1224    response: Response<proto::LeaveRoom>,
1225    session: Session,
1226) -> Result<()> {
1227    leave_room_for_session(&session).await?;
1228    response.send(proto::Ack {})?;
1229    Ok(())
1230}
1231
1232async fn call(
1233    request: proto::Call,
1234    response: Response<proto::Call>,
1235    session: Session,
1236) -> Result<()> {
1237    let room_id = RoomId::from_proto(request.room_id);
1238    let calling_user_id = session.user_id;
1239    let calling_connection_id = session.connection_id;
1240    let called_user_id = UserId::from_proto(request.called_user_id);
1241    let initial_project_id = request.initial_project_id.map(ProjectId::from_proto);
1242    if !session
1243        .db()
1244        .await
1245        .has_contact(calling_user_id, called_user_id)
1246        .await?
1247    {
1248        return Err(anyhow!("cannot call a user who isn't a contact"))?;
1249    }
1250
1251    let incoming_call = {
1252        let (room, incoming_call) = &mut *session
1253            .db()
1254            .await
1255            .call(
1256                room_id,
1257                calling_user_id,
1258                calling_connection_id,
1259                called_user_id,
1260                initial_project_id,
1261            )
1262            .await?;
1263        room_updated(&room, &session.peer);
1264        mem::take(incoming_call)
1265    };
1266    update_user_contacts(called_user_id, &session).await?;
1267
1268    let mut calls = session
1269        .connection_pool()
1270        .await
1271        .user_connection_ids(called_user_id)
1272        .map(|connection_id| session.peer.request(connection_id, incoming_call.clone()))
1273        .collect::<FuturesUnordered<_>>();
1274
1275    while let Some(call_response) = calls.next().await {
1276        match call_response.as_ref() {
1277            Ok(_) => {
1278                response.send(proto::Ack {})?;
1279                return Ok(());
1280            }
1281            Err(_) => {
1282                call_response.trace_err();
1283            }
1284        }
1285    }
1286
1287    {
1288        let room = session
1289            .db()
1290            .await
1291            .call_failed(room_id, called_user_id)
1292            .await?;
1293        room_updated(&room, &session.peer);
1294    }
1295    update_user_contacts(called_user_id, &session).await?;
1296
1297    Err(anyhow!("failed to ring user"))?
1298}
1299
1300async fn cancel_call(
1301    request: proto::CancelCall,
1302    response: Response<proto::CancelCall>,
1303    session: Session,
1304) -> Result<()> {
1305    let called_user_id = UserId::from_proto(request.called_user_id);
1306    let room_id = RoomId::from_proto(request.room_id);
1307    {
1308        let room = session
1309            .db()
1310            .await
1311            .cancel_call(room_id, session.connection_id, called_user_id)
1312            .await?;
1313        room_updated(&room, &session.peer);
1314    }
1315
1316    for connection_id in session
1317        .connection_pool()
1318        .await
1319        .user_connection_ids(called_user_id)
1320    {
1321        session
1322            .peer
1323            .send(
1324                connection_id,
1325                proto::CallCanceled {
1326                    room_id: room_id.to_proto(),
1327                },
1328            )
1329            .trace_err();
1330    }
1331    response.send(proto::Ack {})?;
1332
1333    update_user_contacts(called_user_id, &session).await?;
1334    Ok(())
1335}
1336
1337async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<()> {
1338    let room_id = RoomId::from_proto(message.room_id);
1339    {
1340        let room = session
1341            .db()
1342            .await
1343            .decline_call(Some(room_id), session.user_id)
1344            .await?
1345            .ok_or_else(|| anyhow!("failed to decline call"))?;
1346        room_updated(&room, &session.peer);
1347    }
1348
1349    for connection_id in session
1350        .connection_pool()
1351        .await
1352        .user_connection_ids(session.user_id)
1353    {
1354        session
1355            .peer
1356            .send(
1357                connection_id,
1358                proto::CallCanceled {
1359                    room_id: room_id.to_proto(),
1360                },
1361            )
1362            .trace_err();
1363    }
1364    update_user_contacts(session.user_id, &session).await?;
1365    Ok(())
1366}
1367
1368async fn update_participant_location(
1369    request: proto::UpdateParticipantLocation,
1370    response: Response<proto::UpdateParticipantLocation>,
1371    session: Session,
1372) -> Result<()> {
1373    let room_id = RoomId::from_proto(request.room_id);
1374    let location = request
1375        .location
1376        .ok_or_else(|| anyhow!("invalid location"))?;
1377
1378    let db = session.db().await;
1379    let room = db
1380        .update_room_participant_location(room_id, session.connection_id, location)
1381        .await?;
1382
1383    room_updated(&room, &session.peer);
1384    response.send(proto::Ack {})?;
1385    Ok(())
1386}
1387
1388async fn share_project(
1389    request: proto::ShareProject,
1390    response: Response<proto::ShareProject>,
1391    session: Session,
1392) -> Result<()> {
1393    let (project_id, room) = &*session
1394        .db()
1395        .await
1396        .share_project(
1397            RoomId::from_proto(request.room_id),
1398            session.connection_id,
1399            &request.worktrees,
1400        )
1401        .await?;
1402    response.send(proto::ShareProjectResponse {
1403        project_id: project_id.to_proto(),
1404    })?;
1405    room_updated(&room, &session.peer);
1406
1407    Ok(())
1408}
1409
1410async fn unshare_project(message: proto::UnshareProject, session: Session) -> Result<()> {
1411    let project_id = ProjectId::from_proto(message.project_id);
1412
1413    let (room, guest_connection_ids) = &*session
1414        .db()
1415        .await
1416        .unshare_project(project_id, session.connection_id)
1417        .await?;
1418
1419    broadcast(
1420        Some(session.connection_id),
1421        guest_connection_ids.iter().copied(),
1422        |conn_id| session.peer.send(conn_id, message.clone()),
1423    );
1424    room_updated(&room, &session.peer);
1425
1426    Ok(())
1427}
1428
1429async fn join_project(
1430    request: proto::JoinProject,
1431    response: Response<proto::JoinProject>,
1432    session: Session,
1433) -> Result<()> {
1434    let project_id = ProjectId::from_proto(request.project_id);
1435    let guest_user_id = session.user_id;
1436
1437    tracing::info!(%project_id, "join project");
1438
1439    let (project, replica_id) = &mut *session
1440        .db()
1441        .await
1442        .join_project(project_id, session.connection_id)
1443        .await?;
1444
1445    let collaborators = project
1446        .collaborators
1447        .iter()
1448        .filter(|collaborator| collaborator.connection_id != session.connection_id)
1449        .map(|collaborator| collaborator.to_proto())
1450        .collect::<Vec<_>>();
1451
1452    let worktrees = project
1453        .worktrees
1454        .iter()
1455        .map(|(id, worktree)| proto::WorktreeMetadata {
1456            id: *id,
1457            root_name: worktree.root_name.clone(),
1458            visible: worktree.visible,
1459            abs_path: worktree.abs_path.clone(),
1460        })
1461        .collect::<Vec<_>>();
1462
1463    for collaborator in &collaborators {
1464        session
1465            .peer
1466            .send(
1467                collaborator.peer_id.unwrap().into(),
1468                proto::AddProjectCollaborator {
1469                    project_id: project_id.to_proto(),
1470                    collaborator: Some(proto::Collaborator {
1471                        peer_id: Some(session.connection_id.into()),
1472                        replica_id: replica_id.0 as u32,
1473                        user_id: guest_user_id.to_proto(),
1474                    }),
1475                },
1476            )
1477            .trace_err();
1478    }
1479
1480    // First, we send the metadata associated with each worktree.
1481    response.send(proto::JoinProjectResponse {
1482        worktrees: worktrees.clone(),
1483        replica_id: replica_id.0 as u32,
1484        collaborators: collaborators.clone(),
1485        language_servers: project.language_servers.clone(),
1486    })?;
1487
1488    for (worktree_id, worktree) in mem::take(&mut project.worktrees) {
1489        #[cfg(any(test, feature = "test-support"))]
1490        const MAX_CHUNK_SIZE: usize = 2;
1491        #[cfg(not(any(test, feature = "test-support")))]
1492        const MAX_CHUNK_SIZE: usize = 256;
1493
1494        // Stream this worktree's entries.
1495        let message = proto::UpdateWorktree {
1496            project_id: project_id.to_proto(),
1497            worktree_id,
1498            abs_path: worktree.abs_path.clone(),
1499            root_name: worktree.root_name,
1500            updated_entries: worktree.entries,
1501            removed_entries: Default::default(),
1502            scan_id: worktree.scan_id,
1503            is_last_update: worktree.scan_id == worktree.completed_scan_id,
1504            updated_repositories: worktree.repository_entries.into_values().collect(),
1505            removed_repositories: Default::default(),
1506        };
1507        for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1508            session.peer.send(session.connection_id, update.clone())?;
1509        }
1510
1511        // Stream this worktree's diagnostics.
1512        for summary in worktree.diagnostic_summaries {
1513            session.peer.send(
1514                session.connection_id,
1515                proto::UpdateDiagnosticSummary {
1516                    project_id: project_id.to_proto(),
1517                    worktree_id: worktree.id,
1518                    summary: Some(summary),
1519                },
1520            )?;
1521        }
1522
1523        for settings_file in worktree.settings_files {
1524            session.peer.send(
1525                session.connection_id,
1526                proto::UpdateWorktreeSettings {
1527                    project_id: project_id.to_proto(),
1528                    worktree_id: worktree.id,
1529                    path: settings_file.path,
1530                    content: Some(settings_file.content),
1531                },
1532            )?;
1533        }
1534    }
1535
1536    for language_server in &project.language_servers {
1537        session.peer.send(
1538            session.connection_id,
1539            proto::UpdateLanguageServer {
1540                project_id: project_id.to_proto(),
1541                language_server_id: language_server.id,
1542                variant: Some(
1543                    proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1544                        proto::LspDiskBasedDiagnosticsUpdated {},
1545                    ),
1546                ),
1547            },
1548        )?;
1549    }
1550
1551    Ok(())
1552}
1553
1554async fn leave_project(request: proto::LeaveProject, session: Session) -> Result<()> {
1555    let sender_id = session.connection_id;
1556    let project_id = ProjectId::from_proto(request.project_id);
1557
1558    let (room, project) = &*session
1559        .db()
1560        .await
1561        .leave_project(project_id, sender_id)
1562        .await?;
1563    tracing::info!(
1564        %project_id,
1565        host_user_id = %project.host_user_id,
1566        host_connection_id = %project.host_connection_id,
1567        "leave project"
1568    );
1569
1570    project_left(&project, &session);
1571    room_updated(&room, &session.peer);
1572
1573    Ok(())
1574}
1575
1576async fn update_project(
1577    request: proto::UpdateProject,
1578    response: Response<proto::UpdateProject>,
1579    session: Session,
1580) -> Result<()> {
1581    let project_id = ProjectId::from_proto(request.project_id);
1582    let (room, guest_connection_ids) = &*session
1583        .db()
1584        .await
1585        .update_project(project_id, session.connection_id, &request.worktrees)
1586        .await?;
1587    broadcast(
1588        Some(session.connection_id),
1589        guest_connection_ids.iter().copied(),
1590        |connection_id| {
1591            session
1592                .peer
1593                .forward_send(session.connection_id, connection_id, request.clone())
1594        },
1595    );
1596    room_updated(&room, &session.peer);
1597    response.send(proto::Ack {})?;
1598
1599    Ok(())
1600}
1601
1602async fn update_worktree(
1603    request: proto::UpdateWorktree,
1604    response: Response<proto::UpdateWorktree>,
1605    session: Session,
1606) -> Result<()> {
1607    let guest_connection_ids = session
1608        .db()
1609        .await
1610        .update_worktree(&request, session.connection_id)
1611        .await?;
1612
1613    broadcast(
1614        Some(session.connection_id),
1615        guest_connection_ids.iter().copied(),
1616        |connection_id| {
1617            session
1618                .peer
1619                .forward_send(session.connection_id, connection_id, request.clone())
1620        },
1621    );
1622    response.send(proto::Ack {})?;
1623    Ok(())
1624}
1625
1626async fn update_diagnostic_summary(
1627    message: proto::UpdateDiagnosticSummary,
1628    session: Session,
1629) -> Result<()> {
1630    let guest_connection_ids = session
1631        .db()
1632        .await
1633        .update_diagnostic_summary(&message, session.connection_id)
1634        .await?;
1635
1636    broadcast(
1637        Some(session.connection_id),
1638        guest_connection_ids.iter().copied(),
1639        |connection_id| {
1640            session
1641                .peer
1642                .forward_send(session.connection_id, connection_id, message.clone())
1643        },
1644    );
1645
1646    Ok(())
1647}
1648
1649async fn update_worktree_settings(
1650    message: proto::UpdateWorktreeSettings,
1651    session: Session,
1652) -> Result<()> {
1653    let guest_connection_ids = session
1654        .db()
1655        .await
1656        .update_worktree_settings(&message, session.connection_id)
1657        .await?;
1658
1659    broadcast(
1660        Some(session.connection_id),
1661        guest_connection_ids.iter().copied(),
1662        |connection_id| {
1663            session
1664                .peer
1665                .forward_send(session.connection_id, connection_id, message.clone())
1666        },
1667    );
1668
1669    Ok(())
1670}
1671
1672async fn refresh_inlay_hints(request: proto::RefreshInlayHints, session: Session) -> Result<()> {
1673    broadcast_project_message(request.project_id, request, session).await
1674}
1675
1676async fn start_language_server(
1677    request: proto::StartLanguageServer,
1678    session: Session,
1679) -> Result<()> {
1680    let guest_connection_ids = session
1681        .db()
1682        .await
1683        .start_language_server(&request, session.connection_id)
1684        .await?;
1685
1686    broadcast(
1687        Some(session.connection_id),
1688        guest_connection_ids.iter().copied(),
1689        |connection_id| {
1690            session
1691                .peer
1692                .forward_send(session.connection_id, connection_id, request.clone())
1693        },
1694    );
1695    Ok(())
1696}
1697
1698async fn update_language_server(
1699    request: proto::UpdateLanguageServer,
1700    session: Session,
1701) -> Result<()> {
1702    session.executor.record_backtrace();
1703    let project_id = ProjectId::from_proto(request.project_id);
1704    let project_connection_ids = session
1705        .db()
1706        .await
1707        .project_connection_ids(project_id, session.connection_id)
1708        .await?;
1709    broadcast(
1710        Some(session.connection_id),
1711        project_connection_ids.iter().copied(),
1712        |connection_id| {
1713            session
1714                .peer
1715                .forward_send(session.connection_id, connection_id, request.clone())
1716        },
1717    );
1718    Ok(())
1719}
1720
1721async fn forward_project_request<T>(
1722    request: T,
1723    response: Response<T>,
1724    session: Session,
1725) -> Result<()>
1726where
1727    T: EntityMessage + RequestMessage,
1728{
1729    session.executor.record_backtrace();
1730    let project_id = ProjectId::from_proto(request.remote_entity_id());
1731    let host_connection_id = {
1732        let collaborators = session
1733            .db()
1734            .await
1735            .project_collaborators(project_id, session.connection_id)
1736            .await?;
1737        collaborators
1738            .iter()
1739            .find(|collaborator| collaborator.is_host)
1740            .ok_or_else(|| anyhow!("host not found"))?
1741            .connection_id
1742    };
1743
1744    let payload = session
1745        .peer
1746        .forward_request(session.connection_id, host_connection_id, request)
1747        .await?;
1748
1749    response.send(payload)?;
1750    Ok(())
1751}
1752
1753async fn create_buffer_for_peer(
1754    request: proto::CreateBufferForPeer,
1755    session: Session,
1756) -> Result<()> {
1757    session.executor.record_backtrace();
1758    let peer_id = request.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?;
1759    session
1760        .peer
1761        .forward_send(session.connection_id, peer_id.into(), request)?;
1762    Ok(())
1763}
1764
1765async fn update_buffer(
1766    request: proto::UpdateBuffer,
1767    response: Response<proto::UpdateBuffer>,
1768    session: Session,
1769) -> Result<()> {
1770    session.executor.record_backtrace();
1771    let project_id = ProjectId::from_proto(request.project_id);
1772    let mut guest_connection_ids;
1773    let mut host_connection_id = None;
1774    {
1775        let collaborators = session
1776            .db()
1777            .await
1778            .project_collaborators(project_id, session.connection_id)
1779            .await?;
1780        guest_connection_ids = Vec::with_capacity(collaborators.len() - 1);
1781        for collaborator in collaborators.iter() {
1782            if collaborator.is_host {
1783                host_connection_id = Some(collaborator.connection_id);
1784            } else {
1785                guest_connection_ids.push(collaborator.connection_id);
1786            }
1787        }
1788    }
1789    let host_connection_id = host_connection_id.ok_or_else(|| anyhow!("host not found"))?;
1790
1791    session.executor.record_backtrace();
1792    broadcast(
1793        Some(session.connection_id),
1794        guest_connection_ids,
1795        |connection_id| {
1796            session
1797                .peer
1798                .forward_send(session.connection_id, connection_id, request.clone())
1799        },
1800    );
1801    if host_connection_id != session.connection_id {
1802        session
1803            .peer
1804            .forward_request(session.connection_id, host_connection_id, request.clone())
1805            .await?;
1806    }
1807
1808    response.send(proto::Ack {})?;
1809    Ok(())
1810}
1811
1812async fn update_buffer_file(request: proto::UpdateBufferFile, session: Session) -> Result<()> {
1813    let project_id = ProjectId::from_proto(request.project_id);
1814    let project_connection_ids = session
1815        .db()
1816        .await
1817        .project_connection_ids(project_id, session.connection_id)
1818        .await?;
1819
1820    broadcast(
1821        Some(session.connection_id),
1822        project_connection_ids.iter().copied(),
1823        |connection_id| {
1824            session
1825                .peer
1826                .forward_send(session.connection_id, connection_id, request.clone())
1827        },
1828    );
1829    Ok(())
1830}
1831
1832async fn buffer_reloaded(request: proto::BufferReloaded, session: Session) -> Result<()> {
1833    let project_id = ProjectId::from_proto(request.project_id);
1834    let project_connection_ids = session
1835        .db()
1836        .await
1837        .project_connection_ids(project_id, session.connection_id)
1838        .await?;
1839    broadcast(
1840        Some(session.connection_id),
1841        project_connection_ids.iter().copied(),
1842        |connection_id| {
1843            session
1844                .peer
1845                .forward_send(session.connection_id, connection_id, request.clone())
1846        },
1847    );
1848    Ok(())
1849}
1850
1851async fn buffer_saved(request: proto::BufferSaved, session: Session) -> Result<()> {
1852    broadcast_project_message(request.project_id, request, session).await
1853}
1854
1855async fn broadcast_project_message<T: EnvelopedMessage>(
1856    project_id: u64,
1857    request: T,
1858    session: Session,
1859) -> Result<()> {
1860    let project_id = ProjectId::from_proto(project_id);
1861    let project_connection_ids = session
1862        .db()
1863        .await
1864        .project_connection_ids(project_id, session.connection_id)
1865        .await?;
1866    broadcast(
1867        Some(session.connection_id),
1868        project_connection_ids.iter().copied(),
1869        |connection_id| {
1870            session
1871                .peer
1872                .forward_send(session.connection_id, connection_id, request.clone())
1873        },
1874    );
1875    Ok(())
1876}
1877
1878async fn follow(
1879    request: proto::Follow,
1880    response: Response<proto::Follow>,
1881    session: Session,
1882) -> Result<()> {
1883    let project_id = ProjectId::from_proto(request.project_id);
1884    let leader_id = request
1885        .leader_id
1886        .ok_or_else(|| anyhow!("invalid leader id"))?
1887        .into();
1888    let follower_id = session.connection_id;
1889
1890    {
1891        let project_connection_ids = session
1892            .db()
1893            .await
1894            .project_connection_ids(project_id, session.connection_id)
1895            .await?;
1896
1897        if !project_connection_ids.contains(&leader_id) {
1898            Err(anyhow!("no such peer"))?;
1899        }
1900    }
1901
1902    let mut response_payload = session
1903        .peer
1904        .forward_request(session.connection_id, leader_id, request)
1905        .await?;
1906    response_payload
1907        .views
1908        .retain(|view| view.leader_id != Some(follower_id.into()));
1909    response.send(response_payload)?;
1910
1911    let room = session
1912        .db()
1913        .await
1914        .follow(project_id, leader_id, follower_id)
1915        .await?;
1916    room_updated(&room, &session.peer);
1917
1918    Ok(())
1919}
1920
1921async fn unfollow(request: proto::Unfollow, session: Session) -> Result<()> {
1922    let project_id = ProjectId::from_proto(request.project_id);
1923    let leader_id = request
1924        .leader_id
1925        .ok_or_else(|| anyhow!("invalid leader id"))?
1926        .into();
1927    let follower_id = session.connection_id;
1928
1929    if !session
1930        .db()
1931        .await
1932        .project_connection_ids(project_id, session.connection_id)
1933        .await?
1934        .contains(&leader_id)
1935    {
1936        Err(anyhow!("no such peer"))?;
1937    }
1938
1939    session
1940        .peer
1941        .forward_send(session.connection_id, leader_id, request)?;
1942
1943    let room = session
1944        .db()
1945        .await
1946        .unfollow(project_id, leader_id, follower_id)
1947        .await?;
1948    room_updated(&room, &session.peer);
1949
1950    Ok(())
1951}
1952
1953async fn update_followers(request: proto::UpdateFollowers, session: Session) -> Result<()> {
1954    let project_id = ProjectId::from_proto(request.project_id);
1955    let project_connection_ids = session
1956        .db
1957        .lock()
1958        .await
1959        .project_connection_ids(project_id, session.connection_id)
1960        .await?;
1961
1962    let leader_id = request.variant.as_ref().and_then(|variant| match variant {
1963        proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
1964        proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
1965        proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
1966    });
1967    for follower_peer_id in request.follower_ids.iter().copied() {
1968        let follower_connection_id = follower_peer_id.into();
1969        if project_connection_ids.contains(&follower_connection_id)
1970            && Some(follower_peer_id) != leader_id
1971        {
1972            session.peer.forward_send(
1973                session.connection_id,
1974                follower_connection_id,
1975                request.clone(),
1976            )?;
1977        }
1978    }
1979    Ok(())
1980}
1981
1982async fn get_users(
1983    request: proto::GetUsers,
1984    response: Response<proto::GetUsers>,
1985    session: Session,
1986) -> Result<()> {
1987    let user_ids = request
1988        .user_ids
1989        .into_iter()
1990        .map(UserId::from_proto)
1991        .collect();
1992    let users = session
1993        .db()
1994        .await
1995        .get_users_by_ids(user_ids)
1996        .await?
1997        .into_iter()
1998        .map(|user| proto::User {
1999            id: user.id.to_proto(),
2000            avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
2001            github_login: user.github_login,
2002        })
2003        .collect();
2004    response.send(proto::UsersResponse { users })?;
2005    Ok(())
2006}
2007
2008async fn fuzzy_search_users(
2009    request: proto::FuzzySearchUsers,
2010    response: Response<proto::FuzzySearchUsers>,
2011    session: Session,
2012) -> Result<()> {
2013    let query = request.query;
2014    let users = match query.len() {
2015        0 => vec![],
2016        1 | 2 => session
2017            .db()
2018            .await
2019            .get_user_by_github_login(&query)
2020            .await?
2021            .into_iter()
2022            .collect(),
2023        _ => session.db().await.fuzzy_search_users(&query, 10).await?,
2024    };
2025    let users = users
2026        .into_iter()
2027        .filter(|user| user.id != session.user_id)
2028        .map(|user| proto::User {
2029            id: user.id.to_proto(),
2030            avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
2031            github_login: user.github_login,
2032        })
2033        .collect();
2034    response.send(proto::UsersResponse { users })?;
2035    Ok(())
2036}
2037
2038async fn request_contact(
2039    request: proto::RequestContact,
2040    response: Response<proto::RequestContact>,
2041    session: Session,
2042) -> Result<()> {
2043    let requester_id = session.user_id;
2044    let responder_id = UserId::from_proto(request.responder_id);
2045    if requester_id == responder_id {
2046        return Err(anyhow!("cannot add yourself as a contact"))?;
2047    }
2048
2049    session
2050        .db()
2051        .await
2052        .send_contact_request(requester_id, responder_id)
2053        .await?;
2054
2055    // Update outgoing contact requests of requester
2056    let mut update = proto::UpdateContacts::default();
2057    update.outgoing_requests.push(responder_id.to_proto());
2058    for connection_id in session
2059        .connection_pool()
2060        .await
2061        .user_connection_ids(requester_id)
2062    {
2063        session.peer.send(connection_id, update.clone())?;
2064    }
2065
2066    // Update incoming contact requests of responder
2067    let mut update = proto::UpdateContacts::default();
2068    update
2069        .incoming_requests
2070        .push(proto::IncomingContactRequest {
2071            requester_id: requester_id.to_proto(),
2072            should_notify: true,
2073        });
2074    for connection_id in session
2075        .connection_pool()
2076        .await
2077        .user_connection_ids(responder_id)
2078    {
2079        session.peer.send(connection_id, update.clone())?;
2080    }
2081
2082    response.send(proto::Ack {})?;
2083    Ok(())
2084}
2085
2086async fn respond_to_contact_request(
2087    request: proto::RespondToContactRequest,
2088    response: Response<proto::RespondToContactRequest>,
2089    session: Session,
2090) -> Result<()> {
2091    let responder_id = session.user_id;
2092    let requester_id = UserId::from_proto(request.requester_id);
2093    let db = session.db().await;
2094    if request.response == proto::ContactRequestResponse::Dismiss as i32 {
2095        db.dismiss_contact_notification(responder_id, requester_id)
2096            .await?;
2097    } else {
2098        let accept = request.response == proto::ContactRequestResponse::Accept as i32;
2099
2100        db.respond_to_contact_request(responder_id, requester_id, accept)
2101            .await?;
2102        let requester_busy = db.is_user_busy(requester_id).await?;
2103        let responder_busy = db.is_user_busy(responder_id).await?;
2104
2105        let pool = session.connection_pool().await;
2106        // Update responder with new contact
2107        let mut update = proto::UpdateContacts::default();
2108        if accept {
2109            update
2110                .contacts
2111                .push(contact_for_user(requester_id, false, requester_busy, &pool));
2112        }
2113        update
2114            .remove_incoming_requests
2115            .push(requester_id.to_proto());
2116        for connection_id in pool.user_connection_ids(responder_id) {
2117            session.peer.send(connection_id, update.clone())?;
2118        }
2119
2120        // Update requester with new contact
2121        let mut update = proto::UpdateContacts::default();
2122        if accept {
2123            update
2124                .contacts
2125                .push(contact_for_user(responder_id, true, responder_busy, &pool));
2126        }
2127        update
2128            .remove_outgoing_requests
2129            .push(responder_id.to_proto());
2130        for connection_id in pool.user_connection_ids(requester_id) {
2131            session.peer.send(connection_id, update.clone())?;
2132        }
2133    }
2134
2135    response.send(proto::Ack {})?;
2136    Ok(())
2137}
2138
2139async fn remove_contact(
2140    request: proto::RemoveContact,
2141    response: Response<proto::RemoveContact>,
2142    session: Session,
2143) -> Result<()> {
2144    let requester_id = session.user_id;
2145    let responder_id = UserId::from_proto(request.user_id);
2146    let db = session.db().await;
2147    let contact_accepted = db.remove_contact(requester_id, responder_id).await?;
2148
2149    let pool = session.connection_pool().await;
2150    // Update outgoing contact requests of requester
2151    let mut update = proto::UpdateContacts::default();
2152    if contact_accepted {
2153        update.remove_contacts.push(responder_id.to_proto());
2154    } else {
2155        update
2156            .remove_outgoing_requests
2157            .push(responder_id.to_proto());
2158    }
2159    for connection_id in pool.user_connection_ids(requester_id) {
2160        session.peer.send(connection_id, update.clone())?;
2161    }
2162
2163    // Update incoming contact requests of responder
2164    let mut update = proto::UpdateContacts::default();
2165    if contact_accepted {
2166        update.remove_contacts.push(requester_id.to_proto());
2167    } else {
2168        update
2169            .remove_incoming_requests
2170            .push(requester_id.to_proto());
2171    }
2172    for connection_id in pool.user_connection_ids(responder_id) {
2173        session.peer.send(connection_id, update.clone())?;
2174    }
2175
2176    response.send(proto::Ack {})?;
2177    Ok(())
2178}
2179
2180async fn create_channel(
2181    request: proto::CreateChannel,
2182    response: Response<proto::CreateChannel>,
2183    session: Session,
2184) -> Result<()> {
2185    let db = session.db().await;
2186    let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
2187
2188    if let Some(live_kit) = session.live_kit_client.as_ref() {
2189        live_kit.create_room(live_kit_room.clone()).await?;
2190    }
2191
2192    let parent_id = request.parent_id.map(|id| ChannelId::from_proto(id));
2193    let id = db
2194        .create_channel(&request.name, parent_id, &live_kit_room, session.user_id)
2195        .await?;
2196
2197    let channel = proto::Channel {
2198        id: id.to_proto(),
2199        name: request.name,
2200        parent_id: request.parent_id,
2201    };
2202
2203    response.send(proto::ChannelResponse {
2204        channel: Some(channel.clone()),
2205    })?;
2206
2207    let mut update = proto::UpdateChannels::default();
2208    update.channels.push(channel);
2209
2210    let user_ids_to_notify = if let Some(parent_id) = parent_id {
2211        db.get_channel_members(parent_id).await?
2212    } else {
2213        vec![session.user_id]
2214    };
2215
2216    let connection_pool = session.connection_pool().await;
2217    for user_id in user_ids_to_notify {
2218        for connection_id in connection_pool.user_connection_ids(user_id) {
2219            let mut update = update.clone();
2220            if user_id == session.user_id {
2221                update.channel_permissions.push(proto::ChannelPermission {
2222                    channel_id: id.to_proto(),
2223                    is_admin: true,
2224                });
2225            }
2226            session.peer.send(connection_id, update)?;
2227        }
2228    }
2229
2230    Ok(())
2231}
2232
2233async fn remove_channel(
2234    request: proto::RemoveChannel,
2235    response: Response<proto::RemoveChannel>,
2236    session: Session,
2237) -> Result<()> {
2238    let db = session.db().await;
2239
2240    let channel_id = request.channel_id;
2241    let (removed_channels, member_ids) = db
2242        .remove_channel(ChannelId::from_proto(channel_id), session.user_id)
2243        .await?;
2244    response.send(proto::Ack {})?;
2245
2246    // Notify members of removed channels
2247    let mut update = proto::UpdateChannels::default();
2248    update
2249        .remove_channels
2250        .extend(removed_channels.into_iter().map(|id| id.to_proto()));
2251
2252    let connection_pool = session.connection_pool().await;
2253    for member_id in member_ids {
2254        for connection_id in connection_pool.user_connection_ids(member_id) {
2255            session.peer.send(connection_id, update.clone())?;
2256        }
2257    }
2258
2259    Ok(())
2260}
2261
2262async fn invite_channel_member(
2263    request: proto::InviteChannelMember,
2264    response: Response<proto::InviteChannelMember>,
2265    session: Session,
2266) -> Result<()> {
2267    let db = session.db().await;
2268    let channel_id = ChannelId::from_proto(request.channel_id);
2269    let invitee_id = UserId::from_proto(request.user_id);
2270    db.invite_channel_member(channel_id, invitee_id, session.user_id, request.admin)
2271        .await?;
2272
2273    let (channel, _) = db
2274        .get_channel(channel_id, session.user_id)
2275        .await?
2276        .ok_or_else(|| anyhow!("channel not found"))?;
2277
2278    let mut update = proto::UpdateChannels::default();
2279    update.channel_invitations.push(proto::Channel {
2280        id: channel.id.to_proto(),
2281        name: channel.name,
2282        parent_id: None,
2283    });
2284    for connection_id in session
2285        .connection_pool()
2286        .await
2287        .user_connection_ids(invitee_id)
2288    {
2289        session.peer.send(connection_id, update.clone())?;
2290    }
2291
2292    response.send(proto::Ack {})?;
2293    Ok(())
2294}
2295
2296async fn remove_channel_member(
2297    request: proto::RemoveChannelMember,
2298    response: Response<proto::RemoveChannelMember>,
2299    session: Session,
2300) -> Result<()> {
2301    let db = session.db().await;
2302    let channel_id = ChannelId::from_proto(request.channel_id);
2303    let member_id = UserId::from_proto(request.user_id);
2304
2305    db.remove_channel_member(channel_id, member_id, session.user_id)
2306        .await?;
2307
2308    let mut update = proto::UpdateChannels::default();
2309    update.remove_channels.push(channel_id.to_proto());
2310
2311    for connection_id in session
2312        .connection_pool()
2313        .await
2314        .user_connection_ids(member_id)
2315    {
2316        session.peer.send(connection_id, update.clone())?;
2317    }
2318
2319    response.send(proto::Ack {})?;
2320    Ok(())
2321}
2322
2323async fn set_channel_member_admin(
2324    request: proto::SetChannelMemberAdmin,
2325    response: Response<proto::SetChannelMemberAdmin>,
2326    session: Session,
2327) -> Result<()> {
2328    let db = session.db().await;
2329    let channel_id = ChannelId::from_proto(request.channel_id);
2330    let member_id = UserId::from_proto(request.user_id);
2331    db.set_channel_member_admin(channel_id, session.user_id, member_id, request.admin)
2332        .await?;
2333
2334    let (channel, has_accepted) = db
2335        .get_channel(channel_id, member_id)
2336        .await?
2337        .ok_or_else(|| anyhow!("channel not found"))?;
2338
2339    let mut update = proto::UpdateChannels::default();
2340    if has_accepted {
2341        update.channel_permissions.push(proto::ChannelPermission {
2342            channel_id: channel.id.to_proto(),
2343            is_admin: request.admin,
2344        });
2345    }
2346
2347    for connection_id in session
2348        .connection_pool()
2349        .await
2350        .user_connection_ids(member_id)
2351    {
2352        session.peer.send(connection_id, update.clone())?;
2353    }
2354
2355    response.send(proto::Ack {})?;
2356    Ok(())
2357}
2358
2359async fn rename_channel(
2360    request: proto::RenameChannel,
2361    response: Response<proto::RenameChannel>,
2362    session: Session,
2363) -> Result<()> {
2364    let db = session.db().await;
2365    let channel_id = ChannelId::from_proto(request.channel_id);
2366    let new_name = db
2367        .rename_channel(channel_id, session.user_id, &request.name)
2368        .await?;
2369
2370    let channel = proto::Channel {
2371        id: request.channel_id,
2372        name: new_name,
2373        parent_id: None,
2374    };
2375    response.send(proto::ChannelResponse {
2376        channel: Some(channel.clone()),
2377    })?;
2378    let mut update = proto::UpdateChannels::default();
2379    update.channels.push(channel);
2380
2381    let member_ids = db.get_channel_members(channel_id).await?;
2382
2383    let connection_pool = session.connection_pool().await;
2384    for member_id in member_ids {
2385        for connection_id in connection_pool.user_connection_ids(member_id) {
2386            session.peer.send(connection_id, update.clone())?;
2387        }
2388    }
2389
2390    Ok(())
2391}
2392
2393async fn get_channel_members(
2394    request: proto::GetChannelMembers,
2395    response: Response<proto::GetChannelMembers>,
2396    session: Session,
2397) -> Result<()> {
2398    let db = session.db().await;
2399    let channel_id = ChannelId::from_proto(request.channel_id);
2400    let members = db
2401        .get_channel_member_details(channel_id, session.user_id)
2402        .await?;
2403    response.send(proto::GetChannelMembersResponse { members })?;
2404    Ok(())
2405}
2406
2407async fn respond_to_channel_invite(
2408    request: proto::RespondToChannelInvite,
2409    response: Response<proto::RespondToChannelInvite>,
2410    session: Session,
2411) -> Result<()> {
2412    let db = session.db().await;
2413    let channel_id = ChannelId::from_proto(request.channel_id);
2414    db.respond_to_channel_invite(channel_id, session.user_id, request.accept)
2415        .await?;
2416
2417    let mut update = proto::UpdateChannels::default();
2418    update
2419        .remove_channel_invitations
2420        .push(channel_id.to_proto());
2421    if request.accept {
2422        let result = db.get_channels_for_user(session.user_id).await?;
2423        update
2424            .channels
2425            .extend(result.channels.into_iter().map(|channel| proto::Channel {
2426                id: channel.id.to_proto(),
2427                name: channel.name,
2428                parent_id: channel.parent_id.map(ChannelId::to_proto),
2429            }));
2430        update
2431            .channel_participants
2432            .extend(
2433                result
2434                    .channel_participants
2435                    .into_iter()
2436                    .map(|(channel_id, user_ids)| proto::ChannelParticipants {
2437                        channel_id: channel_id.to_proto(),
2438                        participant_user_ids: user_ids.into_iter().map(UserId::to_proto).collect(),
2439                    }),
2440            );
2441        update
2442            .channel_permissions
2443            .extend(
2444                result
2445                    .channels_with_admin_privileges
2446                    .into_iter()
2447                    .map(|channel_id| proto::ChannelPermission {
2448                        channel_id: channel_id.to_proto(),
2449                        is_admin: true,
2450                    }),
2451            );
2452    }
2453    session.peer.send(session.connection_id, update)?;
2454    response.send(proto::Ack {})?;
2455
2456    Ok(())
2457}
2458
2459async fn join_channel(
2460    request: proto::JoinChannel,
2461    response: Response<proto::JoinChannel>,
2462    session: Session,
2463) -> Result<()> {
2464    let channel_id = ChannelId::from_proto(request.channel_id);
2465
2466    let joined_room = {
2467        leave_room_for_session(&session).await?;
2468        let db = session.db().await;
2469
2470        let room_id = db.room_id_for_channel(channel_id).await?;
2471
2472        let joined_room = db
2473            .join_room(room_id, session.user_id, session.connection_id)
2474            .await?;
2475
2476        let live_kit_connection_info = session.live_kit_client.as_ref().and_then(|live_kit| {
2477            let token = live_kit
2478                .room_token(
2479                    &joined_room.room.live_kit_room,
2480                    &session.user_id.to_string(),
2481                )
2482                .trace_err()?;
2483
2484            Some(LiveKitConnectionInfo {
2485                server_url: live_kit.url().into(),
2486                token,
2487            })
2488        });
2489
2490        response.send(proto::JoinRoomResponse {
2491            room: Some(joined_room.room.clone()),
2492            channel_id: joined_room.channel_id.map(|id| id.to_proto()),
2493            live_kit_connection_info,
2494        })?;
2495
2496        room_updated(&joined_room.room, &session.peer);
2497
2498        joined_room.into_inner()
2499    };
2500
2501    channel_updated(
2502        channel_id,
2503        &joined_room.room,
2504        &joined_room.channel_members,
2505        &session.peer,
2506        &*session.connection_pool().await,
2507    );
2508
2509    update_user_contacts(session.user_id, &session).await?;
2510
2511    Ok(())
2512}
2513
2514async fn join_channel_buffer(
2515    request: proto::JoinChannelBuffer,
2516    response: Response<proto::JoinChannelBuffer>,
2517    session: Session,
2518) -> Result<()> {
2519    let db = session.db().await;
2520    let channel_id = ChannelId::from_proto(request.channel_id);
2521
2522    let open_response = db
2523        .join_channel_buffer(channel_id, session.user_id, session.connection_id)
2524        .await?;
2525
2526    let replica_id = open_response.replica_id;
2527    let collaborators = open_response.collaborators.clone();
2528
2529    response.send(open_response)?;
2530
2531    let update = AddChannelBufferCollaborator {
2532        channel_id: channel_id.to_proto(),
2533        collaborator: Some(proto::Collaborator {
2534            user_id: session.user_id.to_proto(),
2535            peer_id: Some(session.connection_id.into()),
2536            replica_id,
2537        }),
2538    };
2539    channel_buffer_updated(
2540        session.connection_id,
2541        collaborators
2542            .iter()
2543            .filter_map(|collaborator| Some(collaborator.peer_id?.into())),
2544        &update,
2545        &session.peer,
2546    );
2547
2548    Ok(())
2549}
2550
2551async fn update_channel_buffer(
2552    request: proto::UpdateChannelBuffer,
2553    session: Session,
2554) -> Result<()> {
2555    let db = session.db().await;
2556    let channel_id = ChannelId::from_proto(request.channel_id);
2557
2558    let collaborators = db
2559        .update_channel_buffer(channel_id, session.user_id, &request.operations)
2560        .await?;
2561
2562    channel_buffer_updated(
2563        session.connection_id,
2564        collaborators,
2565        &proto::UpdateChannelBuffer {
2566            channel_id: channel_id.to_proto(),
2567            operations: request.operations,
2568        },
2569        &session.peer,
2570    );
2571    Ok(())
2572}
2573
2574async fn rejoin_channel_buffers(
2575    request: proto::RejoinChannelBuffers,
2576    response: Response<proto::RejoinChannelBuffers>,
2577    session: Session,
2578) -> Result<()> {
2579    let db = session.db().await;
2580    let buffers = db
2581        .rejoin_channel_buffers(&request.buffers, session.user_id, session.connection_id)
2582        .await?;
2583
2584    for buffer in &buffers {
2585        let collaborators_to_notify = buffer
2586            .buffer
2587            .collaborators
2588            .iter()
2589            .filter_map(|c| Some(c.peer_id?.into()));
2590        channel_buffer_updated(
2591            session.connection_id,
2592            collaborators_to_notify,
2593            &proto::UpdateChannelBufferCollaborator {
2594                channel_id: buffer.buffer.channel_id,
2595                old_peer_id: Some(buffer.old_connection_id.into()),
2596                new_peer_id: Some(session.connection_id.into()),
2597            },
2598            &session.peer,
2599        );
2600    }
2601
2602    response.send(proto::RejoinChannelBuffersResponse {
2603        buffers: buffers.into_iter().map(|b| b.buffer).collect(),
2604    })?;
2605
2606    Ok(())
2607}
2608
2609async fn leave_channel_buffer(
2610    request: proto::LeaveChannelBuffer,
2611    response: Response<proto::LeaveChannelBuffer>,
2612    session: Session,
2613) -> Result<()> {
2614    let db = session.db().await;
2615    let channel_id = ChannelId::from_proto(request.channel_id);
2616
2617    let collaborators_to_notify = db
2618        .leave_channel_buffer(channel_id, session.connection_id)
2619        .await?;
2620
2621    response.send(Ack {})?;
2622
2623    channel_buffer_updated(
2624        session.connection_id,
2625        collaborators_to_notify,
2626        &proto::RemoveChannelBufferCollaborator {
2627            channel_id: channel_id.to_proto(),
2628            peer_id: Some(session.connection_id.into()),
2629        },
2630        &session.peer,
2631    );
2632
2633    Ok(())
2634}
2635
2636fn channel_buffer_updated<T: EnvelopedMessage>(
2637    sender_id: ConnectionId,
2638    collaborators: impl IntoIterator<Item = ConnectionId>,
2639    message: &T,
2640    peer: &Peer,
2641) {
2642    broadcast(Some(sender_id), collaborators.into_iter(), |peer_id| {
2643        peer.send(peer_id.into(), message.clone())
2644    });
2645}
2646
2647async fn send_channel_message(
2648    request: proto::SendChannelMessage,
2649    response: Response<proto::SendChannelMessage>,
2650    session: Session,
2651) -> Result<()> {
2652    // Validate the message body.
2653    let body = request.body.trim().to_string();
2654    if body.len() > MAX_MESSAGE_LEN {
2655        return Err(anyhow!("message is too long"))?;
2656    }
2657    if body.is_empty() {
2658        return Err(anyhow!("message can't be blank"))?;
2659    }
2660
2661    let timestamp = OffsetDateTime::now_utc();
2662    let nonce = request
2663        .nonce
2664        .ok_or_else(|| anyhow!("nonce can't be blank"))?;
2665
2666    let channel_id = ChannelId::from_proto(request.channel_id);
2667    let (message_id, connection_ids) = session
2668        .db()
2669        .await
2670        .create_channel_message(
2671            channel_id,
2672            session.user_id,
2673            &body,
2674            timestamp,
2675            nonce.clone().into(),
2676        )
2677        .await?;
2678    let message = proto::ChannelMessage {
2679        sender_id: session.user_id.to_proto(),
2680        id: message_id.to_proto(),
2681        body,
2682        timestamp: timestamp.unix_timestamp() as u64,
2683        nonce: Some(nonce),
2684    };
2685    broadcast(Some(session.connection_id), connection_ids, |connection| {
2686        session.peer.send(
2687            connection,
2688            proto::ChannelMessageSent {
2689                channel_id: channel_id.to_proto(),
2690                message: Some(message.clone()),
2691            },
2692        )
2693    });
2694    response.send(proto::SendChannelMessageResponse {
2695        message: Some(message),
2696    })?;
2697    Ok(())
2698}
2699
2700async fn remove_channel_message(
2701    request: proto::RemoveChannelMessage,
2702    response: Response<proto::RemoveChannelMessage>,
2703    session: Session,
2704) -> Result<()> {
2705    let channel_id = ChannelId::from_proto(request.channel_id);
2706    let message_id = MessageId::from_proto(request.message_id);
2707    let connection_ids = session
2708        .db()
2709        .await
2710        .remove_channel_message(channel_id, message_id, session.user_id)
2711        .await?;
2712    broadcast(Some(session.connection_id), connection_ids, |connection| {
2713        session.peer.send(connection, request.clone())
2714    });
2715    response.send(proto::Ack {})?;
2716    Ok(())
2717}
2718
2719async fn join_channel_chat(
2720    request: proto::JoinChannelChat,
2721    response: Response<proto::JoinChannelChat>,
2722    session: Session,
2723) -> Result<()> {
2724    let channel_id = ChannelId::from_proto(request.channel_id);
2725
2726    let db = session.db().await;
2727    db.join_channel_chat(channel_id, session.connection_id, session.user_id)
2728        .await?;
2729    let messages = db
2730        .get_channel_messages(channel_id, session.user_id, MESSAGE_COUNT_PER_PAGE, None)
2731        .await?;
2732    response.send(proto::JoinChannelChatResponse {
2733        done: messages.len() < MESSAGE_COUNT_PER_PAGE,
2734        messages,
2735    })?;
2736    Ok(())
2737}
2738
2739async fn leave_channel_chat(request: proto::LeaveChannelChat, session: Session) -> Result<()> {
2740    let channel_id = ChannelId::from_proto(request.channel_id);
2741    session
2742        .db()
2743        .await
2744        .leave_channel_chat(channel_id, session.connection_id, session.user_id)
2745        .await?;
2746    Ok(())
2747}
2748
2749async fn get_channel_messages(
2750    request: proto::GetChannelMessages,
2751    response: Response<proto::GetChannelMessages>,
2752    session: Session,
2753) -> Result<()> {
2754    let channel_id = ChannelId::from_proto(request.channel_id);
2755    let messages = session
2756        .db()
2757        .await
2758        .get_channel_messages(
2759            channel_id,
2760            session.user_id,
2761            MESSAGE_COUNT_PER_PAGE,
2762            Some(MessageId::from_proto(request.before_message_id)),
2763        )
2764        .await?;
2765    response.send(proto::GetChannelMessagesResponse {
2766        done: messages.len() < MESSAGE_COUNT_PER_PAGE,
2767        messages,
2768    })?;
2769    Ok(())
2770}
2771
2772async fn update_diff_base(request: proto::UpdateDiffBase, session: Session) -> Result<()> {
2773    let project_id = ProjectId::from_proto(request.project_id);
2774    let project_connection_ids = session
2775        .db()
2776        .await
2777        .project_connection_ids(project_id, session.connection_id)
2778        .await?;
2779    broadcast(
2780        Some(session.connection_id),
2781        project_connection_ids.iter().copied(),
2782        |connection_id| {
2783            session
2784                .peer
2785                .forward_send(session.connection_id, connection_id, request.clone())
2786        },
2787    );
2788    Ok(())
2789}
2790
2791async fn get_private_user_info(
2792    _request: proto::GetPrivateUserInfo,
2793    response: Response<proto::GetPrivateUserInfo>,
2794    session: Session,
2795) -> Result<()> {
2796    let db = session.db().await;
2797
2798    let metrics_id = db.get_user_metrics_id(session.user_id).await?;
2799    let user = db
2800        .get_user_by_id(session.user_id)
2801        .await?
2802        .ok_or_else(|| anyhow!("user not found"))?;
2803    let flags = db.get_user_flags(session.user_id).await?;
2804
2805    response.send(proto::GetPrivateUserInfoResponse {
2806        metrics_id,
2807        staff: user.admin,
2808        flags,
2809    })?;
2810    Ok(())
2811}
2812
2813fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
2814    match message {
2815        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
2816        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
2817        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
2818        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
2819        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
2820            code: frame.code.into(),
2821            reason: frame.reason,
2822        })),
2823    }
2824}
2825
2826fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
2827    match message {
2828        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
2829        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
2830        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
2831        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
2832        AxumMessage::Close(frame) => {
2833            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
2834                code: frame.code.into(),
2835                reason: frame.reason,
2836            }))
2837        }
2838    }
2839}
2840
2841fn build_initial_channels_update(
2842    channels: ChannelsForUser,
2843    channel_invites: Vec<db::Channel>,
2844) -> proto::UpdateChannels {
2845    let mut update = proto::UpdateChannels::default();
2846
2847    for channel in channels.channels {
2848        update.channels.push(proto::Channel {
2849            id: channel.id.to_proto(),
2850            name: channel.name,
2851            parent_id: channel.parent_id.map(|id| id.to_proto()),
2852        });
2853    }
2854
2855    for (channel_id, participants) in channels.channel_participants {
2856        update
2857            .channel_participants
2858            .push(proto::ChannelParticipants {
2859                channel_id: channel_id.to_proto(),
2860                participant_user_ids: participants.into_iter().map(|id| id.to_proto()).collect(),
2861            });
2862    }
2863
2864    update
2865        .channel_permissions
2866        .extend(
2867            channels
2868                .channels_with_admin_privileges
2869                .into_iter()
2870                .map(|id| proto::ChannelPermission {
2871                    channel_id: id.to_proto(),
2872                    is_admin: true,
2873                }),
2874        );
2875
2876    for channel in channel_invites {
2877        update.channel_invitations.push(proto::Channel {
2878            id: channel.id.to_proto(),
2879            name: channel.name,
2880            parent_id: None,
2881        });
2882    }
2883
2884    update
2885}
2886
2887fn build_initial_contacts_update(
2888    contacts: Vec<db::Contact>,
2889    pool: &ConnectionPool,
2890) -> proto::UpdateContacts {
2891    let mut update = proto::UpdateContacts::default();
2892
2893    for contact in contacts {
2894        match contact {
2895            db::Contact::Accepted {
2896                user_id,
2897                should_notify,
2898                busy,
2899            } => {
2900                update
2901                    .contacts
2902                    .push(contact_for_user(user_id, should_notify, busy, &pool));
2903            }
2904            db::Contact::Outgoing { user_id } => update.outgoing_requests.push(user_id.to_proto()),
2905            db::Contact::Incoming {
2906                user_id,
2907                should_notify,
2908            } => update
2909                .incoming_requests
2910                .push(proto::IncomingContactRequest {
2911                    requester_id: user_id.to_proto(),
2912                    should_notify,
2913                }),
2914        }
2915    }
2916
2917    update
2918}
2919
2920fn contact_for_user(
2921    user_id: UserId,
2922    should_notify: bool,
2923    busy: bool,
2924    pool: &ConnectionPool,
2925) -> proto::Contact {
2926    proto::Contact {
2927        user_id: user_id.to_proto(),
2928        online: pool.is_user_online(user_id),
2929        busy,
2930        should_notify,
2931    }
2932}
2933
2934fn room_updated(room: &proto::Room, peer: &Peer) {
2935    broadcast(
2936        None,
2937        room.participants
2938            .iter()
2939            .filter_map(|participant| Some(participant.peer_id?.into())),
2940        |peer_id| {
2941            peer.send(
2942                peer_id.into(),
2943                proto::RoomUpdated {
2944                    room: Some(room.clone()),
2945                },
2946            )
2947        },
2948    );
2949}
2950
2951fn channel_updated(
2952    channel_id: ChannelId,
2953    room: &proto::Room,
2954    channel_members: &[UserId],
2955    peer: &Peer,
2956    pool: &ConnectionPool,
2957) {
2958    let participants = room
2959        .participants
2960        .iter()
2961        .map(|p| p.user_id)
2962        .collect::<Vec<_>>();
2963
2964    broadcast(
2965        None,
2966        channel_members
2967            .iter()
2968            .flat_map(|user_id| pool.user_connection_ids(*user_id)),
2969        |peer_id| {
2970            peer.send(
2971                peer_id.into(),
2972                proto::UpdateChannels {
2973                    channel_participants: vec![proto::ChannelParticipants {
2974                        channel_id: channel_id.to_proto(),
2975                        participant_user_ids: participants.clone(),
2976                    }],
2977                    ..Default::default()
2978                },
2979            )
2980        },
2981    );
2982}
2983
2984async fn update_user_contacts(user_id: UserId, session: &Session) -> Result<()> {
2985    let db = session.db().await;
2986
2987    let contacts = db.get_contacts(user_id).await?;
2988    let busy = db.is_user_busy(user_id).await?;
2989
2990    let pool = session.connection_pool().await;
2991    let updated_contact = contact_for_user(user_id, false, busy, &pool);
2992    for contact in contacts {
2993        if let db::Contact::Accepted {
2994            user_id: contact_user_id,
2995            ..
2996        } = contact
2997        {
2998            for contact_conn_id in pool.user_connection_ids(contact_user_id) {
2999                session
3000                    .peer
3001                    .send(
3002                        contact_conn_id,
3003                        proto::UpdateContacts {
3004                            contacts: vec![updated_contact.clone()],
3005                            remove_contacts: Default::default(),
3006                            incoming_requests: Default::default(),
3007                            remove_incoming_requests: Default::default(),
3008                            outgoing_requests: Default::default(),
3009                            remove_outgoing_requests: Default::default(),
3010                        },
3011                    )
3012                    .trace_err();
3013            }
3014        }
3015    }
3016    Ok(())
3017}
3018
3019async fn leave_room_for_session(session: &Session) -> Result<()> {
3020    let mut contacts_to_update = HashSet::default();
3021
3022    let room_id;
3023    let canceled_calls_to_user_ids;
3024    let live_kit_room;
3025    let delete_live_kit_room;
3026    let room;
3027    let channel_members;
3028    let channel_id;
3029
3030    if let Some(mut left_room) = session.db().await.leave_room(session.connection_id).await? {
3031        contacts_to_update.insert(session.user_id);
3032
3033        for project in left_room.left_projects.values() {
3034            project_left(project, session);
3035        }
3036
3037        room_id = RoomId::from_proto(left_room.room.id);
3038        canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids);
3039        live_kit_room = mem::take(&mut left_room.room.live_kit_room);
3040        delete_live_kit_room = left_room.deleted;
3041        room = mem::take(&mut left_room.room);
3042        channel_members = mem::take(&mut left_room.channel_members);
3043        channel_id = left_room.channel_id;
3044
3045        room_updated(&room, &session.peer);
3046    } else {
3047        return Ok(());
3048    }
3049
3050    if let Some(channel_id) = channel_id {
3051        channel_updated(
3052            channel_id,
3053            &room,
3054            &channel_members,
3055            &session.peer,
3056            &*session.connection_pool().await,
3057        );
3058    }
3059
3060    {
3061        let pool = session.connection_pool().await;
3062        for canceled_user_id in canceled_calls_to_user_ids {
3063            for connection_id in pool.user_connection_ids(canceled_user_id) {
3064                session
3065                    .peer
3066                    .send(
3067                        connection_id,
3068                        proto::CallCanceled {
3069                            room_id: room_id.to_proto(),
3070                        },
3071                    )
3072                    .trace_err();
3073            }
3074            contacts_to_update.insert(canceled_user_id);
3075        }
3076    }
3077
3078    for contact_user_id in contacts_to_update {
3079        update_user_contacts(contact_user_id, &session).await?;
3080    }
3081
3082    if let Some(live_kit) = session.live_kit_client.as_ref() {
3083        live_kit
3084            .remove_participant(live_kit_room.clone(), session.user_id.to_string())
3085            .await
3086            .trace_err();
3087
3088        if delete_live_kit_room {
3089            live_kit.delete_room(live_kit_room).await.trace_err();
3090        }
3091    }
3092
3093    Ok(())
3094}
3095
3096async fn leave_channel_buffers_for_session(session: &Session) -> Result<()> {
3097    let left_channel_buffers = session
3098        .db()
3099        .await
3100        .leave_channel_buffers(session.connection_id)
3101        .await?;
3102
3103    for (channel_id, connections) in left_channel_buffers {
3104        channel_buffer_updated(
3105            session.connection_id,
3106            connections,
3107            &proto::RemoveChannelBufferCollaborator {
3108                channel_id: channel_id.to_proto(),
3109                peer_id: Some(session.connection_id.into()),
3110            },
3111            &session.peer,
3112        );
3113    }
3114
3115    Ok(())
3116}
3117
3118fn project_left(project: &db::LeftProject, session: &Session) {
3119    for connection_id in &project.connection_ids {
3120        if project.host_user_id == session.user_id {
3121            session
3122                .peer
3123                .send(
3124                    *connection_id,
3125                    proto::UnshareProject {
3126                        project_id: project.id.to_proto(),
3127                    },
3128                )
3129                .trace_err();
3130        } else {
3131            session
3132                .peer
3133                .send(
3134                    *connection_id,
3135                    proto::RemoveProjectCollaborator {
3136                        project_id: project.id.to_proto(),
3137                        peer_id: Some(session.connection_id.into()),
3138                    },
3139                )
3140                .trace_err();
3141        }
3142    }
3143}
3144
3145pub trait ResultExt {
3146    type Ok;
3147
3148    fn trace_err(self) -> Option<Self::Ok>;
3149}
3150
3151impl<T, E> ResultExt for Result<T, E>
3152where
3153    E: std::fmt::Debug,
3154{
3155    type Ok = T;
3156
3157    fn trace_err(self) -> Option<T> {
3158        match self {
3159            Ok(value) => Some(value),
3160            Err(error) => {
3161                tracing::error!("{:?}", error);
3162                None
3163            }
3164        }
3165    }
3166}