rpc.rs

   1mod connection_pool;
   2
   3use crate::{
   4    auth,
   5    db::{
   6        self, ChannelId, ChannelsForUser, Database, MessageId, ProjectId, RoomId, ServerId, User,
   7        UserId,
   8    },
   9    executor::Executor,
  10    AppState, Result,
  11};
  12use anyhow::anyhow;
  13use async_tungstenite::tungstenite::{
  14    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  15};
  16use axum::{
  17    body::Body,
  18    extract::{
  19        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  20        ConnectInfo, WebSocketUpgrade,
  21    },
  22    headers::{Header, HeaderName},
  23    http::StatusCode,
  24    middleware,
  25    response::IntoResponse,
  26    routing::get,
  27    Extension, Router, TypedHeader,
  28};
  29use collections::{HashMap, HashSet};
  30pub use connection_pool::ConnectionPool;
  31use futures::{
  32    channel::oneshot,
  33    future::{self, BoxFuture},
  34    stream::FuturesUnordered,
  35    FutureExt, SinkExt, StreamExt, TryStreamExt,
  36};
  37use lazy_static::lazy_static;
  38use prometheus::{register_int_gauge, IntGauge};
  39use rpc::{
  40    proto::{
  41        self, Ack, AddChannelBufferCollaborator, AnyTypedEnvelope, EntityMessage, EnvelopedMessage,
  42        LiveKitConnectionInfo, RequestMessage,
  43    },
  44    Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
  45};
  46use serde::{Serialize, Serializer};
  47use std::{
  48    any::TypeId,
  49    fmt,
  50    future::Future,
  51    marker::PhantomData,
  52    mem,
  53    net::SocketAddr,
  54    ops::{Deref, DerefMut},
  55    rc::Rc,
  56    sync::{
  57        atomic::{AtomicBool, Ordering::SeqCst},
  58        Arc,
  59    },
  60    time::{Duration, Instant},
  61};
  62use time::OffsetDateTime;
  63use tokio::sync::{watch, Semaphore};
  64use tower::ServiceBuilder;
  65use tracing::{info_span, instrument, Instrument};
  66
  67pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  68pub const CLEANUP_TIMEOUT: Duration = Duration::from_secs(10);
  69
  70const MESSAGE_COUNT_PER_PAGE: usize = 100;
  71const MAX_MESSAGE_LEN: usize = 1024;
  72
  73lazy_static! {
  74    static ref METRIC_CONNECTIONS: IntGauge =
  75        register_int_gauge!("connections", "number of connections").unwrap();
  76    static ref METRIC_SHARED_PROJECTS: IntGauge = register_int_gauge!(
  77        "shared_projects",
  78        "number of open projects with one or more guests"
  79    )
  80    .unwrap();
  81}
  82
  83type MessageHandler =
  84    Box<dyn Send + Sync + Fn(Box<dyn AnyTypedEnvelope>, Session) -> BoxFuture<'static, ()>>;
  85
  86struct Response<R> {
  87    peer: Arc<Peer>,
  88    receipt: Receipt<R>,
  89    responded: Arc<AtomicBool>,
  90}
  91
  92impl<R: RequestMessage> Response<R> {
  93    fn send(self, payload: R::Response) -> Result<()> {
  94        self.responded.store(true, SeqCst);
  95        self.peer.respond(self.receipt, payload)?;
  96        Ok(())
  97    }
  98}
  99
 100#[derive(Clone)]
 101struct Session {
 102    user_id: UserId,
 103    connection_id: ConnectionId,
 104    db: Arc<tokio::sync::Mutex<DbHandle>>,
 105    peer: Arc<Peer>,
 106    connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
 107    live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
 108    executor: Executor,
 109}
 110
 111impl Session {
 112    async fn db(&self) -> tokio::sync::MutexGuard<DbHandle> {
 113        #[cfg(test)]
 114        tokio::task::yield_now().await;
 115        let guard = self.db.lock().await;
 116        #[cfg(test)]
 117        tokio::task::yield_now().await;
 118        guard
 119    }
 120
 121    async fn connection_pool(&self) -> ConnectionPoolGuard<'_> {
 122        #[cfg(test)]
 123        tokio::task::yield_now().await;
 124        let guard = self.connection_pool.lock();
 125        ConnectionPoolGuard {
 126            guard,
 127            _not_send: PhantomData,
 128        }
 129    }
 130}
 131
 132impl fmt::Debug for Session {
 133    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 134        f.debug_struct("Session")
 135            .field("user_id", &self.user_id)
 136            .field("connection_id", &self.connection_id)
 137            .finish()
 138    }
 139}
 140
 141struct DbHandle(Arc<Database>);
 142
 143impl Deref for DbHandle {
 144    type Target = Database;
 145
 146    fn deref(&self) -> &Self::Target {
 147        self.0.as_ref()
 148    }
 149}
 150
 151pub struct Server {
 152    id: parking_lot::Mutex<ServerId>,
 153    peer: Arc<Peer>,
 154    pub(crate) connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
 155    app_state: Arc<AppState>,
 156    executor: Executor,
 157    handlers: HashMap<TypeId, MessageHandler>,
 158    teardown: watch::Sender<()>,
 159}
 160
 161pub(crate) struct ConnectionPoolGuard<'a> {
 162    guard: parking_lot::MutexGuard<'a, ConnectionPool>,
 163    _not_send: PhantomData<Rc<()>>,
 164}
 165
 166#[derive(Serialize)]
 167pub struct ServerSnapshot<'a> {
 168    peer: &'a Peer,
 169    #[serde(serialize_with = "serialize_deref")]
 170    connection_pool: ConnectionPoolGuard<'a>,
 171}
 172
 173pub fn serialize_deref<S, T, U>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
 174where
 175    S: Serializer,
 176    T: Deref<Target = U>,
 177    U: Serialize,
 178{
 179    Serialize::serialize(value.deref(), serializer)
 180}
 181
 182impl Server {
 183    pub fn new(id: ServerId, app_state: Arc<AppState>, executor: Executor) -> Arc<Self> {
 184        let mut server = Self {
 185            id: parking_lot::Mutex::new(id),
 186            peer: Peer::new(id.0 as u32),
 187            app_state,
 188            executor,
 189            connection_pool: Default::default(),
 190            handlers: Default::default(),
 191            teardown: watch::channel(()).0,
 192        };
 193
 194        server
 195            .add_request_handler(ping)
 196            .add_request_handler(create_room)
 197            .add_request_handler(join_room)
 198            .add_request_handler(rejoin_room)
 199            .add_request_handler(leave_room)
 200            .add_request_handler(call)
 201            .add_request_handler(cancel_call)
 202            .add_message_handler(decline_call)
 203            .add_request_handler(update_participant_location)
 204            .add_request_handler(share_project)
 205            .add_message_handler(unshare_project)
 206            .add_request_handler(join_project)
 207            .add_message_handler(leave_project)
 208            .add_request_handler(update_project)
 209            .add_request_handler(update_worktree)
 210            .add_message_handler(start_language_server)
 211            .add_message_handler(update_language_server)
 212            .add_message_handler(update_diagnostic_summary)
 213            .add_message_handler(update_worktree_settings)
 214            .add_message_handler(refresh_inlay_hints)
 215            .add_request_handler(forward_project_request::<proto::GetHover>)
 216            .add_request_handler(forward_project_request::<proto::GetDefinition>)
 217            .add_request_handler(forward_project_request::<proto::GetTypeDefinition>)
 218            .add_request_handler(forward_project_request::<proto::GetReferences>)
 219            .add_request_handler(forward_project_request::<proto::SearchProject>)
 220            .add_request_handler(forward_project_request::<proto::GetDocumentHighlights>)
 221            .add_request_handler(forward_project_request::<proto::GetProjectSymbols>)
 222            .add_request_handler(forward_project_request::<proto::OpenBufferForSymbol>)
 223            .add_request_handler(forward_project_request::<proto::OpenBufferById>)
 224            .add_request_handler(forward_project_request::<proto::OpenBufferByPath>)
 225            .add_request_handler(forward_project_request::<proto::GetCompletions>)
 226            .add_request_handler(forward_project_request::<proto::ApplyCompletionAdditionalEdits>)
 227            .add_request_handler(forward_project_request::<proto::GetCodeActions>)
 228            .add_request_handler(forward_project_request::<proto::ApplyCodeAction>)
 229            .add_request_handler(forward_project_request::<proto::PrepareRename>)
 230            .add_request_handler(forward_project_request::<proto::PerformRename>)
 231            .add_request_handler(forward_project_request::<proto::ReloadBuffers>)
 232            .add_request_handler(forward_project_request::<proto::SynchronizeBuffers>)
 233            .add_request_handler(forward_project_request::<proto::FormatBuffers>)
 234            .add_request_handler(forward_project_request::<proto::CreateProjectEntry>)
 235            .add_request_handler(forward_project_request::<proto::RenameProjectEntry>)
 236            .add_request_handler(forward_project_request::<proto::CopyProjectEntry>)
 237            .add_request_handler(forward_project_request::<proto::DeleteProjectEntry>)
 238            .add_request_handler(forward_project_request::<proto::ExpandProjectEntry>)
 239            .add_request_handler(forward_project_request::<proto::OnTypeFormatting>)
 240            .add_request_handler(forward_project_request::<proto::InlayHints>)
 241            .add_message_handler(create_buffer_for_peer)
 242            .add_request_handler(update_buffer)
 243            .add_message_handler(update_buffer_file)
 244            .add_message_handler(buffer_reloaded)
 245            .add_message_handler(buffer_saved)
 246            .add_request_handler(forward_project_request::<proto::SaveBuffer>)
 247            .add_request_handler(get_users)
 248            .add_request_handler(fuzzy_search_users)
 249            .add_request_handler(request_contact)
 250            .add_request_handler(remove_contact)
 251            .add_request_handler(respond_to_contact_request)
 252            .add_request_handler(create_channel)
 253            .add_request_handler(delete_channel)
 254            .add_request_handler(invite_channel_member)
 255            .add_request_handler(remove_channel_member)
 256            .add_request_handler(set_channel_member_admin)
 257            .add_request_handler(rename_channel)
 258            .add_request_handler(join_channel_buffer)
 259            .add_request_handler(leave_channel_buffer)
 260            .add_message_handler(update_channel_buffer)
 261            .add_request_handler(rejoin_channel_buffers)
 262            .add_request_handler(get_channel_members)
 263            .add_request_handler(respond_to_channel_invite)
 264            .add_request_handler(join_channel)
 265            .add_request_handler(join_channel_chat)
 266            .add_message_handler(leave_channel_chat)
 267            .add_request_handler(send_channel_message)
 268            .add_request_handler(remove_channel_message)
 269            .add_request_handler(get_channel_messages)
 270            .add_request_handler(move_channel)
 271            .add_request_handler(follow)
 272            .add_message_handler(unfollow)
 273            .add_message_handler(update_followers)
 274            .add_message_handler(update_diff_base)
 275            .add_request_handler(get_private_user_info);
 276
 277        Arc::new(server)
 278    }
 279
 280    pub async fn start(&self) -> Result<()> {
 281        let server_id = *self.id.lock();
 282        let app_state = self.app_state.clone();
 283        let peer = self.peer.clone();
 284        let timeout = self.executor.sleep(CLEANUP_TIMEOUT);
 285        let pool = self.connection_pool.clone();
 286        let live_kit_client = self.app_state.live_kit_client.clone();
 287
 288        let span = info_span!("start server");
 289        self.executor.spawn_detached(
 290            async move {
 291                tracing::info!("waiting for cleanup timeout");
 292                timeout.await;
 293                tracing::info!("cleanup timeout expired, retrieving stale rooms");
 294                if let Some((room_ids, channel_ids)) = app_state
 295                    .db
 296                    .stale_server_resource_ids(&app_state.config.zed_environment, server_id)
 297                    .await
 298                    .trace_err()
 299                {
 300                    tracing::info!(stale_room_count = room_ids.len(), "retrieved stale rooms");
 301                    tracing::info!(
 302                        stale_channel_buffer_count = channel_ids.len(),
 303                        "retrieved stale channel buffers"
 304                    );
 305
 306                    for channel_id in channel_ids {
 307                        if let Some(refreshed_channel_buffer) = app_state
 308                            .db
 309                            .clear_stale_channel_buffer_collaborators(channel_id, server_id)
 310                            .await
 311                            .trace_err()
 312                        {
 313                            for connection_id in refreshed_channel_buffer.connection_ids {
 314                                for message in &refreshed_channel_buffer.removed_collaborators {
 315                                    peer.send(connection_id, message.clone()).trace_err();
 316                                }
 317                            }
 318                        }
 319                    }
 320
 321                    for room_id in room_ids {
 322                        let mut contacts_to_update = HashSet::default();
 323                        let mut canceled_calls_to_user_ids = Vec::new();
 324                        let mut live_kit_room = String::new();
 325                        let mut delete_live_kit_room = false;
 326
 327                        if let Some(mut refreshed_room) = app_state
 328                            .db
 329                            .clear_stale_room_participants(room_id, server_id)
 330                            .await
 331                            .trace_err()
 332                        {
 333                            tracing::info!(
 334                                room_id = room_id.0,
 335                                new_participant_count = refreshed_room.room.participants.len(),
 336                                "refreshed room"
 337                            );
 338                            room_updated(&refreshed_room.room, &peer);
 339                            if let Some(channel_id) = refreshed_room.channel_id {
 340                                channel_updated(
 341                                    channel_id,
 342                                    &refreshed_room.room,
 343                                    &refreshed_room.channel_members,
 344                                    &peer,
 345                                    &*pool.lock(),
 346                                );
 347                            }
 348                            contacts_to_update
 349                                .extend(refreshed_room.stale_participant_user_ids.iter().copied());
 350                            contacts_to_update
 351                                .extend(refreshed_room.canceled_calls_to_user_ids.iter().copied());
 352                            canceled_calls_to_user_ids =
 353                                mem::take(&mut refreshed_room.canceled_calls_to_user_ids);
 354                            live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room);
 355                            delete_live_kit_room = refreshed_room.room.participants.is_empty();
 356                        }
 357
 358                        {
 359                            let pool = pool.lock();
 360                            for canceled_user_id in canceled_calls_to_user_ids {
 361                                for connection_id in pool.user_connection_ids(canceled_user_id) {
 362                                    peer.send(
 363                                        connection_id,
 364                                        proto::CallCanceled {
 365                                            room_id: room_id.to_proto(),
 366                                        },
 367                                    )
 368                                    .trace_err();
 369                                }
 370                            }
 371                        }
 372
 373                        for user_id in contacts_to_update {
 374                            let busy = app_state.db.is_user_busy(user_id).await.trace_err();
 375                            let contacts = app_state.db.get_contacts(user_id).await.trace_err();
 376                            if let Some((busy, contacts)) = busy.zip(contacts) {
 377                                let pool = pool.lock();
 378                                let updated_contact = contact_for_user(user_id, false, busy, &pool);
 379                                for contact in contacts {
 380                                    if let db::Contact::Accepted {
 381                                        user_id: contact_user_id,
 382                                        ..
 383                                    } = contact
 384                                    {
 385                                        for contact_conn_id in
 386                                            pool.user_connection_ids(contact_user_id)
 387                                        {
 388                                            peer.send(
 389                                                contact_conn_id,
 390                                                proto::UpdateContacts {
 391                                                    contacts: vec![updated_contact.clone()],
 392                                                    remove_contacts: Default::default(),
 393                                                    incoming_requests: Default::default(),
 394                                                    remove_incoming_requests: Default::default(),
 395                                                    outgoing_requests: Default::default(),
 396                                                    remove_outgoing_requests: Default::default(),
 397                                                },
 398                                            )
 399                                            .trace_err();
 400                                        }
 401                                    }
 402                                }
 403                            }
 404                        }
 405
 406                        if let Some(live_kit) = live_kit_client.as_ref() {
 407                            if delete_live_kit_room {
 408                                live_kit.delete_room(live_kit_room).await.trace_err();
 409                            }
 410                        }
 411                    }
 412                }
 413
 414                app_state
 415                    .db
 416                    .delete_stale_servers(&app_state.config.zed_environment, server_id)
 417                    .await
 418                    .trace_err();
 419            }
 420            .instrument(span),
 421        );
 422        Ok(())
 423    }
 424
 425    pub fn teardown(&self) {
 426        self.peer.teardown();
 427        self.connection_pool.lock().reset();
 428        let _ = self.teardown.send(());
 429    }
 430
 431    #[cfg(test)]
 432    pub fn reset(&self, id: ServerId) {
 433        self.teardown();
 434        *self.id.lock() = id;
 435        self.peer.reset(id.0 as u32);
 436    }
 437
 438    #[cfg(test)]
 439    pub fn id(&self) -> ServerId {
 440        *self.id.lock()
 441    }
 442
 443    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 444    where
 445        F: 'static + Send + Sync + Fn(TypedEnvelope<M>, Session) -> Fut,
 446        Fut: 'static + Send + Future<Output = Result<()>>,
 447        M: EnvelopedMessage,
 448    {
 449        let prev_handler = self.handlers.insert(
 450            TypeId::of::<M>(),
 451            Box::new(move |envelope, session| {
 452                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 453                let span = info_span!(
 454                    "handle message",
 455                    payload_type = envelope.payload_type_name()
 456                );
 457                span.in_scope(|| {
 458                    tracing::info!(
 459                        payload_type = envelope.payload_type_name(),
 460                        "message received"
 461                    );
 462                });
 463                let start_time = Instant::now();
 464                let future = (handler)(*envelope, session);
 465                async move {
 466                    let result = future.await;
 467                    let duration_ms = start_time.elapsed().as_micros() as f64 / 1000.0;
 468                    match result {
 469                        Err(error) => {
 470                            tracing::error!(%error, ?duration_ms, "error handling message")
 471                        }
 472                        Ok(()) => tracing::info!(?duration_ms, "finished handling message"),
 473                    }
 474                }
 475                .instrument(span)
 476                .boxed()
 477            }),
 478        );
 479        if prev_handler.is_some() {
 480            panic!("registered a handler for the same message twice");
 481        }
 482        self
 483    }
 484
 485    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 486    where
 487        F: 'static + Send + Sync + Fn(M, Session) -> Fut,
 488        Fut: 'static + Send + Future<Output = Result<()>>,
 489        M: EnvelopedMessage,
 490    {
 491        self.add_handler(move |envelope, session| handler(envelope.payload, session));
 492        self
 493    }
 494
 495    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 496    where
 497        F: 'static + Send + Sync + Fn(M, Response<M>, Session) -> Fut,
 498        Fut: Send + Future<Output = Result<()>>,
 499        M: RequestMessage,
 500    {
 501        let handler = Arc::new(handler);
 502        self.add_handler(move |envelope, session| {
 503            let receipt = envelope.receipt();
 504            let handler = handler.clone();
 505            async move {
 506                let peer = session.peer.clone();
 507                let responded = Arc::new(AtomicBool::default());
 508                let response = Response {
 509                    peer: peer.clone(),
 510                    responded: responded.clone(),
 511                    receipt,
 512                };
 513                match (handler)(envelope.payload, response, session).await {
 514                    Ok(()) => {
 515                        if responded.load(std::sync::atomic::Ordering::SeqCst) {
 516                            Ok(())
 517                        } else {
 518                            Err(anyhow!("handler did not send a response"))?
 519                        }
 520                    }
 521                    Err(error) => {
 522                        peer.respond_with_error(
 523                            receipt,
 524                            proto::Error {
 525                                message: error.to_string(),
 526                            },
 527                        )?;
 528                        Err(error)
 529                    }
 530                }
 531            }
 532        })
 533    }
 534
 535    pub fn handle_connection(
 536        self: &Arc<Self>,
 537        connection: Connection,
 538        address: String,
 539        user: User,
 540        mut send_connection_id: Option<oneshot::Sender<ConnectionId>>,
 541        executor: Executor,
 542    ) -> impl Future<Output = Result<()>> {
 543        let this = self.clone();
 544        let user_id = user.id;
 545        let login = user.github_login;
 546        let span = info_span!("handle connection", %user_id, %login, %address);
 547        let mut teardown = self.teardown.subscribe();
 548        async move {
 549            let (connection_id, handle_io, mut incoming_rx) = this
 550                .peer
 551                .add_connection(connection, {
 552                    let executor = executor.clone();
 553                    move |duration| executor.sleep(duration)
 554                });
 555
 556            tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
 557            this.peer.send(connection_id, proto::Hello { peer_id: Some(connection_id.into()) })?;
 558            tracing::info!(%user_id, %login, %connection_id, %address, "sent hello message");
 559
 560            if let Some(send_connection_id) = send_connection_id.take() {
 561                let _ = send_connection_id.send(connection_id);
 562            }
 563
 564            if !user.connected_once {
 565                this.peer.send(connection_id, proto::ShowContacts {})?;
 566                this.app_state.db.set_user_connected_once(user_id, true).await?;
 567            }
 568
 569            let (contacts, channels_for_user, channel_invites) = future::try_join3(
 570                this.app_state.db.get_contacts(user_id),
 571                this.app_state.db.get_channels_for_user(user_id),
 572                this.app_state.db.get_channel_invites_for_user(user_id)
 573            ).await?;
 574
 575            {
 576                let mut pool = this.connection_pool.lock();
 577                pool.add_connection(connection_id, user_id, user.admin);
 578                this.peer.send(connection_id, build_initial_contacts_update(contacts, &pool))?;
 579                this.peer.send(connection_id, build_initial_channels_update(
 580                    channels_for_user,
 581                    channel_invites
 582                ))?;
 583            }
 584
 585            if let Some(incoming_call) = this.app_state.db.incoming_call_for_user(user_id).await? {
 586                this.peer.send(connection_id, incoming_call)?;
 587            }
 588
 589            let session = Session {
 590                user_id,
 591                connection_id,
 592                db: Arc::new(tokio::sync::Mutex::new(DbHandle(this.app_state.db.clone()))),
 593                peer: this.peer.clone(),
 594                connection_pool: this.connection_pool.clone(),
 595                live_kit_client: this.app_state.live_kit_client.clone(),
 596                executor: executor.clone(),
 597            };
 598            update_user_contacts(user_id, &session).await?;
 599
 600            let handle_io = handle_io.fuse();
 601            futures::pin_mut!(handle_io);
 602
 603            // Handlers for foreground messages are pushed into the following `FuturesUnordered`.
 604            // This prevents deadlocks when e.g., client A performs a request to client B and
 605            // client B performs a request to client A. If both clients stop processing further
 606            // messages until their respective request completes, they won't have a chance to
 607            // respond to the other client's request and cause a deadlock.
 608            //
 609            // This arrangement ensures we will attempt to process earlier messages first, but fall
 610            // back to processing messages arrived later in the spirit of making progress.
 611            let mut foreground_message_handlers = FuturesUnordered::new();
 612            let concurrent_handlers = Arc::new(Semaphore::new(256));
 613            loop {
 614                let next_message = async {
 615                    let permit = concurrent_handlers.clone().acquire_owned().await.unwrap();
 616                    let message = incoming_rx.next().await;
 617                    (permit, message)
 618                }.fuse();
 619                futures::pin_mut!(next_message);
 620                futures::select_biased! {
 621                    _ = teardown.changed().fuse() => return Ok(()),
 622                    result = handle_io => {
 623                        if let Err(error) = result {
 624                            tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
 625                        }
 626                        break;
 627                    }
 628                    _ = foreground_message_handlers.next() => {}
 629                    next_message = next_message => {
 630                        let (permit, message) = next_message;
 631                        if let Some(message) = message {
 632                            let type_name = message.payload_type_name();
 633                            let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
 634                            let span_enter = span.enter();
 635                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 636                                let is_background = message.is_background();
 637                                let handle_message = (handler)(message, session.clone());
 638                                drop(span_enter);
 639
 640                                let handle_message = async move {
 641                                    handle_message.await;
 642                                    drop(permit);
 643                                }.instrument(span);
 644                                if is_background {
 645                                    executor.spawn_detached(handle_message);
 646                                } else {
 647                                    foreground_message_handlers.push(handle_message);
 648                                }
 649                            } else {
 650                                tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
 651                            }
 652                        } else {
 653                            tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
 654                            break;
 655                        }
 656                    }
 657                }
 658            }
 659
 660            drop(foreground_message_handlers);
 661            tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
 662            if let Err(error) = connection_lost(session, teardown, executor).await {
 663                tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
 664            }
 665
 666            Ok(())
 667        }.instrument(span)
 668    }
 669
 670    pub async fn invite_code_redeemed(
 671        self: &Arc<Self>,
 672        inviter_id: UserId,
 673        invitee_id: UserId,
 674    ) -> Result<()> {
 675        if let Some(user) = self.app_state.db.get_user_by_id(inviter_id).await? {
 676            if let Some(code) = &user.invite_code {
 677                let pool = self.connection_pool.lock();
 678                let invitee_contact = contact_for_user(invitee_id, true, false, &pool);
 679                for connection_id in pool.user_connection_ids(inviter_id) {
 680                    self.peer.send(
 681                        connection_id,
 682                        proto::UpdateContacts {
 683                            contacts: vec![invitee_contact.clone()],
 684                            ..Default::default()
 685                        },
 686                    )?;
 687                    self.peer.send(
 688                        connection_id,
 689                        proto::UpdateInviteInfo {
 690                            url: format!("{}{}", self.app_state.config.invite_link_prefix, &code),
 691                            count: user.invite_count as u32,
 692                        },
 693                    )?;
 694                }
 695            }
 696        }
 697        Ok(())
 698    }
 699
 700    pub async fn invite_count_updated(self: &Arc<Self>, user_id: UserId) -> Result<()> {
 701        if let Some(user) = self.app_state.db.get_user_by_id(user_id).await? {
 702            if let Some(invite_code) = &user.invite_code {
 703                let pool = self.connection_pool.lock();
 704                for connection_id in pool.user_connection_ids(user_id) {
 705                    self.peer.send(
 706                        connection_id,
 707                        proto::UpdateInviteInfo {
 708                            url: format!(
 709                                "{}{}",
 710                                self.app_state.config.invite_link_prefix, invite_code
 711                            ),
 712                            count: user.invite_count as u32,
 713                        },
 714                    )?;
 715                }
 716            }
 717        }
 718        Ok(())
 719    }
 720
 721    pub async fn snapshot<'a>(self: &'a Arc<Self>) -> ServerSnapshot<'a> {
 722        ServerSnapshot {
 723            connection_pool: ConnectionPoolGuard {
 724                guard: self.connection_pool.lock(),
 725                _not_send: PhantomData,
 726            },
 727            peer: &self.peer,
 728        }
 729    }
 730}
 731
 732impl<'a> Deref for ConnectionPoolGuard<'a> {
 733    type Target = ConnectionPool;
 734
 735    fn deref(&self) -> &Self::Target {
 736        &*self.guard
 737    }
 738}
 739
 740impl<'a> DerefMut for ConnectionPoolGuard<'a> {
 741    fn deref_mut(&mut self) -> &mut Self::Target {
 742        &mut *self.guard
 743    }
 744}
 745
 746impl<'a> Drop for ConnectionPoolGuard<'a> {
 747    fn drop(&mut self) {
 748        #[cfg(test)]
 749        self.check_invariants();
 750    }
 751}
 752
 753fn broadcast<F>(
 754    sender_id: Option<ConnectionId>,
 755    receiver_ids: impl IntoIterator<Item = ConnectionId>,
 756    mut f: F,
 757) where
 758    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 759{
 760    for receiver_id in receiver_ids {
 761        if Some(receiver_id) != sender_id {
 762            if let Err(error) = f(receiver_id) {
 763                tracing::error!("failed to send to {:?} {}", receiver_id, error);
 764            }
 765        }
 766    }
 767}
 768
 769lazy_static! {
 770    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
 771}
 772
 773pub struct ProtocolVersion(u32);
 774
 775impl Header for ProtocolVersion {
 776    fn name() -> &'static HeaderName {
 777        &ZED_PROTOCOL_VERSION
 778    }
 779
 780    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
 781    where
 782        Self: Sized,
 783        I: Iterator<Item = &'i axum::http::HeaderValue>,
 784    {
 785        let version = values
 786            .next()
 787            .ok_or_else(axum::headers::Error::invalid)?
 788            .to_str()
 789            .map_err(|_| axum::headers::Error::invalid())?
 790            .parse()
 791            .map_err(|_| axum::headers::Error::invalid())?;
 792        Ok(Self(version))
 793    }
 794
 795    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
 796        values.extend([self.0.to_string().parse().unwrap()]);
 797    }
 798}
 799
 800pub fn routes(server: Arc<Server>) -> Router<Body> {
 801    Router::new()
 802        .route("/rpc", get(handle_websocket_request))
 803        .layer(
 804            ServiceBuilder::new()
 805                .layer(Extension(server.app_state.clone()))
 806                .layer(middleware::from_fn(auth::validate_header)),
 807        )
 808        .route("/metrics", get(handle_metrics))
 809        .layer(Extension(server))
 810}
 811
 812pub async fn handle_websocket_request(
 813    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
 814    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
 815    Extension(server): Extension<Arc<Server>>,
 816    Extension(user): Extension<User>,
 817    ws: WebSocketUpgrade,
 818) -> axum::response::Response {
 819    if protocol_version != rpc::PROTOCOL_VERSION {
 820        return (
 821            StatusCode::UPGRADE_REQUIRED,
 822            "client must be upgraded".to_string(),
 823        )
 824            .into_response();
 825    }
 826    let socket_address = socket_address.to_string();
 827    ws.on_upgrade(move |socket| {
 828        use util::ResultExt;
 829        let socket = socket
 830            .map_ok(to_tungstenite_message)
 831            .err_into()
 832            .with(|message| async move { Ok(to_axum_message(message)) });
 833        let connection = Connection::new(Box::pin(socket));
 834        async move {
 835            server
 836                .handle_connection(connection, socket_address, user, None, Executor::Production)
 837                .await
 838                .log_err();
 839        }
 840    })
 841}
 842
 843pub async fn handle_metrics(Extension(server): Extension<Arc<Server>>) -> Result<String> {
 844    let connections = server
 845        .connection_pool
 846        .lock()
 847        .connections()
 848        .filter(|connection| !connection.admin)
 849        .count();
 850
 851    METRIC_CONNECTIONS.set(connections as _);
 852
 853    let shared_projects = server.app_state.db.project_count_excluding_admins().await?;
 854    METRIC_SHARED_PROJECTS.set(shared_projects as _);
 855
 856    let encoder = prometheus::TextEncoder::new();
 857    let metric_families = prometheus::gather();
 858    let encoded_metrics = encoder
 859        .encode_to_string(&metric_families)
 860        .map_err(|err| anyhow!("{}", err))?;
 861    Ok(encoded_metrics)
 862}
 863
 864#[instrument(err, skip(executor))]
 865async fn connection_lost(
 866    session: Session,
 867    mut teardown: watch::Receiver<()>,
 868    executor: Executor,
 869) -> Result<()> {
 870    session.peer.disconnect(session.connection_id);
 871    session
 872        .connection_pool()
 873        .await
 874        .remove_connection(session.connection_id)?;
 875
 876    session
 877        .db()
 878        .await
 879        .connection_lost(session.connection_id)
 880        .await
 881        .trace_err();
 882
 883    futures::select_biased! {
 884        _ = executor.sleep(RECONNECT_TIMEOUT).fuse() => {
 885            log::info!("connection lost, removing all resources for user:{}, connection:{:?}", session.user_id, session.connection_id);
 886            leave_room_for_session(&session).await.trace_err();
 887            leave_channel_buffers_for_session(&session)
 888                .await
 889                .trace_err();
 890
 891            if !session
 892                .connection_pool()
 893                .await
 894                .is_user_online(session.user_id)
 895            {
 896                let db = session.db().await;
 897                if let Some(room) = db.decline_call(None, session.user_id).await.trace_err().flatten() {
 898                    room_updated(&room, &session.peer);
 899                }
 900            }
 901
 902            update_user_contacts(session.user_id, &session).await?;
 903        }
 904        _ = teardown.changed().fuse() => {}
 905    }
 906
 907    Ok(())
 908}
 909
 910async fn ping(_: proto::Ping, response: Response<proto::Ping>, _session: Session) -> Result<()> {
 911    response.send(proto::Ack {})?;
 912    Ok(())
 913}
 914
 915async fn create_room(
 916    _request: proto::CreateRoom,
 917    response: Response<proto::CreateRoom>,
 918    session: Session,
 919) -> Result<()> {
 920    let live_kit_room = nanoid::nanoid!(30);
 921
 922    let live_kit_connection_info = {
 923        let live_kit_room = live_kit_room.clone();
 924        let live_kit = session.live_kit_client.as_ref();
 925
 926        util::async_iife!({
 927            let live_kit = live_kit?;
 928
 929            live_kit
 930                .create_room(live_kit_room.clone())
 931                .await
 932                .trace_err()?;
 933
 934            let token = live_kit
 935                .room_token(&live_kit_room, &session.user_id.to_string())
 936                .trace_err()?;
 937
 938            Some(proto::LiveKitConnectionInfo {
 939                server_url: live_kit.url().into(),
 940                token,
 941            })
 942        })
 943    }
 944    .await;
 945
 946    let room = session
 947        .db()
 948        .await
 949        .create_room(session.user_id, session.connection_id, &live_kit_room)
 950        .await?;
 951
 952    response.send(proto::CreateRoomResponse {
 953        room: Some(room.clone()),
 954        live_kit_connection_info,
 955    })?;
 956
 957    update_user_contacts(session.user_id, &session).await?;
 958    Ok(())
 959}
 960
 961async fn join_room(
 962    request: proto::JoinRoom,
 963    response: Response<proto::JoinRoom>,
 964    session: Session,
 965) -> Result<()> {
 966    let room_id = RoomId::from_proto(request.id);
 967    let joined_room = {
 968        let room = session
 969            .db()
 970            .await
 971            .join_room(room_id, session.user_id, session.connection_id)
 972            .await?;
 973        room_updated(&room.room, &session.peer);
 974        room.into_inner()
 975    };
 976
 977    if let Some(channel_id) = joined_room.channel_id {
 978        channel_updated(
 979            channel_id,
 980            &joined_room.room,
 981            &joined_room.channel_members,
 982            &session.peer,
 983            &*session.connection_pool().await,
 984        )
 985    }
 986
 987    for connection_id in session
 988        .connection_pool()
 989        .await
 990        .user_connection_ids(session.user_id)
 991    {
 992        session
 993            .peer
 994            .send(
 995                connection_id,
 996                proto::CallCanceled {
 997                    room_id: room_id.to_proto(),
 998                },
 999            )
1000            .trace_err();
1001    }
1002
1003    let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
1004        if let Some(token) = live_kit
1005            .room_token(
1006                &joined_room.room.live_kit_room,
1007                &session.user_id.to_string(),
1008            )
1009            .trace_err()
1010        {
1011            Some(proto::LiveKitConnectionInfo {
1012                server_url: live_kit.url().into(),
1013                token,
1014            })
1015        } else {
1016            None
1017        }
1018    } else {
1019        None
1020    };
1021
1022    response.send(proto::JoinRoomResponse {
1023        room: Some(joined_room.room),
1024        channel_id: joined_room.channel_id.map(|id| id.to_proto()),
1025        live_kit_connection_info,
1026    })?;
1027
1028    update_user_contacts(session.user_id, &session).await?;
1029    Ok(())
1030}
1031
1032async fn rejoin_room(
1033    request: proto::RejoinRoom,
1034    response: Response<proto::RejoinRoom>,
1035    session: Session,
1036) -> Result<()> {
1037    let room;
1038    let channel_id;
1039    let channel_members;
1040    {
1041        let mut rejoined_room = session
1042            .db()
1043            .await
1044            .rejoin_room(request, session.user_id, session.connection_id)
1045            .await?;
1046
1047        response.send(proto::RejoinRoomResponse {
1048            room: Some(rejoined_room.room.clone()),
1049            reshared_projects: rejoined_room
1050                .reshared_projects
1051                .iter()
1052                .map(|project| proto::ResharedProject {
1053                    id: project.id.to_proto(),
1054                    collaborators: project
1055                        .collaborators
1056                        .iter()
1057                        .map(|collaborator| collaborator.to_proto())
1058                        .collect(),
1059                })
1060                .collect(),
1061            rejoined_projects: rejoined_room
1062                .rejoined_projects
1063                .iter()
1064                .map(|rejoined_project| proto::RejoinedProject {
1065                    id: rejoined_project.id.to_proto(),
1066                    worktrees: rejoined_project
1067                        .worktrees
1068                        .iter()
1069                        .map(|worktree| proto::WorktreeMetadata {
1070                            id: worktree.id,
1071                            root_name: worktree.root_name.clone(),
1072                            visible: worktree.visible,
1073                            abs_path: worktree.abs_path.clone(),
1074                        })
1075                        .collect(),
1076                    collaborators: rejoined_project
1077                        .collaborators
1078                        .iter()
1079                        .map(|collaborator| collaborator.to_proto())
1080                        .collect(),
1081                    language_servers: rejoined_project.language_servers.clone(),
1082                })
1083                .collect(),
1084        })?;
1085        room_updated(&rejoined_room.room, &session.peer);
1086
1087        for project in &rejoined_room.reshared_projects {
1088            for collaborator in &project.collaborators {
1089                session
1090                    .peer
1091                    .send(
1092                        collaborator.connection_id,
1093                        proto::UpdateProjectCollaborator {
1094                            project_id: project.id.to_proto(),
1095                            old_peer_id: Some(project.old_connection_id.into()),
1096                            new_peer_id: Some(session.connection_id.into()),
1097                        },
1098                    )
1099                    .trace_err();
1100            }
1101
1102            broadcast(
1103                Some(session.connection_id),
1104                project
1105                    .collaborators
1106                    .iter()
1107                    .map(|collaborator| collaborator.connection_id),
1108                |connection_id| {
1109                    session.peer.forward_send(
1110                        session.connection_id,
1111                        connection_id,
1112                        proto::UpdateProject {
1113                            project_id: project.id.to_proto(),
1114                            worktrees: project.worktrees.clone(),
1115                        },
1116                    )
1117                },
1118            );
1119        }
1120
1121        for project in &rejoined_room.rejoined_projects {
1122            for collaborator in &project.collaborators {
1123                session
1124                    .peer
1125                    .send(
1126                        collaborator.connection_id,
1127                        proto::UpdateProjectCollaborator {
1128                            project_id: project.id.to_proto(),
1129                            old_peer_id: Some(project.old_connection_id.into()),
1130                            new_peer_id: Some(session.connection_id.into()),
1131                        },
1132                    )
1133                    .trace_err();
1134            }
1135        }
1136
1137        for project in &mut rejoined_room.rejoined_projects {
1138            for worktree in mem::take(&mut project.worktrees) {
1139                #[cfg(any(test, feature = "test-support"))]
1140                const MAX_CHUNK_SIZE: usize = 2;
1141                #[cfg(not(any(test, feature = "test-support")))]
1142                const MAX_CHUNK_SIZE: usize = 256;
1143
1144                // Stream this worktree's entries.
1145                let message = proto::UpdateWorktree {
1146                    project_id: project.id.to_proto(),
1147                    worktree_id: worktree.id,
1148                    abs_path: worktree.abs_path.clone(),
1149                    root_name: worktree.root_name,
1150                    updated_entries: worktree.updated_entries,
1151                    removed_entries: worktree.removed_entries,
1152                    scan_id: worktree.scan_id,
1153                    is_last_update: worktree.completed_scan_id == worktree.scan_id,
1154                    updated_repositories: worktree.updated_repositories,
1155                    removed_repositories: worktree.removed_repositories,
1156                };
1157                for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1158                    session.peer.send(session.connection_id, update.clone())?;
1159                }
1160
1161                // Stream this worktree's diagnostics.
1162                for summary in worktree.diagnostic_summaries {
1163                    session.peer.send(
1164                        session.connection_id,
1165                        proto::UpdateDiagnosticSummary {
1166                            project_id: project.id.to_proto(),
1167                            worktree_id: worktree.id,
1168                            summary: Some(summary),
1169                        },
1170                    )?;
1171                }
1172
1173                for settings_file in worktree.settings_files {
1174                    session.peer.send(
1175                        session.connection_id,
1176                        proto::UpdateWorktreeSettings {
1177                            project_id: project.id.to_proto(),
1178                            worktree_id: worktree.id,
1179                            path: settings_file.path,
1180                            content: Some(settings_file.content),
1181                        },
1182                    )?;
1183                }
1184            }
1185
1186            for language_server in &project.language_servers {
1187                session.peer.send(
1188                    session.connection_id,
1189                    proto::UpdateLanguageServer {
1190                        project_id: project.id.to_proto(),
1191                        language_server_id: language_server.id,
1192                        variant: Some(
1193                            proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1194                                proto::LspDiskBasedDiagnosticsUpdated {},
1195                            ),
1196                        ),
1197                    },
1198                )?;
1199            }
1200        }
1201
1202        let rejoined_room = rejoined_room.into_inner();
1203
1204        room = rejoined_room.room;
1205        channel_id = rejoined_room.channel_id;
1206        channel_members = rejoined_room.channel_members;
1207    }
1208
1209    if let Some(channel_id) = channel_id {
1210        channel_updated(
1211            channel_id,
1212            &room,
1213            &channel_members,
1214            &session.peer,
1215            &*session.connection_pool().await,
1216        );
1217    }
1218
1219    update_user_contacts(session.user_id, &session).await?;
1220    Ok(())
1221}
1222
1223async fn leave_room(
1224    _: proto::LeaveRoom,
1225    response: Response<proto::LeaveRoom>,
1226    session: Session,
1227) -> Result<()> {
1228    leave_room_for_session(&session).await?;
1229    response.send(proto::Ack {})?;
1230    Ok(())
1231}
1232
1233async fn call(
1234    request: proto::Call,
1235    response: Response<proto::Call>,
1236    session: Session,
1237) -> Result<()> {
1238    let room_id = RoomId::from_proto(request.room_id);
1239    let calling_user_id = session.user_id;
1240    let calling_connection_id = session.connection_id;
1241    let called_user_id = UserId::from_proto(request.called_user_id);
1242    let initial_project_id = request.initial_project_id.map(ProjectId::from_proto);
1243    if !session
1244        .db()
1245        .await
1246        .has_contact(calling_user_id, called_user_id)
1247        .await?
1248    {
1249        return Err(anyhow!("cannot call a user who isn't a contact"))?;
1250    }
1251
1252    let incoming_call = {
1253        let (room, incoming_call) = &mut *session
1254            .db()
1255            .await
1256            .call(
1257                room_id,
1258                calling_user_id,
1259                calling_connection_id,
1260                called_user_id,
1261                initial_project_id,
1262            )
1263            .await?;
1264        room_updated(&room, &session.peer);
1265        mem::take(incoming_call)
1266    };
1267    update_user_contacts(called_user_id, &session).await?;
1268
1269    let mut calls = session
1270        .connection_pool()
1271        .await
1272        .user_connection_ids(called_user_id)
1273        .map(|connection_id| session.peer.request(connection_id, incoming_call.clone()))
1274        .collect::<FuturesUnordered<_>>();
1275
1276    while let Some(call_response) = calls.next().await {
1277        match call_response.as_ref() {
1278            Ok(_) => {
1279                response.send(proto::Ack {})?;
1280                return Ok(());
1281            }
1282            Err(_) => {
1283                call_response.trace_err();
1284            }
1285        }
1286    }
1287
1288    {
1289        let room = session
1290            .db()
1291            .await
1292            .call_failed(room_id, called_user_id)
1293            .await?;
1294        room_updated(&room, &session.peer);
1295    }
1296    update_user_contacts(called_user_id, &session).await?;
1297
1298    Err(anyhow!("failed to ring user"))?
1299}
1300
1301async fn cancel_call(
1302    request: proto::CancelCall,
1303    response: Response<proto::CancelCall>,
1304    session: Session,
1305) -> Result<()> {
1306    let called_user_id = UserId::from_proto(request.called_user_id);
1307    let room_id = RoomId::from_proto(request.room_id);
1308    {
1309        let room = session
1310            .db()
1311            .await
1312            .cancel_call(room_id, session.connection_id, called_user_id)
1313            .await?;
1314        room_updated(&room, &session.peer);
1315    }
1316
1317    for connection_id in session
1318        .connection_pool()
1319        .await
1320        .user_connection_ids(called_user_id)
1321    {
1322        session
1323            .peer
1324            .send(
1325                connection_id,
1326                proto::CallCanceled {
1327                    room_id: room_id.to_proto(),
1328                },
1329            )
1330            .trace_err();
1331    }
1332    response.send(proto::Ack {})?;
1333
1334    update_user_contacts(called_user_id, &session).await?;
1335    Ok(())
1336}
1337
1338async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<()> {
1339    let room_id = RoomId::from_proto(message.room_id);
1340    {
1341        let room = session
1342            .db()
1343            .await
1344            .decline_call(Some(room_id), session.user_id)
1345            .await?
1346            .ok_or_else(|| anyhow!("failed to decline call"))?;
1347        room_updated(&room, &session.peer);
1348    }
1349
1350    for connection_id in session
1351        .connection_pool()
1352        .await
1353        .user_connection_ids(session.user_id)
1354    {
1355        session
1356            .peer
1357            .send(
1358                connection_id,
1359                proto::CallCanceled {
1360                    room_id: room_id.to_proto(),
1361                },
1362            )
1363            .trace_err();
1364    }
1365    update_user_contacts(session.user_id, &session).await?;
1366    Ok(())
1367}
1368
1369async fn update_participant_location(
1370    request: proto::UpdateParticipantLocation,
1371    response: Response<proto::UpdateParticipantLocation>,
1372    session: Session,
1373) -> Result<()> {
1374    let room_id = RoomId::from_proto(request.room_id);
1375    let location = request
1376        .location
1377        .ok_or_else(|| anyhow!("invalid location"))?;
1378
1379    let db = session.db().await;
1380    let room = db
1381        .update_room_participant_location(room_id, session.connection_id, location)
1382        .await?;
1383
1384    room_updated(&room, &session.peer);
1385    response.send(proto::Ack {})?;
1386    Ok(())
1387}
1388
1389async fn share_project(
1390    request: proto::ShareProject,
1391    response: Response<proto::ShareProject>,
1392    session: Session,
1393) -> Result<()> {
1394    let (project_id, room) = &*session
1395        .db()
1396        .await
1397        .share_project(
1398            RoomId::from_proto(request.room_id),
1399            session.connection_id,
1400            &request.worktrees,
1401        )
1402        .await?;
1403    response.send(proto::ShareProjectResponse {
1404        project_id: project_id.to_proto(),
1405    })?;
1406    room_updated(&room, &session.peer);
1407
1408    Ok(())
1409}
1410
1411async fn unshare_project(message: proto::UnshareProject, session: Session) -> Result<()> {
1412    let project_id = ProjectId::from_proto(message.project_id);
1413
1414    let (room, guest_connection_ids) = &*session
1415        .db()
1416        .await
1417        .unshare_project(project_id, session.connection_id)
1418        .await?;
1419
1420    broadcast(
1421        Some(session.connection_id),
1422        guest_connection_ids.iter().copied(),
1423        |conn_id| session.peer.send(conn_id, message.clone()),
1424    );
1425    room_updated(&room, &session.peer);
1426
1427    Ok(())
1428}
1429
1430async fn join_project(
1431    request: proto::JoinProject,
1432    response: Response<proto::JoinProject>,
1433    session: Session,
1434) -> Result<()> {
1435    let project_id = ProjectId::from_proto(request.project_id);
1436    let guest_user_id = session.user_id;
1437
1438    tracing::info!(%project_id, "join project");
1439
1440    let (project, replica_id) = &mut *session
1441        .db()
1442        .await
1443        .join_project(project_id, session.connection_id)
1444        .await?;
1445
1446    let collaborators = project
1447        .collaborators
1448        .iter()
1449        .filter(|collaborator| collaborator.connection_id != session.connection_id)
1450        .map(|collaborator| collaborator.to_proto())
1451        .collect::<Vec<_>>();
1452
1453    let worktrees = project
1454        .worktrees
1455        .iter()
1456        .map(|(id, worktree)| proto::WorktreeMetadata {
1457            id: *id,
1458            root_name: worktree.root_name.clone(),
1459            visible: worktree.visible,
1460            abs_path: worktree.abs_path.clone(),
1461        })
1462        .collect::<Vec<_>>();
1463
1464    for collaborator in &collaborators {
1465        session
1466            .peer
1467            .send(
1468                collaborator.peer_id.unwrap().into(),
1469                proto::AddProjectCollaborator {
1470                    project_id: project_id.to_proto(),
1471                    collaborator: Some(proto::Collaborator {
1472                        peer_id: Some(session.connection_id.into()),
1473                        replica_id: replica_id.0 as u32,
1474                        user_id: guest_user_id.to_proto(),
1475                    }),
1476                },
1477            )
1478            .trace_err();
1479    }
1480
1481    // First, we send the metadata associated with each worktree.
1482    response.send(proto::JoinProjectResponse {
1483        worktrees: worktrees.clone(),
1484        replica_id: replica_id.0 as u32,
1485        collaborators: collaborators.clone(),
1486        language_servers: project.language_servers.clone(),
1487    })?;
1488
1489    for (worktree_id, worktree) in mem::take(&mut project.worktrees) {
1490        #[cfg(any(test, feature = "test-support"))]
1491        const MAX_CHUNK_SIZE: usize = 2;
1492        #[cfg(not(any(test, feature = "test-support")))]
1493        const MAX_CHUNK_SIZE: usize = 256;
1494
1495        // Stream this worktree's entries.
1496        let message = proto::UpdateWorktree {
1497            project_id: project_id.to_proto(),
1498            worktree_id,
1499            abs_path: worktree.abs_path.clone(),
1500            root_name: worktree.root_name,
1501            updated_entries: worktree.entries,
1502            removed_entries: Default::default(),
1503            scan_id: worktree.scan_id,
1504            is_last_update: worktree.scan_id == worktree.completed_scan_id,
1505            updated_repositories: worktree.repository_entries.into_values().collect(),
1506            removed_repositories: Default::default(),
1507        };
1508        for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1509            session.peer.send(session.connection_id, update.clone())?;
1510        }
1511
1512        // Stream this worktree's diagnostics.
1513        for summary in worktree.diagnostic_summaries {
1514            session.peer.send(
1515                session.connection_id,
1516                proto::UpdateDiagnosticSummary {
1517                    project_id: project_id.to_proto(),
1518                    worktree_id: worktree.id,
1519                    summary: Some(summary),
1520                },
1521            )?;
1522        }
1523
1524        for settings_file in worktree.settings_files {
1525            session.peer.send(
1526                session.connection_id,
1527                proto::UpdateWorktreeSettings {
1528                    project_id: project_id.to_proto(),
1529                    worktree_id: worktree.id,
1530                    path: settings_file.path,
1531                    content: Some(settings_file.content),
1532                },
1533            )?;
1534        }
1535    }
1536
1537    for language_server in &project.language_servers {
1538        session.peer.send(
1539            session.connection_id,
1540            proto::UpdateLanguageServer {
1541                project_id: project_id.to_proto(),
1542                language_server_id: language_server.id,
1543                variant: Some(
1544                    proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1545                        proto::LspDiskBasedDiagnosticsUpdated {},
1546                    ),
1547                ),
1548            },
1549        )?;
1550    }
1551
1552    Ok(())
1553}
1554
1555async fn leave_project(request: proto::LeaveProject, session: Session) -> Result<()> {
1556    let sender_id = session.connection_id;
1557    let project_id = ProjectId::from_proto(request.project_id);
1558
1559    let (room, project) = &*session
1560        .db()
1561        .await
1562        .leave_project(project_id, sender_id)
1563        .await?;
1564    tracing::info!(
1565        %project_id,
1566        host_user_id = %project.host_user_id,
1567        host_connection_id = %project.host_connection_id,
1568        "leave project"
1569    );
1570
1571    project_left(&project, &session);
1572    room_updated(&room, &session.peer);
1573
1574    Ok(())
1575}
1576
1577async fn update_project(
1578    request: proto::UpdateProject,
1579    response: Response<proto::UpdateProject>,
1580    session: Session,
1581) -> Result<()> {
1582    let project_id = ProjectId::from_proto(request.project_id);
1583    let (room, guest_connection_ids) = &*session
1584        .db()
1585        .await
1586        .update_project(project_id, session.connection_id, &request.worktrees)
1587        .await?;
1588    broadcast(
1589        Some(session.connection_id),
1590        guest_connection_ids.iter().copied(),
1591        |connection_id| {
1592            session
1593                .peer
1594                .forward_send(session.connection_id, connection_id, request.clone())
1595        },
1596    );
1597    room_updated(&room, &session.peer);
1598    response.send(proto::Ack {})?;
1599
1600    Ok(())
1601}
1602
1603async fn update_worktree(
1604    request: proto::UpdateWorktree,
1605    response: Response<proto::UpdateWorktree>,
1606    session: Session,
1607) -> Result<()> {
1608    let guest_connection_ids = session
1609        .db()
1610        .await
1611        .update_worktree(&request, session.connection_id)
1612        .await?;
1613
1614    broadcast(
1615        Some(session.connection_id),
1616        guest_connection_ids.iter().copied(),
1617        |connection_id| {
1618            session
1619                .peer
1620                .forward_send(session.connection_id, connection_id, request.clone())
1621        },
1622    );
1623    response.send(proto::Ack {})?;
1624    Ok(())
1625}
1626
1627async fn update_diagnostic_summary(
1628    message: proto::UpdateDiagnosticSummary,
1629    session: Session,
1630) -> Result<()> {
1631    let guest_connection_ids = session
1632        .db()
1633        .await
1634        .update_diagnostic_summary(&message, session.connection_id)
1635        .await?;
1636
1637    broadcast(
1638        Some(session.connection_id),
1639        guest_connection_ids.iter().copied(),
1640        |connection_id| {
1641            session
1642                .peer
1643                .forward_send(session.connection_id, connection_id, message.clone())
1644        },
1645    );
1646
1647    Ok(())
1648}
1649
1650async fn update_worktree_settings(
1651    message: proto::UpdateWorktreeSettings,
1652    session: Session,
1653) -> Result<()> {
1654    let guest_connection_ids = session
1655        .db()
1656        .await
1657        .update_worktree_settings(&message, session.connection_id)
1658        .await?;
1659
1660    broadcast(
1661        Some(session.connection_id),
1662        guest_connection_ids.iter().copied(),
1663        |connection_id| {
1664            session
1665                .peer
1666                .forward_send(session.connection_id, connection_id, message.clone())
1667        },
1668    );
1669
1670    Ok(())
1671}
1672
1673async fn refresh_inlay_hints(request: proto::RefreshInlayHints, session: Session) -> Result<()> {
1674    broadcast_project_message(request.project_id, request, session).await
1675}
1676
1677async fn start_language_server(
1678    request: proto::StartLanguageServer,
1679    session: Session,
1680) -> Result<()> {
1681    let guest_connection_ids = session
1682        .db()
1683        .await
1684        .start_language_server(&request, session.connection_id)
1685        .await?;
1686
1687    broadcast(
1688        Some(session.connection_id),
1689        guest_connection_ids.iter().copied(),
1690        |connection_id| {
1691            session
1692                .peer
1693                .forward_send(session.connection_id, connection_id, request.clone())
1694        },
1695    );
1696    Ok(())
1697}
1698
1699async fn update_language_server(
1700    request: proto::UpdateLanguageServer,
1701    session: Session,
1702) -> Result<()> {
1703    session.executor.record_backtrace();
1704    let project_id = ProjectId::from_proto(request.project_id);
1705    let project_connection_ids = session
1706        .db()
1707        .await
1708        .project_connection_ids(project_id, session.connection_id)
1709        .await?;
1710    broadcast(
1711        Some(session.connection_id),
1712        project_connection_ids.iter().copied(),
1713        |connection_id| {
1714            session
1715                .peer
1716                .forward_send(session.connection_id, connection_id, request.clone())
1717        },
1718    );
1719    Ok(())
1720}
1721
1722async fn forward_project_request<T>(
1723    request: T,
1724    response: Response<T>,
1725    session: Session,
1726) -> Result<()>
1727where
1728    T: EntityMessage + RequestMessage,
1729{
1730    session.executor.record_backtrace();
1731    let project_id = ProjectId::from_proto(request.remote_entity_id());
1732    let host_connection_id = {
1733        let collaborators = session
1734            .db()
1735            .await
1736            .project_collaborators(project_id, session.connection_id)
1737            .await?;
1738        collaborators
1739            .iter()
1740            .find(|collaborator| collaborator.is_host)
1741            .ok_or_else(|| anyhow!("host not found"))?
1742            .connection_id
1743    };
1744
1745    let payload = session
1746        .peer
1747        .forward_request(session.connection_id, host_connection_id, request)
1748        .await?;
1749
1750    response.send(payload)?;
1751    Ok(())
1752}
1753
1754async fn create_buffer_for_peer(
1755    request: proto::CreateBufferForPeer,
1756    session: Session,
1757) -> Result<()> {
1758    session.executor.record_backtrace();
1759    let peer_id = request.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?;
1760    session
1761        .peer
1762        .forward_send(session.connection_id, peer_id.into(), request)?;
1763    Ok(())
1764}
1765
1766async fn update_buffer(
1767    request: proto::UpdateBuffer,
1768    response: Response<proto::UpdateBuffer>,
1769    session: Session,
1770) -> Result<()> {
1771    session.executor.record_backtrace();
1772    let project_id = ProjectId::from_proto(request.project_id);
1773    let mut guest_connection_ids;
1774    let mut host_connection_id = None;
1775    {
1776        let collaborators = session
1777            .db()
1778            .await
1779            .project_collaborators(project_id, session.connection_id)
1780            .await?;
1781        guest_connection_ids = Vec::with_capacity(collaborators.len() - 1);
1782        for collaborator in collaborators.iter() {
1783            if collaborator.is_host {
1784                host_connection_id = Some(collaborator.connection_id);
1785            } else {
1786                guest_connection_ids.push(collaborator.connection_id);
1787            }
1788        }
1789    }
1790    let host_connection_id = host_connection_id.ok_or_else(|| anyhow!("host not found"))?;
1791
1792    session.executor.record_backtrace();
1793    broadcast(
1794        Some(session.connection_id),
1795        guest_connection_ids,
1796        |connection_id| {
1797            session
1798                .peer
1799                .forward_send(session.connection_id, connection_id, request.clone())
1800        },
1801    );
1802    if host_connection_id != session.connection_id {
1803        session
1804            .peer
1805            .forward_request(session.connection_id, host_connection_id, request.clone())
1806            .await?;
1807    }
1808
1809    response.send(proto::Ack {})?;
1810    Ok(())
1811}
1812
1813async fn update_buffer_file(request: proto::UpdateBufferFile, session: Session) -> Result<()> {
1814    let project_id = ProjectId::from_proto(request.project_id);
1815    let project_connection_ids = session
1816        .db()
1817        .await
1818        .project_connection_ids(project_id, session.connection_id)
1819        .await?;
1820
1821    broadcast(
1822        Some(session.connection_id),
1823        project_connection_ids.iter().copied(),
1824        |connection_id| {
1825            session
1826                .peer
1827                .forward_send(session.connection_id, connection_id, request.clone())
1828        },
1829    );
1830    Ok(())
1831}
1832
1833async fn buffer_reloaded(request: proto::BufferReloaded, session: Session) -> Result<()> {
1834    let project_id = ProjectId::from_proto(request.project_id);
1835    let project_connection_ids = session
1836        .db()
1837        .await
1838        .project_connection_ids(project_id, session.connection_id)
1839        .await?;
1840    broadcast(
1841        Some(session.connection_id),
1842        project_connection_ids.iter().copied(),
1843        |connection_id| {
1844            session
1845                .peer
1846                .forward_send(session.connection_id, connection_id, request.clone())
1847        },
1848    );
1849    Ok(())
1850}
1851
1852async fn buffer_saved(request: proto::BufferSaved, session: Session) -> Result<()> {
1853    broadcast_project_message(request.project_id, request, session).await
1854}
1855
1856async fn broadcast_project_message<T: EnvelopedMessage>(
1857    project_id: u64,
1858    request: T,
1859    session: Session,
1860) -> Result<()> {
1861    let project_id = ProjectId::from_proto(project_id);
1862    let project_connection_ids = session
1863        .db()
1864        .await
1865        .project_connection_ids(project_id, session.connection_id)
1866        .await?;
1867    broadcast(
1868        Some(session.connection_id),
1869        project_connection_ids.iter().copied(),
1870        |connection_id| {
1871            session
1872                .peer
1873                .forward_send(session.connection_id, connection_id, request.clone())
1874        },
1875    );
1876    Ok(())
1877}
1878
1879async fn follow(
1880    request: proto::Follow,
1881    response: Response<proto::Follow>,
1882    session: Session,
1883) -> Result<()> {
1884    let project_id = ProjectId::from_proto(request.project_id);
1885    let leader_id = request
1886        .leader_id
1887        .ok_or_else(|| anyhow!("invalid leader id"))?
1888        .into();
1889    let follower_id = session.connection_id;
1890
1891    {
1892        let project_connection_ids = session
1893            .db()
1894            .await
1895            .project_connection_ids(project_id, session.connection_id)
1896            .await?;
1897
1898        if !project_connection_ids.contains(&leader_id) {
1899            Err(anyhow!("no such peer"))?;
1900        }
1901    }
1902
1903    let mut response_payload = session
1904        .peer
1905        .forward_request(session.connection_id, leader_id, request)
1906        .await?;
1907    response_payload
1908        .views
1909        .retain(|view| view.leader_id != Some(follower_id.into()));
1910    response.send(response_payload)?;
1911
1912    let room = session
1913        .db()
1914        .await
1915        .follow(project_id, leader_id, follower_id)
1916        .await?;
1917    room_updated(&room, &session.peer);
1918
1919    Ok(())
1920}
1921
1922async fn unfollow(request: proto::Unfollow, session: Session) -> Result<()> {
1923    let project_id = ProjectId::from_proto(request.project_id);
1924    let leader_id = request
1925        .leader_id
1926        .ok_or_else(|| anyhow!("invalid leader id"))?
1927        .into();
1928    let follower_id = session.connection_id;
1929
1930    if !session
1931        .db()
1932        .await
1933        .project_connection_ids(project_id, session.connection_id)
1934        .await?
1935        .contains(&leader_id)
1936    {
1937        Err(anyhow!("no such peer"))?;
1938    }
1939
1940    session
1941        .peer
1942        .forward_send(session.connection_id, leader_id, request)?;
1943
1944    let room = session
1945        .db()
1946        .await
1947        .unfollow(project_id, leader_id, follower_id)
1948        .await?;
1949    room_updated(&room, &session.peer);
1950
1951    Ok(())
1952}
1953
1954async fn update_followers(request: proto::UpdateFollowers, session: Session) -> Result<()> {
1955    let project_id = ProjectId::from_proto(request.project_id);
1956    let project_connection_ids = session
1957        .db
1958        .lock()
1959        .await
1960        .project_connection_ids(project_id, session.connection_id)
1961        .await?;
1962
1963    let leader_id = request.variant.as_ref().and_then(|variant| match variant {
1964        proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
1965        proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
1966        proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
1967    });
1968    for follower_peer_id in request.follower_ids.iter().copied() {
1969        let follower_connection_id = follower_peer_id.into();
1970        if project_connection_ids.contains(&follower_connection_id)
1971            && Some(follower_peer_id) != leader_id
1972        {
1973            session.peer.forward_send(
1974                session.connection_id,
1975                follower_connection_id,
1976                request.clone(),
1977            )?;
1978        }
1979    }
1980    Ok(())
1981}
1982
1983async fn get_users(
1984    request: proto::GetUsers,
1985    response: Response<proto::GetUsers>,
1986    session: Session,
1987) -> Result<()> {
1988    let user_ids = request
1989        .user_ids
1990        .into_iter()
1991        .map(UserId::from_proto)
1992        .collect();
1993    let users = session
1994        .db()
1995        .await
1996        .get_users_by_ids(user_ids)
1997        .await?
1998        .into_iter()
1999        .map(|user| proto::User {
2000            id: user.id.to_proto(),
2001            avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
2002            github_login: user.github_login,
2003        })
2004        .collect();
2005    response.send(proto::UsersResponse { users })?;
2006    Ok(())
2007}
2008
2009async fn fuzzy_search_users(
2010    request: proto::FuzzySearchUsers,
2011    response: Response<proto::FuzzySearchUsers>,
2012    session: Session,
2013) -> Result<()> {
2014    let query = request.query;
2015    let users = match query.len() {
2016        0 => vec![],
2017        1 | 2 => session
2018            .db()
2019            .await
2020            .get_user_by_github_login(&query)
2021            .await?
2022            .into_iter()
2023            .collect(),
2024        _ => session.db().await.fuzzy_search_users(&query, 10).await?,
2025    };
2026    let users = users
2027        .into_iter()
2028        .filter(|user| user.id != session.user_id)
2029        .map(|user| proto::User {
2030            id: user.id.to_proto(),
2031            avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
2032            github_login: user.github_login,
2033        })
2034        .collect();
2035    response.send(proto::UsersResponse { users })?;
2036    Ok(())
2037}
2038
2039async fn request_contact(
2040    request: proto::RequestContact,
2041    response: Response<proto::RequestContact>,
2042    session: Session,
2043) -> Result<()> {
2044    let requester_id = session.user_id;
2045    let responder_id = UserId::from_proto(request.responder_id);
2046    if requester_id == responder_id {
2047        return Err(anyhow!("cannot add yourself as a contact"))?;
2048    }
2049
2050    session
2051        .db()
2052        .await
2053        .send_contact_request(requester_id, responder_id)
2054        .await?;
2055
2056    // Update outgoing contact requests of requester
2057    let mut update = proto::UpdateContacts::default();
2058    update.outgoing_requests.push(responder_id.to_proto());
2059    for connection_id in session
2060        .connection_pool()
2061        .await
2062        .user_connection_ids(requester_id)
2063    {
2064        session.peer.send(connection_id, update.clone())?;
2065    }
2066
2067    // Update incoming contact requests of responder
2068    let mut update = proto::UpdateContacts::default();
2069    update
2070        .incoming_requests
2071        .push(proto::IncomingContactRequest {
2072            requester_id: requester_id.to_proto(),
2073            should_notify: true,
2074        });
2075    for connection_id in session
2076        .connection_pool()
2077        .await
2078        .user_connection_ids(responder_id)
2079    {
2080        session.peer.send(connection_id, update.clone())?;
2081    }
2082
2083    response.send(proto::Ack {})?;
2084    Ok(())
2085}
2086
2087async fn respond_to_contact_request(
2088    request: proto::RespondToContactRequest,
2089    response: Response<proto::RespondToContactRequest>,
2090    session: Session,
2091) -> Result<()> {
2092    let responder_id = session.user_id;
2093    let requester_id = UserId::from_proto(request.requester_id);
2094    let db = session.db().await;
2095    if request.response == proto::ContactRequestResponse::Dismiss as i32 {
2096        db.dismiss_contact_notification(responder_id, requester_id)
2097            .await?;
2098    } else {
2099        let accept = request.response == proto::ContactRequestResponse::Accept as i32;
2100
2101        db.respond_to_contact_request(responder_id, requester_id, accept)
2102            .await?;
2103        let requester_busy = db.is_user_busy(requester_id).await?;
2104        let responder_busy = db.is_user_busy(responder_id).await?;
2105
2106        let pool = session.connection_pool().await;
2107        // Update responder with new contact
2108        let mut update = proto::UpdateContacts::default();
2109        if accept {
2110            update
2111                .contacts
2112                .push(contact_for_user(requester_id, false, requester_busy, &pool));
2113        }
2114        update
2115            .remove_incoming_requests
2116            .push(requester_id.to_proto());
2117        for connection_id in pool.user_connection_ids(responder_id) {
2118            session.peer.send(connection_id, update.clone())?;
2119        }
2120
2121        // Update requester with new contact
2122        let mut update = proto::UpdateContacts::default();
2123        if accept {
2124            update
2125                .contacts
2126                .push(contact_for_user(responder_id, true, responder_busy, &pool));
2127        }
2128        update
2129            .remove_outgoing_requests
2130            .push(responder_id.to_proto());
2131        for connection_id in pool.user_connection_ids(requester_id) {
2132            session.peer.send(connection_id, update.clone())?;
2133        }
2134    }
2135
2136    response.send(proto::Ack {})?;
2137    Ok(())
2138}
2139
2140async fn remove_contact(
2141    request: proto::RemoveContact,
2142    response: Response<proto::RemoveContact>,
2143    session: Session,
2144) -> Result<()> {
2145    let requester_id = session.user_id;
2146    let responder_id = UserId::from_proto(request.user_id);
2147    let db = session.db().await;
2148    let contact_accepted = db.remove_contact(requester_id, responder_id).await?;
2149
2150    let pool = session.connection_pool().await;
2151    // Update outgoing contact requests of requester
2152    let mut update = proto::UpdateContacts::default();
2153    if contact_accepted {
2154        update.remove_contacts.push(responder_id.to_proto());
2155    } else {
2156        update
2157            .remove_outgoing_requests
2158            .push(responder_id.to_proto());
2159    }
2160    for connection_id in pool.user_connection_ids(requester_id) {
2161        session.peer.send(connection_id, update.clone())?;
2162    }
2163
2164    // Update incoming contact requests of responder
2165    let mut update = proto::UpdateContacts::default();
2166    if contact_accepted {
2167        update.remove_contacts.push(requester_id.to_proto());
2168    } else {
2169        update
2170            .remove_incoming_requests
2171            .push(requester_id.to_proto());
2172    }
2173    for connection_id in pool.user_connection_ids(responder_id) {
2174        session.peer.send(connection_id, update.clone())?;
2175    }
2176
2177    response.send(proto::Ack {})?;
2178    Ok(())
2179}
2180
2181async fn create_channel(
2182    request: proto::CreateChannel,
2183    response: Response<proto::CreateChannel>,
2184    session: Session,
2185) -> Result<()> {
2186    let db = session.db().await;
2187    let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
2188
2189    if let Some(live_kit) = session.live_kit_client.as_ref() {
2190        live_kit.create_room(live_kit_room.clone()).await?;
2191    }
2192
2193    let parent_id = request.parent_id.map(|id| ChannelId::from_proto(id));
2194    let id = db
2195        .create_channel(&request.name, parent_id, &live_kit_room, session.user_id)
2196        .await?;
2197
2198    let channel = proto::Channel {
2199        id: id.to_proto(),
2200        name: request.name,
2201        parent_id: request.parent_id,
2202    };
2203
2204    response.send(proto::ChannelResponse {
2205        channel: Some(channel.clone()),
2206    })?;
2207
2208    let mut update = proto::UpdateChannels::default();
2209    update.channels.push(channel);
2210
2211    let user_ids_to_notify = if let Some(parent_id) = parent_id {
2212        db.get_channel_members(parent_id).await?
2213    } else {
2214        vec![session.user_id]
2215    };
2216
2217    let connection_pool = session.connection_pool().await;
2218    for user_id in user_ids_to_notify {
2219        for connection_id in connection_pool.user_connection_ids(user_id) {
2220            let mut update = update.clone();
2221            if user_id == session.user_id {
2222                update.channel_permissions.push(proto::ChannelPermission {
2223                    channel_id: id.to_proto(),
2224                    is_admin: true,
2225                });
2226            }
2227            session.peer.send(connection_id, update)?;
2228        }
2229    }
2230
2231    Ok(())
2232}
2233
2234async fn delete_channel(
2235    request: proto::DeleteChannel,
2236    response: Response<proto::DeleteChannel>,
2237    session: Session,
2238) -> Result<()> {
2239    let db = session.db().await;
2240
2241    let channel_id = request.channel_id;
2242    let (removed_channels, member_ids) = db
2243        .delete_channel(ChannelId::from_proto(channel_id), session.user_id)
2244        .await?;
2245    response.send(proto::Ack {})?;
2246
2247    // Notify members of removed channels
2248    let mut update = proto::UpdateChannels::default();
2249    update
2250        .delete_channels
2251        .extend(removed_channels.into_iter().map(|id| id.to_proto()));
2252
2253    let connection_pool = session.connection_pool().await;
2254    for member_id in member_ids {
2255        for connection_id in connection_pool.user_connection_ids(member_id) {
2256            session.peer.send(connection_id, update.clone())?;
2257        }
2258    }
2259
2260    Ok(())
2261}
2262
2263async fn invite_channel_member(
2264    request: proto::InviteChannelMember,
2265    response: Response<proto::InviteChannelMember>,
2266    session: Session,
2267) -> Result<()> {
2268    let db = session.db().await;
2269    let channel_id = ChannelId::from_proto(request.channel_id);
2270    let invitee_id = UserId::from_proto(request.user_id);
2271    db.invite_channel_member(channel_id, invitee_id, session.user_id, request.admin)
2272        .await?;
2273
2274    let (channel, _) = db
2275        .get_channel(channel_id, session.user_id)
2276        .await?
2277        .ok_or_else(|| anyhow!("channel not found"))?;
2278
2279    let mut update = proto::UpdateChannels::default();
2280    update.channel_invitations.push(proto::Channel {
2281        id: channel.id.to_proto(),
2282        name: channel.name,
2283        parent_id: None,
2284    });
2285    for connection_id in session
2286        .connection_pool()
2287        .await
2288        .user_connection_ids(invitee_id)
2289    {
2290        session.peer.send(connection_id, update.clone())?;
2291    }
2292
2293    response.send(proto::Ack {})?;
2294    Ok(())
2295}
2296
2297async fn remove_channel_member(
2298    request: proto::RemoveChannelMember,
2299    response: Response<proto::RemoveChannelMember>,
2300    session: Session,
2301) -> Result<()> {
2302    let db = session.db().await;
2303    let channel_id = ChannelId::from_proto(request.channel_id);
2304    let member_id = UserId::from_proto(request.user_id);
2305
2306    db.remove_channel_member(channel_id, member_id, session.user_id)
2307        .await?;
2308
2309    let mut update = proto::UpdateChannels::default();
2310    update.delete_channels.push(channel_id.to_proto());
2311
2312    for connection_id in session
2313        .connection_pool()
2314        .await
2315        .user_connection_ids(member_id)
2316    {
2317        session.peer.send(connection_id, update.clone())?;
2318    }
2319
2320    response.send(proto::Ack {})?;
2321    Ok(())
2322}
2323
2324async fn set_channel_member_admin(
2325    request: proto::SetChannelMemberAdmin,
2326    response: Response<proto::SetChannelMemberAdmin>,
2327    session: Session,
2328) -> Result<()> {
2329    let db = session.db().await;
2330    let channel_id = ChannelId::from_proto(request.channel_id);
2331    let member_id = UserId::from_proto(request.user_id);
2332    db.set_channel_member_admin(channel_id, session.user_id, member_id, request.admin)
2333        .await?;
2334
2335    let (channel, has_accepted) = db
2336        .get_channel(channel_id, member_id)
2337        .await?
2338        .ok_or_else(|| anyhow!("channel not found"))?;
2339
2340    let mut update = proto::UpdateChannels::default();
2341    if has_accepted {
2342        update.channel_permissions.push(proto::ChannelPermission {
2343            channel_id: channel.id.to_proto(),
2344            is_admin: request.admin,
2345        });
2346    }
2347
2348    for connection_id in session
2349        .connection_pool()
2350        .await
2351        .user_connection_ids(member_id)
2352    {
2353        session.peer.send(connection_id, update.clone())?;
2354    }
2355
2356    response.send(proto::Ack {})?;
2357    Ok(())
2358}
2359
2360async fn rename_channel(
2361    request: proto::RenameChannel,
2362    response: Response<proto::RenameChannel>,
2363    session: Session,
2364) -> Result<()> {
2365    let db = session.db().await;
2366    let channel_id = ChannelId::from_proto(request.channel_id);
2367    let new_name = db
2368        .rename_channel(channel_id, session.user_id, &request.name)
2369        .await?;
2370
2371    let channel = proto::Channel {
2372        id: request.channel_id,
2373        name: new_name,
2374        parent_id: None,
2375    };
2376    response.send(proto::ChannelResponse {
2377        channel: Some(channel.clone()),
2378    })?;
2379    let mut update = proto::UpdateChannels::default();
2380    update.channels.push(channel);
2381
2382    let member_ids = db.get_channel_members(channel_id).await?;
2383
2384    let connection_pool = session.connection_pool().await;
2385    for member_id in member_ids {
2386        for connection_id in connection_pool.user_connection_ids(member_id) {
2387            session.peer.send(connection_id, update.clone())?;
2388        }
2389    }
2390
2391    Ok(())
2392}
2393
2394async fn move_channel(
2395    request: proto::MoveChannel,
2396    response: Response<proto::MoveChannel>,
2397    session: Session,
2398) -> Result<()> {
2399    let db = session.db().await;
2400    let channel_id = ChannelId::from_proto(request.channel_id);
2401    let from_parent = request.from_parent.map(ChannelId::from_proto);
2402    let to = request.to.map(ChannelId::from_proto);
2403    let channels_to_send = db
2404        .move_channel(
2405            session.user_id,
2406            channel_id,
2407            from_parent,
2408            to,
2409        )
2410        .await?;
2411
2412
2413    if let Some(from_parent) = from_parent {
2414        let members = db.get_channel_members(from_parent).await?;
2415        let update = proto::UpdateChannels {
2416            delete_channel_edge: vec![proto::ChannelEdge {
2417                channel_id: channel_id.to_proto(),
2418                parent_id: from_parent.to_proto(),
2419            }],
2420            ..Default::default()
2421        };
2422        let connection_pool = session.connection_pool().await;
2423        for member_id in members {
2424            for connection_id in connection_pool.user_connection_ids(member_id) {
2425                session.peer.send(connection_id, update.clone())?;
2426            }
2427        }
2428
2429    }
2430
2431    if let Some(to) = to {
2432        let members = db.get_channel_members(to).await?;
2433        let connection_pool = session.connection_pool().await;
2434        let update = proto::UpdateChannels {
2435            channels: channels_to_send.into_iter().map(|channel| proto::Channel {
2436                id: channel.id.to_proto(),
2437                name: channel.name,
2438                parent_id: channel.parent_id.map(ChannelId::to_proto),
2439            }).collect(),
2440            ..Default::default()
2441        };
2442        for member_id in members {
2443            for connection_id in connection_pool.user_connection_ids(member_id) {
2444                session.peer.send(connection_id, update.clone())?;
2445            }
2446        }
2447    }
2448
2449    response.send(Ack {})?;
2450
2451    Ok(())
2452}
2453
2454async fn get_channel_members(
2455    request: proto::GetChannelMembers,
2456    response: Response<proto::GetChannelMembers>,
2457    session: Session,
2458) -> Result<()> {
2459    let db = session.db().await;
2460    let channel_id = ChannelId::from_proto(request.channel_id);
2461    let members = db
2462        .get_channel_member_details(channel_id, session.user_id)
2463        .await?;
2464    response.send(proto::GetChannelMembersResponse { members })?;
2465    Ok(())
2466}
2467
2468async fn respond_to_channel_invite(
2469    request: proto::RespondToChannelInvite,
2470    response: Response<proto::RespondToChannelInvite>,
2471    session: Session,
2472) -> Result<()> {
2473    let db = session.db().await;
2474    let channel_id = ChannelId::from_proto(request.channel_id);
2475    db.respond_to_channel_invite(channel_id, session.user_id, request.accept)
2476        .await?;
2477
2478    let mut update = proto::UpdateChannels::default();
2479    update
2480        .remove_channel_invitations
2481        .push(channel_id.to_proto());
2482    if request.accept {
2483        let result = db.get_channels_for_user(session.user_id).await?;
2484        update
2485            .channels
2486            .extend(result.channels.into_iter().map(|channel| proto::Channel {
2487                id: channel.id.to_proto(),
2488                name: channel.name,
2489                parent_id: channel.parent_id.map(ChannelId::to_proto),
2490            }));
2491        update
2492            .channel_participants
2493            .extend(
2494                result
2495                    .channel_participants
2496                    .into_iter()
2497                    .map(|(channel_id, user_ids)| proto::ChannelParticipants {
2498                        channel_id: channel_id.to_proto(),
2499                        participant_user_ids: user_ids.into_iter().map(UserId::to_proto).collect(),
2500                    }),
2501            );
2502        update
2503            .channel_permissions
2504            .extend(
2505                result
2506                    .channels_with_admin_privileges
2507                    .into_iter()
2508                    .map(|channel_id| proto::ChannelPermission {
2509                        channel_id: channel_id.to_proto(),
2510                        is_admin: true,
2511                    }),
2512            );
2513    }
2514    session.peer.send(session.connection_id, update)?;
2515    response.send(proto::Ack {})?;
2516
2517    Ok(())
2518}
2519
2520async fn join_channel(
2521    request: proto::JoinChannel,
2522    response: Response<proto::JoinChannel>,
2523    session: Session,
2524) -> Result<()> {
2525    let channel_id = ChannelId::from_proto(request.channel_id);
2526
2527    let joined_room = {
2528        leave_room_for_session(&session).await?;
2529        let db = session.db().await;
2530
2531        let room_id = db.room_id_for_channel(channel_id).await?;
2532
2533        let joined_room = db
2534            .join_room(room_id, session.user_id, session.connection_id)
2535            .await?;
2536
2537        let live_kit_connection_info = session.live_kit_client.as_ref().and_then(|live_kit| {
2538            let token = live_kit
2539                .room_token(
2540                    &joined_room.room.live_kit_room,
2541                    &session.user_id.to_string(),
2542                )
2543                .trace_err()?;
2544
2545            Some(LiveKitConnectionInfo {
2546                server_url: live_kit.url().into(),
2547                token,
2548            })
2549        });
2550
2551        response.send(proto::JoinRoomResponse {
2552            room: Some(joined_room.room.clone()),
2553            channel_id: joined_room.channel_id.map(|id| id.to_proto()),
2554            live_kit_connection_info,
2555        })?;
2556
2557        room_updated(&joined_room.room, &session.peer);
2558
2559        joined_room.into_inner()
2560    };
2561
2562    channel_updated(
2563        channel_id,
2564        &joined_room.room,
2565        &joined_room.channel_members,
2566        &session.peer,
2567        &*session.connection_pool().await,
2568    );
2569
2570    update_user_contacts(session.user_id, &session).await?;
2571
2572    Ok(())
2573}
2574
2575async fn join_channel_buffer(
2576    request: proto::JoinChannelBuffer,
2577    response: Response<proto::JoinChannelBuffer>,
2578    session: Session,
2579) -> Result<()> {
2580    let db = session.db().await;
2581    let channel_id = ChannelId::from_proto(request.channel_id);
2582
2583    let open_response = db
2584        .join_channel_buffer(channel_id, session.user_id, session.connection_id)
2585        .await?;
2586
2587    let replica_id = open_response.replica_id;
2588    let collaborators = open_response.collaborators.clone();
2589
2590    response.send(open_response)?;
2591
2592    let update = AddChannelBufferCollaborator {
2593        channel_id: channel_id.to_proto(),
2594        collaborator: Some(proto::Collaborator {
2595            user_id: session.user_id.to_proto(),
2596            peer_id: Some(session.connection_id.into()),
2597            replica_id,
2598        }),
2599    };
2600    channel_buffer_updated(
2601        session.connection_id,
2602        collaborators
2603            .iter()
2604            .filter_map(|collaborator| Some(collaborator.peer_id?.into())),
2605        &update,
2606        &session.peer,
2607    );
2608
2609    Ok(())
2610}
2611
2612async fn update_channel_buffer(
2613    request: proto::UpdateChannelBuffer,
2614    session: Session,
2615) -> Result<()> {
2616    let db = session.db().await;
2617    let channel_id = ChannelId::from_proto(request.channel_id);
2618
2619    let collaborators = db
2620        .update_channel_buffer(channel_id, session.user_id, &request.operations)
2621        .await?;
2622
2623    channel_buffer_updated(
2624        session.connection_id,
2625        collaborators,
2626        &proto::UpdateChannelBuffer {
2627            channel_id: channel_id.to_proto(),
2628            operations: request.operations,
2629        },
2630        &session.peer,
2631    );
2632    Ok(())
2633}
2634
2635async fn rejoin_channel_buffers(
2636    request: proto::RejoinChannelBuffers,
2637    response: Response<proto::RejoinChannelBuffers>,
2638    session: Session,
2639) -> Result<()> {
2640    let db = session.db().await;
2641    let buffers = db
2642        .rejoin_channel_buffers(&request.buffers, session.user_id, session.connection_id)
2643        .await?;
2644
2645    for buffer in &buffers {
2646        let collaborators_to_notify = buffer
2647            .buffer
2648            .collaborators
2649            .iter()
2650            .filter_map(|c| Some(c.peer_id?.into()));
2651        channel_buffer_updated(
2652            session.connection_id,
2653            collaborators_to_notify,
2654            &proto::UpdateChannelBufferCollaborator {
2655                channel_id: buffer.buffer.channel_id,
2656                old_peer_id: Some(buffer.old_connection_id.into()),
2657                new_peer_id: Some(session.connection_id.into()),
2658            },
2659            &session.peer,
2660        );
2661    }
2662
2663    response.send(proto::RejoinChannelBuffersResponse {
2664        buffers: buffers.into_iter().map(|b| b.buffer).collect(),
2665    })?;
2666
2667    Ok(())
2668}
2669
2670async fn leave_channel_buffer(
2671    request: proto::LeaveChannelBuffer,
2672    response: Response<proto::LeaveChannelBuffer>,
2673    session: Session,
2674) -> Result<()> {
2675    let db = session.db().await;
2676    let channel_id = ChannelId::from_proto(request.channel_id);
2677
2678    let collaborators_to_notify = db
2679        .leave_channel_buffer(channel_id, session.connection_id)
2680        .await?;
2681
2682    response.send(Ack {})?;
2683
2684    channel_buffer_updated(
2685        session.connection_id,
2686        collaborators_to_notify,
2687        &proto::RemoveChannelBufferCollaborator {
2688            channel_id: channel_id.to_proto(),
2689            peer_id: Some(session.connection_id.into()),
2690        },
2691        &session.peer,
2692    );
2693
2694    Ok(())
2695}
2696
2697fn channel_buffer_updated<T: EnvelopedMessage>(
2698    sender_id: ConnectionId,
2699    collaborators: impl IntoIterator<Item = ConnectionId>,
2700    message: &T,
2701    peer: &Peer,
2702) {
2703    broadcast(Some(sender_id), collaborators.into_iter(), |peer_id| {
2704        peer.send(peer_id.into(), message.clone())
2705    });
2706}
2707
2708async fn send_channel_message(
2709    request: proto::SendChannelMessage,
2710    response: Response<proto::SendChannelMessage>,
2711    session: Session,
2712) -> Result<()> {
2713    // Validate the message body.
2714    let body = request.body.trim().to_string();
2715    if body.len() > MAX_MESSAGE_LEN {
2716        return Err(anyhow!("message is too long"))?;
2717    }
2718    if body.is_empty() {
2719        return Err(anyhow!("message can't be blank"))?;
2720    }
2721
2722    let timestamp = OffsetDateTime::now_utc();
2723    let nonce = request
2724        .nonce
2725        .ok_or_else(|| anyhow!("nonce can't be blank"))?;
2726
2727    let channel_id = ChannelId::from_proto(request.channel_id);
2728    let (message_id, connection_ids) = session
2729        .db()
2730        .await
2731        .create_channel_message(
2732            channel_id,
2733            session.user_id,
2734            &body,
2735            timestamp,
2736            nonce.clone().into(),
2737        )
2738        .await?;
2739    let message = proto::ChannelMessage {
2740        sender_id: session.user_id.to_proto(),
2741        id: message_id.to_proto(),
2742        body,
2743        timestamp: timestamp.unix_timestamp() as u64,
2744        nonce: Some(nonce),
2745    };
2746    broadcast(Some(session.connection_id), connection_ids, |connection| {
2747        session.peer.send(
2748            connection,
2749            proto::ChannelMessageSent {
2750                channel_id: channel_id.to_proto(),
2751                message: Some(message.clone()),
2752            },
2753        )
2754    });
2755    response.send(proto::SendChannelMessageResponse {
2756        message: Some(message),
2757    })?;
2758    Ok(())
2759}
2760
2761async fn remove_channel_message(
2762    request: proto::RemoveChannelMessage,
2763    response: Response<proto::RemoveChannelMessage>,
2764    session: Session,
2765) -> Result<()> {
2766    let channel_id = ChannelId::from_proto(request.channel_id);
2767    let message_id = MessageId::from_proto(request.message_id);
2768    let connection_ids = session
2769        .db()
2770        .await
2771        .remove_channel_message(channel_id, message_id, session.user_id)
2772        .await?;
2773    broadcast(Some(session.connection_id), connection_ids, |connection| {
2774        session.peer.send(connection, request.clone())
2775    });
2776    response.send(proto::Ack {})?;
2777    Ok(())
2778}
2779
2780async fn join_channel_chat(
2781    request: proto::JoinChannelChat,
2782    response: Response<proto::JoinChannelChat>,
2783    session: Session,
2784) -> Result<()> {
2785    let channel_id = ChannelId::from_proto(request.channel_id);
2786
2787    let db = session.db().await;
2788    db.join_channel_chat(channel_id, session.connection_id, session.user_id)
2789        .await?;
2790    let messages = db
2791        .get_channel_messages(channel_id, session.user_id, MESSAGE_COUNT_PER_PAGE, None)
2792        .await?;
2793    response.send(proto::JoinChannelChatResponse {
2794        done: messages.len() < MESSAGE_COUNT_PER_PAGE,
2795        messages,
2796    })?;
2797    Ok(())
2798}
2799
2800async fn leave_channel_chat(request: proto::LeaveChannelChat, session: Session) -> Result<()> {
2801    let channel_id = ChannelId::from_proto(request.channel_id);
2802    session
2803        .db()
2804        .await
2805        .leave_channel_chat(channel_id, session.connection_id, session.user_id)
2806        .await?;
2807    Ok(())
2808}
2809
2810async fn get_channel_messages(
2811    request: proto::GetChannelMessages,
2812    response: Response<proto::GetChannelMessages>,
2813    session: Session,
2814) -> Result<()> {
2815    let channel_id = ChannelId::from_proto(request.channel_id);
2816    let messages = session
2817        .db()
2818        .await
2819        .get_channel_messages(
2820            channel_id,
2821            session.user_id,
2822            MESSAGE_COUNT_PER_PAGE,
2823            Some(MessageId::from_proto(request.before_message_id)),
2824        )
2825        .await?;
2826    response.send(proto::GetChannelMessagesResponse {
2827        done: messages.len() < MESSAGE_COUNT_PER_PAGE,
2828        messages,
2829    })?;
2830    Ok(())
2831}
2832
2833async fn update_diff_base(request: proto::UpdateDiffBase, session: Session) -> Result<()> {
2834    let project_id = ProjectId::from_proto(request.project_id);
2835    let project_connection_ids = session
2836        .db()
2837        .await
2838        .project_connection_ids(project_id, session.connection_id)
2839        .await?;
2840    broadcast(
2841        Some(session.connection_id),
2842        project_connection_ids.iter().copied(),
2843        |connection_id| {
2844            session
2845                .peer
2846                .forward_send(session.connection_id, connection_id, request.clone())
2847        },
2848    );
2849    Ok(())
2850}
2851
2852async fn get_private_user_info(
2853    _request: proto::GetPrivateUserInfo,
2854    response: Response<proto::GetPrivateUserInfo>,
2855    session: Session,
2856) -> Result<()> {
2857    let db = session.db().await;
2858
2859    let metrics_id = db.get_user_metrics_id(session.user_id).await?;
2860    let user = db
2861        .get_user_by_id(session.user_id)
2862        .await?
2863        .ok_or_else(|| anyhow!("user not found"))?;
2864    let flags = db.get_user_flags(session.user_id).await?;
2865
2866    response.send(proto::GetPrivateUserInfoResponse {
2867        metrics_id,
2868        staff: user.admin,
2869        flags,
2870    })?;
2871    Ok(())
2872}
2873
2874fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
2875    match message {
2876        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
2877        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
2878        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
2879        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
2880        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
2881            code: frame.code.into(),
2882            reason: frame.reason,
2883        })),
2884    }
2885}
2886
2887fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
2888    match message {
2889        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
2890        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
2891        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
2892        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
2893        AxumMessage::Close(frame) => {
2894            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
2895                code: frame.code.into(),
2896                reason: frame.reason,
2897            }))
2898        }
2899    }
2900}
2901
2902fn build_initial_channels_update(
2903    channels: ChannelsForUser,
2904    channel_invites: Vec<db::Channel>,
2905) -> proto::UpdateChannels {
2906    let mut update = proto::UpdateChannels::default();
2907
2908    for channel in channels.channels {
2909        update.channels.push(proto::Channel {
2910            id: channel.id.to_proto(),
2911            name: channel.name,
2912            parent_id: channel.parent_id.map(|id| id.to_proto()),
2913        });
2914    }
2915
2916    for (channel_id, participants) in channels.channel_participants {
2917        update
2918            .channel_participants
2919            .push(proto::ChannelParticipants {
2920                channel_id: channel_id.to_proto(),
2921                participant_user_ids: participants.into_iter().map(|id| id.to_proto()).collect(),
2922            });
2923    }
2924
2925    update
2926        .channel_permissions
2927        .extend(
2928            channels
2929                .channels_with_admin_privileges
2930                .into_iter()
2931                .map(|id| proto::ChannelPermission {
2932                    channel_id: id.to_proto(),
2933                    is_admin: true,
2934                }),
2935        );
2936
2937    for channel in channel_invites {
2938        update.channel_invitations.push(proto::Channel {
2939            id: channel.id.to_proto(),
2940            name: channel.name,
2941            parent_id: None,
2942        });
2943    }
2944
2945    update
2946}
2947
2948fn build_initial_contacts_update(
2949    contacts: Vec<db::Contact>,
2950    pool: &ConnectionPool,
2951) -> proto::UpdateContacts {
2952    let mut update = proto::UpdateContacts::default();
2953
2954    for contact in contacts {
2955        match contact {
2956            db::Contact::Accepted {
2957                user_id,
2958                should_notify,
2959                busy,
2960            } => {
2961                update
2962                    .contacts
2963                    .push(contact_for_user(user_id, should_notify, busy, &pool));
2964            }
2965            db::Contact::Outgoing { user_id } => update.outgoing_requests.push(user_id.to_proto()),
2966            db::Contact::Incoming {
2967                user_id,
2968                should_notify,
2969            } => update
2970                .incoming_requests
2971                .push(proto::IncomingContactRequest {
2972                    requester_id: user_id.to_proto(),
2973                    should_notify,
2974                }),
2975        }
2976    }
2977
2978    update
2979}
2980
2981fn contact_for_user(
2982    user_id: UserId,
2983    should_notify: bool,
2984    busy: bool,
2985    pool: &ConnectionPool,
2986) -> proto::Contact {
2987    proto::Contact {
2988        user_id: user_id.to_proto(),
2989        online: pool.is_user_online(user_id),
2990        busy,
2991        should_notify,
2992    }
2993}
2994
2995fn room_updated(room: &proto::Room, peer: &Peer) {
2996    broadcast(
2997        None,
2998        room.participants
2999            .iter()
3000            .filter_map(|participant| Some(participant.peer_id?.into())),
3001        |peer_id| {
3002            peer.send(
3003                peer_id.into(),
3004                proto::RoomUpdated {
3005                    room: Some(room.clone()),
3006                },
3007            )
3008        },
3009    );
3010}
3011
3012fn channel_updated(
3013    channel_id: ChannelId,
3014    room: &proto::Room,
3015    channel_members: &[UserId],
3016    peer: &Peer,
3017    pool: &ConnectionPool,
3018) {
3019    let participants = room
3020        .participants
3021        .iter()
3022        .map(|p| p.user_id)
3023        .collect::<Vec<_>>();
3024
3025    broadcast(
3026        None,
3027        channel_members
3028            .iter()
3029            .flat_map(|user_id| pool.user_connection_ids(*user_id)),
3030        |peer_id| {
3031            peer.send(
3032                peer_id.into(),
3033                proto::UpdateChannels {
3034                    channel_participants: vec![proto::ChannelParticipants {
3035                        channel_id: channel_id.to_proto(),
3036                        participant_user_ids: participants.clone(),
3037                    }],
3038                    ..Default::default()
3039                },
3040            )
3041        },
3042    );
3043}
3044
3045async fn update_user_contacts(user_id: UserId, session: &Session) -> Result<()> {
3046    let db = session.db().await;
3047
3048    let contacts = db.get_contacts(user_id).await?;
3049    let busy = db.is_user_busy(user_id).await?;
3050
3051    let pool = session.connection_pool().await;
3052    let updated_contact = contact_for_user(user_id, false, busy, &pool);
3053    for contact in contacts {
3054        if let db::Contact::Accepted {
3055            user_id: contact_user_id,
3056            ..
3057        } = contact
3058        {
3059            for contact_conn_id in pool.user_connection_ids(contact_user_id) {
3060                session
3061                    .peer
3062                    .send(
3063                        contact_conn_id,
3064                        proto::UpdateContacts {
3065                            contacts: vec![updated_contact.clone()],
3066                            remove_contacts: Default::default(),
3067                            incoming_requests: Default::default(),
3068                            remove_incoming_requests: Default::default(),
3069                            outgoing_requests: Default::default(),
3070                            remove_outgoing_requests: Default::default(),
3071                        },
3072                    )
3073                    .trace_err();
3074            }
3075        }
3076    }
3077    Ok(())
3078}
3079
3080async fn leave_room_for_session(session: &Session) -> Result<()> {
3081    let mut contacts_to_update = HashSet::default();
3082
3083    let room_id;
3084    let canceled_calls_to_user_ids;
3085    let live_kit_room;
3086    let delete_live_kit_room;
3087    let room;
3088    let channel_members;
3089    let channel_id;
3090
3091    if let Some(mut left_room) = session.db().await.leave_room(session.connection_id).await? {
3092        contacts_to_update.insert(session.user_id);
3093
3094        for project in left_room.left_projects.values() {
3095            project_left(project, session);
3096        }
3097
3098        room_id = RoomId::from_proto(left_room.room.id);
3099        canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids);
3100        live_kit_room = mem::take(&mut left_room.room.live_kit_room);
3101        delete_live_kit_room = left_room.deleted;
3102        room = mem::take(&mut left_room.room);
3103        channel_members = mem::take(&mut left_room.channel_members);
3104        channel_id = left_room.channel_id;
3105
3106        room_updated(&room, &session.peer);
3107    } else {
3108        return Ok(());
3109    }
3110
3111    if let Some(channel_id) = channel_id {
3112        channel_updated(
3113            channel_id,
3114            &room,
3115            &channel_members,
3116            &session.peer,
3117            &*session.connection_pool().await,
3118        );
3119    }
3120
3121    {
3122        let pool = session.connection_pool().await;
3123        for canceled_user_id in canceled_calls_to_user_ids {
3124            for connection_id in pool.user_connection_ids(canceled_user_id) {
3125                session
3126                    .peer
3127                    .send(
3128                        connection_id,
3129                        proto::CallCanceled {
3130                            room_id: room_id.to_proto(),
3131                        },
3132                    )
3133                    .trace_err();
3134            }
3135            contacts_to_update.insert(canceled_user_id);
3136        }
3137    }
3138
3139    for contact_user_id in contacts_to_update {
3140        update_user_contacts(contact_user_id, &session).await?;
3141    }
3142
3143    if let Some(live_kit) = session.live_kit_client.as_ref() {
3144        live_kit
3145            .remove_participant(live_kit_room.clone(), session.user_id.to_string())
3146            .await
3147            .trace_err();
3148
3149        if delete_live_kit_room {
3150            live_kit.delete_room(live_kit_room).await.trace_err();
3151        }
3152    }
3153
3154    Ok(())
3155}
3156
3157async fn leave_channel_buffers_for_session(session: &Session) -> Result<()> {
3158    let left_channel_buffers = session
3159        .db()
3160        .await
3161        .leave_channel_buffers(session.connection_id)
3162        .await?;
3163
3164    for (channel_id, connections) in left_channel_buffers {
3165        channel_buffer_updated(
3166            session.connection_id,
3167            connections,
3168            &proto::RemoveChannelBufferCollaborator {
3169                channel_id: channel_id.to_proto(),
3170                peer_id: Some(session.connection_id.into()),
3171            },
3172            &session.peer,
3173        );
3174    }
3175
3176    Ok(())
3177}
3178
3179fn project_left(project: &db::LeftProject, session: &Session) {
3180    for connection_id in &project.connection_ids {
3181        if project.host_user_id == session.user_id {
3182            session
3183                .peer
3184                .send(
3185                    *connection_id,
3186                    proto::UnshareProject {
3187                        project_id: project.id.to_proto(),
3188                    },
3189                )
3190                .trace_err();
3191        } else {
3192            session
3193                .peer
3194                .send(
3195                    *connection_id,
3196                    proto::RemoveProjectCollaborator {
3197                        project_id: project.id.to_proto(),
3198                        peer_id: Some(session.connection_id.into()),
3199                    },
3200                )
3201                .trace_err();
3202        }
3203    }
3204}
3205
3206pub trait ResultExt {
3207    type Ok;
3208
3209    fn trace_err(self) -> Option<Self::Ok>;
3210}
3211
3212impl<T, E> ResultExt for Result<T, E>
3213where
3214    E: std::fmt::Debug,
3215{
3216    type Ok = T;
3217
3218    fn trace_err(self) -> Option<T> {
3219        match self {
3220            Ok(value) => Some(value),
3221            Err(error) => {
3222                tracing::error!("{:?}", error);
3223                None
3224            }
3225        }
3226    }
3227}