rpc.rs

   1mod connection_pool;
   2
   3use crate::{
   4    auth,
   5    db::{
   6        self, ChannelId, ChannelsForUser, Database, MessageId, ProjectId, RoomId, ServerId, User,
   7        UserId,
   8    },
   9    executor::Executor,
  10    AppState, Result,
  11};
  12use anyhow::anyhow;
  13use async_tungstenite::tungstenite::{
  14    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  15};
  16use axum::{
  17    body::Body,
  18    extract::{
  19        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  20        ConnectInfo, WebSocketUpgrade,
  21    },
  22    headers::{Header, HeaderName},
  23    http::StatusCode,
  24    middleware,
  25    response::IntoResponse,
  26    routing::get,
  27    Extension, Router, TypedHeader,
  28};
  29use collections::{HashMap, HashSet};
  30pub use connection_pool::ConnectionPool;
  31use futures::{
  32    channel::oneshot,
  33    future::{self, BoxFuture},
  34    stream::FuturesUnordered,
  35    FutureExt, SinkExt, StreamExt, TryStreamExt,
  36};
  37use lazy_static::lazy_static;
  38use prometheus::{register_int_gauge, IntGauge};
  39use rpc::{
  40    proto::{
  41        self, Ack, AnyTypedEnvelope, ChannelEdge, EntityMessage, EnvelopedMessage,
  42        LiveKitConnectionInfo, RequestMessage, UpdateChannelBufferCollaborators,
  43    },
  44    Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
  45};
  46use serde::{Serialize, Serializer};
  47use std::{
  48    any::TypeId,
  49    fmt,
  50    future::Future,
  51    marker::PhantomData,
  52    mem,
  53    net::SocketAddr,
  54    ops::{Deref, DerefMut},
  55    rc::Rc,
  56    sync::{
  57        atomic::{AtomicBool, Ordering::SeqCst},
  58        Arc,
  59    },
  60    time::{Duration, Instant},
  61};
  62use time::OffsetDateTime;
  63use tokio::sync::{watch, Semaphore};
  64use tower::ServiceBuilder;
  65use tracing::{info_span, instrument, Instrument};
  66
  67pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  68pub const CLEANUP_TIMEOUT: Duration = Duration::from_secs(10);
  69
  70const MESSAGE_COUNT_PER_PAGE: usize = 100;
  71const MAX_MESSAGE_LEN: usize = 1024;
  72
  73lazy_static! {
  74    static ref METRIC_CONNECTIONS: IntGauge =
  75        register_int_gauge!("connections", "number of connections").unwrap();
  76    static ref METRIC_SHARED_PROJECTS: IntGauge = register_int_gauge!(
  77        "shared_projects",
  78        "number of open projects with one or more guests"
  79    )
  80    .unwrap();
  81}
  82
  83type MessageHandler =
  84    Box<dyn Send + Sync + Fn(Box<dyn AnyTypedEnvelope>, Session) -> BoxFuture<'static, ()>>;
  85
  86struct Response<R> {
  87    peer: Arc<Peer>,
  88    receipt: Receipt<R>,
  89    responded: Arc<AtomicBool>,
  90}
  91
  92impl<R: RequestMessage> Response<R> {
  93    fn send(self, payload: R::Response) -> Result<()> {
  94        self.responded.store(true, SeqCst);
  95        self.peer.respond(self.receipt, payload)?;
  96        Ok(())
  97    }
  98}
  99
 100#[derive(Clone)]
 101struct Session {
 102    user_id: UserId,
 103    connection_id: ConnectionId,
 104    db: Arc<tokio::sync::Mutex<DbHandle>>,
 105    peer: Arc<Peer>,
 106    connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
 107    live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
 108    executor: Executor,
 109}
 110
 111impl Session {
 112    async fn db(&self) -> tokio::sync::MutexGuard<DbHandle> {
 113        #[cfg(test)]
 114        tokio::task::yield_now().await;
 115        let guard = self.db.lock().await;
 116        #[cfg(test)]
 117        tokio::task::yield_now().await;
 118        guard
 119    }
 120
 121    async fn connection_pool(&self) -> ConnectionPoolGuard<'_> {
 122        #[cfg(test)]
 123        tokio::task::yield_now().await;
 124        let guard = self.connection_pool.lock();
 125        ConnectionPoolGuard {
 126            guard,
 127            _not_send: PhantomData,
 128        }
 129    }
 130}
 131
 132impl fmt::Debug for Session {
 133    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 134        f.debug_struct("Session")
 135            .field("user_id", &self.user_id)
 136            .field("connection_id", &self.connection_id)
 137            .finish()
 138    }
 139}
 140
 141struct DbHandle(Arc<Database>);
 142
 143impl Deref for DbHandle {
 144    type Target = Database;
 145
 146    fn deref(&self) -> &Self::Target {
 147        self.0.as_ref()
 148    }
 149}
 150
 151pub struct Server {
 152    id: parking_lot::Mutex<ServerId>,
 153    peer: Arc<Peer>,
 154    pub(crate) connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
 155    app_state: Arc<AppState>,
 156    executor: Executor,
 157    handlers: HashMap<TypeId, MessageHandler>,
 158    teardown: watch::Sender<()>,
 159}
 160
 161pub(crate) struct ConnectionPoolGuard<'a> {
 162    guard: parking_lot::MutexGuard<'a, ConnectionPool>,
 163    _not_send: PhantomData<Rc<()>>,
 164}
 165
 166#[derive(Serialize)]
 167pub struct ServerSnapshot<'a> {
 168    peer: &'a Peer,
 169    #[serde(serialize_with = "serialize_deref")]
 170    connection_pool: ConnectionPoolGuard<'a>,
 171}
 172
 173pub fn serialize_deref<S, T, U>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
 174where
 175    S: Serializer,
 176    T: Deref<Target = U>,
 177    U: Serialize,
 178{
 179    Serialize::serialize(value.deref(), serializer)
 180}
 181
 182impl Server {
 183    pub fn new(id: ServerId, app_state: Arc<AppState>, executor: Executor) -> Arc<Self> {
 184        let mut server = Self {
 185            id: parking_lot::Mutex::new(id),
 186            peer: Peer::new(id.0 as u32),
 187            app_state,
 188            executor,
 189            connection_pool: Default::default(),
 190            handlers: Default::default(),
 191            teardown: watch::channel(()).0,
 192        };
 193
 194        server
 195            .add_request_handler(ping)
 196            .add_request_handler(create_room)
 197            .add_request_handler(join_room)
 198            .add_request_handler(rejoin_room)
 199            .add_request_handler(leave_room)
 200            .add_request_handler(call)
 201            .add_request_handler(cancel_call)
 202            .add_message_handler(decline_call)
 203            .add_request_handler(update_participant_location)
 204            .add_request_handler(share_project)
 205            .add_message_handler(unshare_project)
 206            .add_request_handler(join_project)
 207            .add_message_handler(leave_project)
 208            .add_request_handler(update_project)
 209            .add_request_handler(update_worktree)
 210            .add_message_handler(start_language_server)
 211            .add_message_handler(update_language_server)
 212            .add_message_handler(update_diagnostic_summary)
 213            .add_message_handler(update_worktree_settings)
 214            .add_message_handler(refresh_inlay_hints)
 215            .add_request_handler(forward_project_request::<proto::GetHover>)
 216            .add_request_handler(forward_project_request::<proto::GetDefinition>)
 217            .add_request_handler(forward_project_request::<proto::GetTypeDefinition>)
 218            .add_request_handler(forward_project_request::<proto::GetReferences>)
 219            .add_request_handler(forward_project_request::<proto::SearchProject>)
 220            .add_request_handler(forward_project_request::<proto::GetDocumentHighlights>)
 221            .add_request_handler(forward_project_request::<proto::GetProjectSymbols>)
 222            .add_request_handler(forward_project_request::<proto::OpenBufferForSymbol>)
 223            .add_request_handler(forward_project_request::<proto::OpenBufferById>)
 224            .add_request_handler(forward_project_request::<proto::OpenBufferByPath>)
 225            .add_request_handler(forward_project_request::<proto::GetCompletions>)
 226            .add_request_handler(forward_project_request::<proto::ApplyCompletionAdditionalEdits>)
 227            .add_request_handler(forward_project_request::<proto::GetCodeActions>)
 228            .add_request_handler(forward_project_request::<proto::ApplyCodeAction>)
 229            .add_request_handler(forward_project_request::<proto::PrepareRename>)
 230            .add_request_handler(forward_project_request::<proto::PerformRename>)
 231            .add_request_handler(forward_project_request::<proto::ReloadBuffers>)
 232            .add_request_handler(forward_project_request::<proto::SynchronizeBuffers>)
 233            .add_request_handler(forward_project_request::<proto::FormatBuffers>)
 234            .add_request_handler(forward_project_request::<proto::CreateProjectEntry>)
 235            .add_request_handler(forward_project_request::<proto::RenameProjectEntry>)
 236            .add_request_handler(forward_project_request::<proto::CopyProjectEntry>)
 237            .add_request_handler(forward_project_request::<proto::DeleteProjectEntry>)
 238            .add_request_handler(forward_project_request::<proto::ExpandProjectEntry>)
 239            .add_request_handler(forward_project_request::<proto::OnTypeFormatting>)
 240            .add_request_handler(forward_project_request::<proto::InlayHints>)
 241            .add_message_handler(create_buffer_for_peer)
 242            .add_request_handler(update_buffer)
 243            .add_message_handler(update_buffer_file)
 244            .add_message_handler(buffer_reloaded)
 245            .add_message_handler(buffer_saved)
 246            .add_request_handler(forward_project_request::<proto::SaveBuffer>)
 247            .add_request_handler(get_users)
 248            .add_request_handler(fuzzy_search_users)
 249            .add_request_handler(request_contact)
 250            .add_request_handler(remove_contact)
 251            .add_request_handler(respond_to_contact_request)
 252            .add_request_handler(create_channel)
 253            .add_request_handler(delete_channel)
 254            .add_request_handler(invite_channel_member)
 255            .add_request_handler(remove_channel_member)
 256            .add_request_handler(set_channel_member_admin)
 257            .add_request_handler(rename_channel)
 258            .add_request_handler(join_channel_buffer)
 259            .add_request_handler(leave_channel_buffer)
 260            .add_message_handler(update_channel_buffer)
 261            .add_request_handler(rejoin_channel_buffers)
 262            .add_request_handler(get_channel_members)
 263            .add_request_handler(respond_to_channel_invite)
 264            .add_request_handler(join_channel)
 265            .add_request_handler(join_channel_chat)
 266            .add_message_handler(leave_channel_chat)
 267            .add_request_handler(send_channel_message)
 268            .add_request_handler(remove_channel_message)
 269            .add_request_handler(get_channel_messages)
 270            .add_request_handler(link_channel)
 271            .add_request_handler(unlink_channel)
 272            .add_request_handler(move_channel)
 273            .add_request_handler(follow)
 274            .add_message_handler(unfollow)
 275            .add_message_handler(update_followers)
 276            .add_message_handler(update_diff_base)
 277            .add_request_handler(get_private_user_info);
 278
 279        Arc::new(server)
 280    }
 281
 282    pub async fn start(&self) -> Result<()> {
 283        let server_id = *self.id.lock();
 284        let app_state = self.app_state.clone();
 285        let peer = self.peer.clone();
 286        let timeout = self.executor.sleep(CLEANUP_TIMEOUT);
 287        let pool = self.connection_pool.clone();
 288        let live_kit_client = self.app_state.live_kit_client.clone();
 289
 290        let span = info_span!("start server");
 291        self.executor.spawn_detached(
 292            async move {
 293                tracing::info!("waiting for cleanup timeout");
 294                timeout.await;
 295                tracing::info!("cleanup timeout expired, retrieving stale rooms");
 296                if let Some((room_ids, channel_ids)) = app_state
 297                    .db
 298                    .stale_server_resource_ids(&app_state.config.zed_environment, server_id)
 299                    .await
 300                    .trace_err()
 301                {
 302                    tracing::info!(stale_room_count = room_ids.len(), "retrieved stale rooms");
 303                    tracing::info!(
 304                        stale_channel_buffer_count = channel_ids.len(),
 305                        "retrieved stale channel buffers"
 306                    );
 307
 308                    for channel_id in channel_ids {
 309                        if let Some(refreshed_channel_buffer) = app_state
 310                            .db
 311                            .clear_stale_channel_buffer_collaborators(channel_id, server_id)
 312                            .await
 313                            .trace_err()
 314                        {
 315                            for connection_id in refreshed_channel_buffer.connection_ids {
 316                                peer.send(
 317                                    connection_id,
 318                                    proto::UpdateChannelBufferCollaborators {
 319                                        channel_id: channel_id.to_proto(),
 320                                        collaborators: refreshed_channel_buffer
 321                                            .collaborators
 322                                            .clone(),
 323                                    },
 324                                )
 325                                .trace_err();
 326                            }
 327                        }
 328                    }
 329
 330                    for room_id in room_ids {
 331                        let mut contacts_to_update = HashSet::default();
 332                        let mut canceled_calls_to_user_ids = Vec::new();
 333                        let mut live_kit_room = String::new();
 334                        let mut delete_live_kit_room = false;
 335
 336                        if let Some(mut refreshed_room) = app_state
 337                            .db
 338                            .clear_stale_room_participants(room_id, server_id)
 339                            .await
 340                            .trace_err()
 341                        {
 342                            tracing::info!(
 343                                room_id = room_id.0,
 344                                new_participant_count = refreshed_room.room.participants.len(),
 345                                "refreshed room"
 346                            );
 347                            room_updated(&refreshed_room.room, &peer);
 348                            if let Some(channel_id) = refreshed_room.channel_id {
 349                                channel_updated(
 350                                    channel_id,
 351                                    &refreshed_room.room,
 352                                    &refreshed_room.channel_members,
 353                                    &peer,
 354                                    &*pool.lock(),
 355                                );
 356                            }
 357                            contacts_to_update
 358                                .extend(refreshed_room.stale_participant_user_ids.iter().copied());
 359                            contacts_to_update
 360                                .extend(refreshed_room.canceled_calls_to_user_ids.iter().copied());
 361                            canceled_calls_to_user_ids =
 362                                mem::take(&mut refreshed_room.canceled_calls_to_user_ids);
 363                            live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room);
 364                            delete_live_kit_room = refreshed_room.room.participants.is_empty();
 365                        }
 366
 367                        {
 368                            let pool = pool.lock();
 369                            for canceled_user_id in canceled_calls_to_user_ids {
 370                                for connection_id in pool.user_connection_ids(canceled_user_id) {
 371                                    peer.send(
 372                                        connection_id,
 373                                        proto::CallCanceled {
 374                                            room_id: room_id.to_proto(),
 375                                        },
 376                                    )
 377                                    .trace_err();
 378                                }
 379                            }
 380                        }
 381
 382                        for user_id in contacts_to_update {
 383                            let busy = app_state.db.is_user_busy(user_id).await.trace_err();
 384                            let contacts = app_state.db.get_contacts(user_id).await.trace_err();
 385                            if let Some((busy, contacts)) = busy.zip(contacts) {
 386                                let pool = pool.lock();
 387                                let updated_contact = contact_for_user(user_id, false, busy, &pool);
 388                                for contact in contacts {
 389                                    if let db::Contact::Accepted {
 390                                        user_id: contact_user_id,
 391                                        ..
 392                                    } = contact
 393                                    {
 394                                        for contact_conn_id in
 395                                            pool.user_connection_ids(contact_user_id)
 396                                        {
 397                                            peer.send(
 398                                                contact_conn_id,
 399                                                proto::UpdateContacts {
 400                                                    contacts: vec![updated_contact.clone()],
 401                                                    remove_contacts: Default::default(),
 402                                                    incoming_requests: Default::default(),
 403                                                    remove_incoming_requests: Default::default(),
 404                                                    outgoing_requests: Default::default(),
 405                                                    remove_outgoing_requests: Default::default(),
 406                                                },
 407                                            )
 408                                            .trace_err();
 409                                        }
 410                                    }
 411                                }
 412                            }
 413                        }
 414
 415                        if let Some(live_kit) = live_kit_client.as_ref() {
 416                            if delete_live_kit_room {
 417                                live_kit.delete_room(live_kit_room).await.trace_err();
 418                            }
 419                        }
 420                    }
 421                }
 422
 423                app_state
 424                    .db
 425                    .delete_stale_servers(&app_state.config.zed_environment, server_id)
 426                    .await
 427                    .trace_err();
 428            }
 429            .instrument(span),
 430        );
 431        Ok(())
 432    }
 433
 434    pub fn teardown(&self) {
 435        self.peer.teardown();
 436        self.connection_pool.lock().reset();
 437        let _ = self.teardown.send(());
 438    }
 439
 440    #[cfg(test)]
 441    pub fn reset(&self, id: ServerId) {
 442        self.teardown();
 443        *self.id.lock() = id;
 444        self.peer.reset(id.0 as u32);
 445    }
 446
 447    #[cfg(test)]
 448    pub fn id(&self) -> ServerId {
 449        *self.id.lock()
 450    }
 451
 452    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 453    where
 454        F: 'static + Send + Sync + Fn(TypedEnvelope<M>, Session) -> Fut,
 455        Fut: 'static + Send + Future<Output = Result<()>>,
 456        M: EnvelopedMessage,
 457    {
 458        let prev_handler = self.handlers.insert(
 459            TypeId::of::<M>(),
 460            Box::new(move |envelope, session| {
 461                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 462                let span = info_span!(
 463                    "handle message",
 464                    payload_type = envelope.payload_type_name()
 465                );
 466                span.in_scope(|| {
 467                    tracing::info!(
 468                        payload_type = envelope.payload_type_name(),
 469                        "message received"
 470                    );
 471                });
 472                let start_time = Instant::now();
 473                let future = (handler)(*envelope, session);
 474                async move {
 475                    let result = future.await;
 476                    let duration_ms = start_time.elapsed().as_micros() as f64 / 1000.0;
 477                    match result {
 478                        Err(error) => {
 479                            tracing::error!(%error, ?duration_ms, "error handling message")
 480                        }
 481                        Ok(()) => tracing::info!(?duration_ms, "finished handling message"),
 482                    }
 483                }
 484                .instrument(span)
 485                .boxed()
 486            }),
 487        );
 488        if prev_handler.is_some() {
 489            panic!("registered a handler for the same message twice");
 490        }
 491        self
 492    }
 493
 494    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 495    where
 496        F: 'static + Send + Sync + Fn(M, Session) -> Fut,
 497        Fut: 'static + Send + Future<Output = Result<()>>,
 498        M: EnvelopedMessage,
 499    {
 500        self.add_handler(move |envelope, session| handler(envelope.payload, session));
 501        self
 502    }
 503
 504    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 505    where
 506        F: 'static + Send + Sync + Fn(M, Response<M>, Session) -> Fut,
 507        Fut: Send + Future<Output = Result<()>>,
 508        M: RequestMessage,
 509    {
 510        let handler = Arc::new(handler);
 511        self.add_handler(move |envelope, session| {
 512            let receipt = envelope.receipt();
 513            let handler = handler.clone();
 514            async move {
 515                let peer = session.peer.clone();
 516                let responded = Arc::new(AtomicBool::default());
 517                let response = Response {
 518                    peer: peer.clone(),
 519                    responded: responded.clone(),
 520                    receipt,
 521                };
 522                match (handler)(envelope.payload, response, session).await {
 523                    Ok(()) => {
 524                        if responded.load(std::sync::atomic::Ordering::SeqCst) {
 525                            Ok(())
 526                        } else {
 527                            Err(anyhow!("handler did not send a response"))?
 528                        }
 529                    }
 530                    Err(error) => {
 531                        peer.respond_with_error(
 532                            receipt,
 533                            proto::Error {
 534                                message: error.to_string(),
 535                            },
 536                        )?;
 537                        Err(error)
 538                    }
 539                }
 540            }
 541        })
 542    }
 543
 544    pub fn handle_connection(
 545        self: &Arc<Self>,
 546        connection: Connection,
 547        address: String,
 548        user: User,
 549        mut send_connection_id: Option<oneshot::Sender<ConnectionId>>,
 550        executor: Executor,
 551    ) -> impl Future<Output = Result<()>> {
 552        let this = self.clone();
 553        let user_id = user.id;
 554        let login = user.github_login;
 555        let span = info_span!("handle connection", %user_id, %login, %address);
 556        let mut teardown = self.teardown.subscribe();
 557        async move {
 558            let (connection_id, handle_io, mut incoming_rx) = this
 559                .peer
 560                .add_connection(connection, {
 561                    let executor = executor.clone();
 562                    move |duration| executor.sleep(duration)
 563                });
 564
 565            tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
 566            this.peer.send(connection_id, proto::Hello { peer_id: Some(connection_id.into()) })?;
 567            tracing::info!(%user_id, %login, %connection_id, %address, "sent hello message");
 568
 569            if let Some(send_connection_id) = send_connection_id.take() {
 570                let _ = send_connection_id.send(connection_id);
 571            }
 572
 573            if !user.connected_once {
 574                this.peer.send(connection_id, proto::ShowContacts {})?;
 575                this.app_state.db.set_user_connected_once(user_id, true).await?;
 576            }
 577
 578            let (contacts, channels_for_user, channel_invites) = future::try_join3(
 579                this.app_state.db.get_contacts(user_id),
 580                this.app_state.db.get_channels_for_user(user_id),
 581                this.app_state.db.get_channel_invites_for_user(user_id)
 582            ).await?;
 583
 584            {
 585                let mut pool = this.connection_pool.lock();
 586                pool.add_connection(connection_id, user_id, user.admin);
 587                this.peer.send(connection_id, build_initial_contacts_update(contacts, &pool))?;
 588                this.peer.send(connection_id, build_initial_channels_update(
 589                    channels_for_user,
 590                    channel_invites
 591                ))?;
 592            }
 593
 594            if let Some(incoming_call) = this.app_state.db.incoming_call_for_user(user_id).await? {
 595                this.peer.send(connection_id, incoming_call)?;
 596            }
 597
 598            let session = Session {
 599                user_id,
 600                connection_id,
 601                db: Arc::new(tokio::sync::Mutex::new(DbHandle(this.app_state.db.clone()))),
 602                peer: this.peer.clone(),
 603                connection_pool: this.connection_pool.clone(),
 604                live_kit_client: this.app_state.live_kit_client.clone(),
 605                executor: executor.clone(),
 606            };
 607            update_user_contacts(user_id, &session).await?;
 608
 609            let handle_io = handle_io.fuse();
 610            futures::pin_mut!(handle_io);
 611
 612            // Handlers for foreground messages are pushed into the following `FuturesUnordered`.
 613            // This prevents deadlocks when e.g., client A performs a request to client B and
 614            // client B performs a request to client A. If both clients stop processing further
 615            // messages until their respective request completes, they won't have a chance to
 616            // respond to the other client's request and cause a deadlock.
 617            //
 618            // This arrangement ensures we will attempt to process earlier messages first, but fall
 619            // back to processing messages arrived later in the spirit of making progress.
 620            let mut foreground_message_handlers = FuturesUnordered::new();
 621            let concurrent_handlers = Arc::new(Semaphore::new(256));
 622            loop {
 623                let next_message = async {
 624                    let permit = concurrent_handlers.clone().acquire_owned().await.unwrap();
 625                    let message = incoming_rx.next().await;
 626                    (permit, message)
 627                }.fuse();
 628                futures::pin_mut!(next_message);
 629                futures::select_biased! {
 630                    _ = teardown.changed().fuse() => return Ok(()),
 631                    result = handle_io => {
 632                        if let Err(error) = result {
 633                            tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
 634                        }
 635                        break;
 636                    }
 637                    _ = foreground_message_handlers.next() => {}
 638                    next_message = next_message => {
 639                        let (permit, message) = next_message;
 640                        if let Some(message) = message {
 641                            let type_name = message.payload_type_name();
 642                            let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
 643                            let span_enter = span.enter();
 644                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 645                                let is_background = message.is_background();
 646                                let handle_message = (handler)(message, session.clone());
 647                                drop(span_enter);
 648
 649                                let handle_message = async move {
 650                                    handle_message.await;
 651                                    drop(permit);
 652                                }.instrument(span);
 653                                if is_background {
 654                                    executor.spawn_detached(handle_message);
 655                                } else {
 656                                    foreground_message_handlers.push(handle_message);
 657                                }
 658                            } else {
 659                                tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
 660                            }
 661                        } else {
 662                            tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
 663                            break;
 664                        }
 665                    }
 666                }
 667            }
 668
 669            drop(foreground_message_handlers);
 670            tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
 671            if let Err(error) = connection_lost(session, teardown, executor).await {
 672                tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
 673            }
 674
 675            Ok(())
 676        }.instrument(span)
 677    }
 678
 679    pub async fn invite_code_redeemed(
 680        self: &Arc<Self>,
 681        inviter_id: UserId,
 682        invitee_id: UserId,
 683    ) -> Result<()> {
 684        if let Some(user) = self.app_state.db.get_user_by_id(inviter_id).await? {
 685            if let Some(code) = &user.invite_code {
 686                let pool = self.connection_pool.lock();
 687                let invitee_contact = contact_for_user(invitee_id, true, false, &pool);
 688                for connection_id in pool.user_connection_ids(inviter_id) {
 689                    self.peer.send(
 690                        connection_id,
 691                        proto::UpdateContacts {
 692                            contacts: vec![invitee_contact.clone()],
 693                            ..Default::default()
 694                        },
 695                    )?;
 696                    self.peer.send(
 697                        connection_id,
 698                        proto::UpdateInviteInfo {
 699                            url: format!("{}{}", self.app_state.config.invite_link_prefix, &code),
 700                            count: user.invite_count as u32,
 701                        },
 702                    )?;
 703                }
 704            }
 705        }
 706        Ok(())
 707    }
 708
 709    pub async fn invite_count_updated(self: &Arc<Self>, user_id: UserId) -> Result<()> {
 710        if let Some(user) = self.app_state.db.get_user_by_id(user_id).await? {
 711            if let Some(invite_code) = &user.invite_code {
 712                let pool = self.connection_pool.lock();
 713                for connection_id in pool.user_connection_ids(user_id) {
 714                    self.peer.send(
 715                        connection_id,
 716                        proto::UpdateInviteInfo {
 717                            url: format!(
 718                                "{}{}",
 719                                self.app_state.config.invite_link_prefix, invite_code
 720                            ),
 721                            count: user.invite_count as u32,
 722                        },
 723                    )?;
 724                }
 725            }
 726        }
 727        Ok(())
 728    }
 729
 730    pub async fn snapshot<'a>(self: &'a Arc<Self>) -> ServerSnapshot<'a> {
 731        ServerSnapshot {
 732            connection_pool: ConnectionPoolGuard {
 733                guard: self.connection_pool.lock(),
 734                _not_send: PhantomData,
 735            },
 736            peer: &self.peer,
 737        }
 738    }
 739}
 740
 741impl<'a> Deref for ConnectionPoolGuard<'a> {
 742    type Target = ConnectionPool;
 743
 744    fn deref(&self) -> &Self::Target {
 745        &*self.guard
 746    }
 747}
 748
 749impl<'a> DerefMut for ConnectionPoolGuard<'a> {
 750    fn deref_mut(&mut self) -> &mut Self::Target {
 751        &mut *self.guard
 752    }
 753}
 754
 755impl<'a> Drop for ConnectionPoolGuard<'a> {
 756    fn drop(&mut self) {
 757        #[cfg(test)]
 758        self.check_invariants();
 759    }
 760}
 761
 762fn broadcast<F>(
 763    sender_id: Option<ConnectionId>,
 764    receiver_ids: impl IntoIterator<Item = ConnectionId>,
 765    mut f: F,
 766) where
 767    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 768{
 769    for receiver_id in receiver_ids {
 770        if Some(receiver_id) != sender_id {
 771            if let Err(error) = f(receiver_id) {
 772                tracing::error!("failed to send to {:?} {}", receiver_id, error);
 773            }
 774        }
 775    }
 776}
 777
 778lazy_static! {
 779    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
 780}
 781
 782pub struct ProtocolVersion(u32);
 783
 784impl Header for ProtocolVersion {
 785    fn name() -> &'static HeaderName {
 786        &ZED_PROTOCOL_VERSION
 787    }
 788
 789    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
 790    where
 791        Self: Sized,
 792        I: Iterator<Item = &'i axum::http::HeaderValue>,
 793    {
 794        let version = values
 795            .next()
 796            .ok_or_else(axum::headers::Error::invalid)?
 797            .to_str()
 798            .map_err(|_| axum::headers::Error::invalid())?
 799            .parse()
 800            .map_err(|_| axum::headers::Error::invalid())?;
 801        Ok(Self(version))
 802    }
 803
 804    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
 805        values.extend([self.0.to_string().parse().unwrap()]);
 806    }
 807}
 808
 809pub fn routes(server: Arc<Server>) -> Router<Body> {
 810    Router::new()
 811        .route("/rpc", get(handle_websocket_request))
 812        .layer(
 813            ServiceBuilder::new()
 814                .layer(Extension(server.app_state.clone()))
 815                .layer(middleware::from_fn(auth::validate_header)),
 816        )
 817        .route("/metrics", get(handle_metrics))
 818        .layer(Extension(server))
 819}
 820
 821pub async fn handle_websocket_request(
 822    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
 823    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
 824    Extension(server): Extension<Arc<Server>>,
 825    Extension(user): Extension<User>,
 826    ws: WebSocketUpgrade,
 827) -> axum::response::Response {
 828    if protocol_version != rpc::PROTOCOL_VERSION {
 829        return (
 830            StatusCode::UPGRADE_REQUIRED,
 831            "client must be upgraded".to_string(),
 832        )
 833            .into_response();
 834    }
 835    let socket_address = socket_address.to_string();
 836    ws.on_upgrade(move |socket| {
 837        use util::ResultExt;
 838        let socket = socket
 839            .map_ok(to_tungstenite_message)
 840            .err_into()
 841            .with(|message| async move { Ok(to_axum_message(message)) });
 842        let connection = Connection::new(Box::pin(socket));
 843        async move {
 844            server
 845                .handle_connection(connection, socket_address, user, None, Executor::Production)
 846                .await
 847                .log_err();
 848        }
 849    })
 850}
 851
 852pub async fn handle_metrics(Extension(server): Extension<Arc<Server>>) -> Result<String> {
 853    let connections = server
 854        .connection_pool
 855        .lock()
 856        .connections()
 857        .filter(|connection| !connection.admin)
 858        .count();
 859
 860    METRIC_CONNECTIONS.set(connections as _);
 861
 862    let shared_projects = server.app_state.db.project_count_excluding_admins().await?;
 863    METRIC_SHARED_PROJECTS.set(shared_projects as _);
 864
 865    let encoder = prometheus::TextEncoder::new();
 866    let metric_families = prometheus::gather();
 867    let encoded_metrics = encoder
 868        .encode_to_string(&metric_families)
 869        .map_err(|err| anyhow!("{}", err))?;
 870    Ok(encoded_metrics)
 871}
 872
 873#[instrument(err, skip(executor))]
 874async fn connection_lost(
 875    session: Session,
 876    mut teardown: watch::Receiver<()>,
 877    executor: Executor,
 878) -> Result<()> {
 879    session.peer.disconnect(session.connection_id);
 880    session
 881        .connection_pool()
 882        .await
 883        .remove_connection(session.connection_id)?;
 884
 885    session
 886        .db()
 887        .await
 888        .connection_lost(session.connection_id)
 889        .await
 890        .trace_err();
 891
 892    futures::select_biased! {
 893        _ = executor.sleep(RECONNECT_TIMEOUT).fuse() => {
 894            log::info!("connection lost, removing all resources for user:{}, connection:{:?}", session.user_id, session.connection_id);
 895            leave_room_for_session(&session).await.trace_err();
 896            leave_channel_buffers_for_session(&session)
 897                .await
 898                .trace_err();
 899
 900            if !session
 901                .connection_pool()
 902                .await
 903                .is_user_online(session.user_id)
 904            {
 905                let db = session.db().await;
 906                if let Some(room) = db.decline_call(None, session.user_id).await.trace_err().flatten() {
 907                    room_updated(&room, &session.peer);
 908                }
 909            }
 910
 911            update_user_contacts(session.user_id, &session).await?;
 912        }
 913        _ = teardown.changed().fuse() => {}
 914    }
 915
 916    Ok(())
 917}
 918
 919async fn ping(_: proto::Ping, response: Response<proto::Ping>, _session: Session) -> Result<()> {
 920    response.send(proto::Ack {})?;
 921    Ok(())
 922}
 923
 924async fn create_room(
 925    _request: proto::CreateRoom,
 926    response: Response<proto::CreateRoom>,
 927    session: Session,
 928) -> Result<()> {
 929    let live_kit_room = nanoid::nanoid!(30);
 930
 931    let live_kit_connection_info = {
 932        let live_kit_room = live_kit_room.clone();
 933        let live_kit = session.live_kit_client.as_ref();
 934
 935        util::async_iife!({
 936            let live_kit = live_kit?;
 937
 938            live_kit
 939                .create_room(live_kit_room.clone())
 940                .await
 941                .trace_err()?;
 942
 943            let token = live_kit
 944                .room_token(&live_kit_room, &session.user_id.to_string())
 945                .trace_err()?;
 946
 947            Some(proto::LiveKitConnectionInfo {
 948                server_url: live_kit.url().into(),
 949                token,
 950            })
 951        })
 952    }
 953    .await;
 954
 955    let room = session
 956        .db()
 957        .await
 958        .create_room(session.user_id, session.connection_id, &live_kit_room)
 959        .await?;
 960
 961    response.send(proto::CreateRoomResponse {
 962        room: Some(room.clone()),
 963        live_kit_connection_info,
 964    })?;
 965
 966    update_user_contacts(session.user_id, &session).await?;
 967    Ok(())
 968}
 969
 970async fn join_room(
 971    request: proto::JoinRoom,
 972    response: Response<proto::JoinRoom>,
 973    session: Session,
 974) -> Result<()> {
 975    let room_id = RoomId::from_proto(request.id);
 976    let joined_room = {
 977        let room = session
 978            .db()
 979            .await
 980            .join_room(room_id, session.user_id, session.connection_id)
 981            .await?;
 982        room_updated(&room.room, &session.peer);
 983        room.into_inner()
 984    };
 985
 986    if let Some(channel_id) = joined_room.channel_id {
 987        channel_updated(
 988            channel_id,
 989            &joined_room.room,
 990            &joined_room.channel_members,
 991            &session.peer,
 992            &*session.connection_pool().await,
 993        )
 994    }
 995
 996    for connection_id in session
 997        .connection_pool()
 998        .await
 999        .user_connection_ids(session.user_id)
1000    {
1001        session
1002            .peer
1003            .send(
1004                connection_id,
1005                proto::CallCanceled {
1006                    room_id: room_id.to_proto(),
1007                },
1008            )
1009            .trace_err();
1010    }
1011
1012    let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
1013        if let Some(token) = live_kit
1014            .room_token(
1015                &joined_room.room.live_kit_room,
1016                &session.user_id.to_string(),
1017            )
1018            .trace_err()
1019        {
1020            Some(proto::LiveKitConnectionInfo {
1021                server_url: live_kit.url().into(),
1022                token,
1023            })
1024        } else {
1025            None
1026        }
1027    } else {
1028        None
1029    };
1030
1031    response.send(proto::JoinRoomResponse {
1032        room: Some(joined_room.room),
1033        channel_id: joined_room.channel_id.map(|id| id.to_proto()),
1034        live_kit_connection_info,
1035    })?;
1036
1037    update_user_contacts(session.user_id, &session).await?;
1038    Ok(())
1039}
1040
1041async fn rejoin_room(
1042    request: proto::RejoinRoom,
1043    response: Response<proto::RejoinRoom>,
1044    session: Session,
1045) -> Result<()> {
1046    let room;
1047    let channel_id;
1048    let channel_members;
1049    {
1050        let mut rejoined_room = session
1051            .db()
1052            .await
1053            .rejoin_room(request, session.user_id, session.connection_id)
1054            .await?;
1055
1056        response.send(proto::RejoinRoomResponse {
1057            room: Some(rejoined_room.room.clone()),
1058            reshared_projects: rejoined_room
1059                .reshared_projects
1060                .iter()
1061                .map(|project| proto::ResharedProject {
1062                    id: project.id.to_proto(),
1063                    collaborators: project
1064                        .collaborators
1065                        .iter()
1066                        .map(|collaborator| collaborator.to_proto())
1067                        .collect(),
1068                })
1069                .collect(),
1070            rejoined_projects: rejoined_room
1071                .rejoined_projects
1072                .iter()
1073                .map(|rejoined_project| proto::RejoinedProject {
1074                    id: rejoined_project.id.to_proto(),
1075                    worktrees: rejoined_project
1076                        .worktrees
1077                        .iter()
1078                        .map(|worktree| proto::WorktreeMetadata {
1079                            id: worktree.id,
1080                            root_name: worktree.root_name.clone(),
1081                            visible: worktree.visible,
1082                            abs_path: worktree.abs_path.clone(),
1083                        })
1084                        .collect(),
1085                    collaborators: rejoined_project
1086                        .collaborators
1087                        .iter()
1088                        .map(|collaborator| collaborator.to_proto())
1089                        .collect(),
1090                    language_servers: rejoined_project.language_servers.clone(),
1091                })
1092                .collect(),
1093        })?;
1094        room_updated(&rejoined_room.room, &session.peer);
1095
1096        for project in &rejoined_room.reshared_projects {
1097            for collaborator in &project.collaborators {
1098                session
1099                    .peer
1100                    .send(
1101                        collaborator.connection_id,
1102                        proto::UpdateProjectCollaborator {
1103                            project_id: project.id.to_proto(),
1104                            old_peer_id: Some(project.old_connection_id.into()),
1105                            new_peer_id: Some(session.connection_id.into()),
1106                        },
1107                    )
1108                    .trace_err();
1109            }
1110
1111            broadcast(
1112                Some(session.connection_id),
1113                project
1114                    .collaborators
1115                    .iter()
1116                    .map(|collaborator| collaborator.connection_id),
1117                |connection_id| {
1118                    session.peer.forward_send(
1119                        session.connection_id,
1120                        connection_id,
1121                        proto::UpdateProject {
1122                            project_id: project.id.to_proto(),
1123                            worktrees: project.worktrees.clone(),
1124                        },
1125                    )
1126                },
1127            );
1128        }
1129
1130        for project in &rejoined_room.rejoined_projects {
1131            for collaborator in &project.collaborators {
1132                session
1133                    .peer
1134                    .send(
1135                        collaborator.connection_id,
1136                        proto::UpdateProjectCollaborator {
1137                            project_id: project.id.to_proto(),
1138                            old_peer_id: Some(project.old_connection_id.into()),
1139                            new_peer_id: Some(session.connection_id.into()),
1140                        },
1141                    )
1142                    .trace_err();
1143            }
1144        }
1145
1146        for project in &mut rejoined_room.rejoined_projects {
1147            for worktree in mem::take(&mut project.worktrees) {
1148                #[cfg(any(test, feature = "test-support"))]
1149                const MAX_CHUNK_SIZE: usize = 2;
1150                #[cfg(not(any(test, feature = "test-support")))]
1151                const MAX_CHUNK_SIZE: usize = 256;
1152
1153                // Stream this worktree's entries.
1154                let message = proto::UpdateWorktree {
1155                    project_id: project.id.to_proto(),
1156                    worktree_id: worktree.id,
1157                    abs_path: worktree.abs_path.clone(),
1158                    root_name: worktree.root_name,
1159                    updated_entries: worktree.updated_entries,
1160                    removed_entries: worktree.removed_entries,
1161                    scan_id: worktree.scan_id,
1162                    is_last_update: worktree.completed_scan_id == worktree.scan_id,
1163                    updated_repositories: worktree.updated_repositories,
1164                    removed_repositories: worktree.removed_repositories,
1165                };
1166                for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1167                    session.peer.send(session.connection_id, update.clone())?;
1168                }
1169
1170                // Stream this worktree's diagnostics.
1171                for summary in worktree.diagnostic_summaries {
1172                    session.peer.send(
1173                        session.connection_id,
1174                        proto::UpdateDiagnosticSummary {
1175                            project_id: project.id.to_proto(),
1176                            worktree_id: worktree.id,
1177                            summary: Some(summary),
1178                        },
1179                    )?;
1180                }
1181
1182                for settings_file in worktree.settings_files {
1183                    session.peer.send(
1184                        session.connection_id,
1185                        proto::UpdateWorktreeSettings {
1186                            project_id: project.id.to_proto(),
1187                            worktree_id: worktree.id,
1188                            path: settings_file.path,
1189                            content: Some(settings_file.content),
1190                        },
1191                    )?;
1192                }
1193            }
1194
1195            for language_server in &project.language_servers {
1196                session.peer.send(
1197                    session.connection_id,
1198                    proto::UpdateLanguageServer {
1199                        project_id: project.id.to_proto(),
1200                        language_server_id: language_server.id,
1201                        variant: Some(
1202                            proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1203                                proto::LspDiskBasedDiagnosticsUpdated {},
1204                            ),
1205                        ),
1206                    },
1207                )?;
1208            }
1209        }
1210
1211        let rejoined_room = rejoined_room.into_inner();
1212
1213        room = rejoined_room.room;
1214        channel_id = rejoined_room.channel_id;
1215        channel_members = rejoined_room.channel_members;
1216    }
1217
1218    if let Some(channel_id) = channel_id {
1219        channel_updated(
1220            channel_id,
1221            &room,
1222            &channel_members,
1223            &session.peer,
1224            &*session.connection_pool().await,
1225        );
1226    }
1227
1228    update_user_contacts(session.user_id, &session).await?;
1229    Ok(())
1230}
1231
1232async fn leave_room(
1233    _: proto::LeaveRoom,
1234    response: Response<proto::LeaveRoom>,
1235    session: Session,
1236) -> Result<()> {
1237    leave_room_for_session(&session).await?;
1238    response.send(proto::Ack {})?;
1239    Ok(())
1240}
1241
1242async fn call(
1243    request: proto::Call,
1244    response: Response<proto::Call>,
1245    session: Session,
1246) -> Result<()> {
1247    let room_id = RoomId::from_proto(request.room_id);
1248    let calling_user_id = session.user_id;
1249    let calling_connection_id = session.connection_id;
1250    let called_user_id = UserId::from_proto(request.called_user_id);
1251    let initial_project_id = request.initial_project_id.map(ProjectId::from_proto);
1252    if !session
1253        .db()
1254        .await
1255        .has_contact(calling_user_id, called_user_id)
1256        .await?
1257    {
1258        return Err(anyhow!("cannot call a user who isn't a contact"))?;
1259    }
1260
1261    let incoming_call = {
1262        let (room, incoming_call) = &mut *session
1263            .db()
1264            .await
1265            .call(
1266                room_id,
1267                calling_user_id,
1268                calling_connection_id,
1269                called_user_id,
1270                initial_project_id,
1271            )
1272            .await?;
1273        room_updated(&room, &session.peer);
1274        mem::take(incoming_call)
1275    };
1276    update_user_contacts(called_user_id, &session).await?;
1277
1278    let mut calls = session
1279        .connection_pool()
1280        .await
1281        .user_connection_ids(called_user_id)
1282        .map(|connection_id| session.peer.request(connection_id, incoming_call.clone()))
1283        .collect::<FuturesUnordered<_>>();
1284
1285    while let Some(call_response) = calls.next().await {
1286        match call_response.as_ref() {
1287            Ok(_) => {
1288                response.send(proto::Ack {})?;
1289                return Ok(());
1290            }
1291            Err(_) => {
1292                call_response.trace_err();
1293            }
1294        }
1295    }
1296
1297    {
1298        let room = session
1299            .db()
1300            .await
1301            .call_failed(room_id, called_user_id)
1302            .await?;
1303        room_updated(&room, &session.peer);
1304    }
1305    update_user_contacts(called_user_id, &session).await?;
1306
1307    Err(anyhow!("failed to ring user"))?
1308}
1309
1310async fn cancel_call(
1311    request: proto::CancelCall,
1312    response: Response<proto::CancelCall>,
1313    session: Session,
1314) -> Result<()> {
1315    let called_user_id = UserId::from_proto(request.called_user_id);
1316    let room_id = RoomId::from_proto(request.room_id);
1317    {
1318        let room = session
1319            .db()
1320            .await
1321            .cancel_call(room_id, session.connection_id, called_user_id)
1322            .await?;
1323        room_updated(&room, &session.peer);
1324    }
1325
1326    for connection_id in session
1327        .connection_pool()
1328        .await
1329        .user_connection_ids(called_user_id)
1330    {
1331        session
1332            .peer
1333            .send(
1334                connection_id,
1335                proto::CallCanceled {
1336                    room_id: room_id.to_proto(),
1337                },
1338            )
1339            .trace_err();
1340    }
1341    response.send(proto::Ack {})?;
1342
1343    update_user_contacts(called_user_id, &session).await?;
1344    Ok(())
1345}
1346
1347async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<()> {
1348    let room_id = RoomId::from_proto(message.room_id);
1349    {
1350        let room = session
1351            .db()
1352            .await
1353            .decline_call(Some(room_id), session.user_id)
1354            .await?
1355            .ok_or_else(|| anyhow!("failed to decline call"))?;
1356        room_updated(&room, &session.peer);
1357    }
1358
1359    for connection_id in session
1360        .connection_pool()
1361        .await
1362        .user_connection_ids(session.user_id)
1363    {
1364        session
1365            .peer
1366            .send(
1367                connection_id,
1368                proto::CallCanceled {
1369                    room_id: room_id.to_proto(),
1370                },
1371            )
1372            .trace_err();
1373    }
1374    update_user_contacts(session.user_id, &session).await?;
1375    Ok(())
1376}
1377
1378async fn update_participant_location(
1379    request: proto::UpdateParticipantLocation,
1380    response: Response<proto::UpdateParticipantLocation>,
1381    session: Session,
1382) -> Result<()> {
1383    let room_id = RoomId::from_proto(request.room_id);
1384    let location = request
1385        .location
1386        .ok_or_else(|| anyhow!("invalid location"))?;
1387
1388    let db = session.db().await;
1389    let room = db
1390        .update_room_participant_location(room_id, session.connection_id, location)
1391        .await?;
1392
1393    room_updated(&room, &session.peer);
1394    response.send(proto::Ack {})?;
1395    Ok(())
1396}
1397
1398async fn share_project(
1399    request: proto::ShareProject,
1400    response: Response<proto::ShareProject>,
1401    session: Session,
1402) -> Result<()> {
1403    let (project_id, room) = &*session
1404        .db()
1405        .await
1406        .share_project(
1407            RoomId::from_proto(request.room_id),
1408            session.connection_id,
1409            &request.worktrees,
1410        )
1411        .await?;
1412    response.send(proto::ShareProjectResponse {
1413        project_id: project_id.to_proto(),
1414    })?;
1415    room_updated(&room, &session.peer);
1416
1417    Ok(())
1418}
1419
1420async fn unshare_project(message: proto::UnshareProject, session: Session) -> Result<()> {
1421    let project_id = ProjectId::from_proto(message.project_id);
1422
1423    let (room, guest_connection_ids) = &*session
1424        .db()
1425        .await
1426        .unshare_project(project_id, session.connection_id)
1427        .await?;
1428
1429    broadcast(
1430        Some(session.connection_id),
1431        guest_connection_ids.iter().copied(),
1432        |conn_id| session.peer.send(conn_id, message.clone()),
1433    );
1434    room_updated(&room, &session.peer);
1435
1436    Ok(())
1437}
1438
1439async fn join_project(
1440    request: proto::JoinProject,
1441    response: Response<proto::JoinProject>,
1442    session: Session,
1443) -> Result<()> {
1444    let project_id = ProjectId::from_proto(request.project_id);
1445    let guest_user_id = session.user_id;
1446
1447    tracing::info!(%project_id, "join project");
1448
1449    let (project, replica_id) = &mut *session
1450        .db()
1451        .await
1452        .join_project(project_id, session.connection_id)
1453        .await?;
1454
1455    let collaborators = project
1456        .collaborators
1457        .iter()
1458        .filter(|collaborator| collaborator.connection_id != session.connection_id)
1459        .map(|collaborator| collaborator.to_proto())
1460        .collect::<Vec<_>>();
1461
1462    let worktrees = project
1463        .worktrees
1464        .iter()
1465        .map(|(id, worktree)| proto::WorktreeMetadata {
1466            id: *id,
1467            root_name: worktree.root_name.clone(),
1468            visible: worktree.visible,
1469            abs_path: worktree.abs_path.clone(),
1470        })
1471        .collect::<Vec<_>>();
1472
1473    for collaborator in &collaborators {
1474        session
1475            .peer
1476            .send(
1477                collaborator.peer_id.unwrap().into(),
1478                proto::AddProjectCollaborator {
1479                    project_id: project_id.to_proto(),
1480                    collaborator: Some(proto::Collaborator {
1481                        peer_id: Some(session.connection_id.into()),
1482                        replica_id: replica_id.0 as u32,
1483                        user_id: guest_user_id.to_proto(),
1484                    }),
1485                },
1486            )
1487            .trace_err();
1488    }
1489
1490    // First, we send the metadata associated with each worktree.
1491    response.send(proto::JoinProjectResponse {
1492        worktrees: worktrees.clone(),
1493        replica_id: replica_id.0 as u32,
1494        collaborators: collaborators.clone(),
1495        language_servers: project.language_servers.clone(),
1496    })?;
1497
1498    for (worktree_id, worktree) in mem::take(&mut project.worktrees) {
1499        #[cfg(any(test, feature = "test-support"))]
1500        const MAX_CHUNK_SIZE: usize = 2;
1501        #[cfg(not(any(test, feature = "test-support")))]
1502        const MAX_CHUNK_SIZE: usize = 256;
1503
1504        // Stream this worktree's entries.
1505        let message = proto::UpdateWorktree {
1506            project_id: project_id.to_proto(),
1507            worktree_id,
1508            abs_path: worktree.abs_path.clone(),
1509            root_name: worktree.root_name,
1510            updated_entries: worktree.entries,
1511            removed_entries: Default::default(),
1512            scan_id: worktree.scan_id,
1513            is_last_update: worktree.scan_id == worktree.completed_scan_id,
1514            updated_repositories: worktree.repository_entries.into_values().collect(),
1515            removed_repositories: Default::default(),
1516        };
1517        for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1518            session.peer.send(session.connection_id, update.clone())?;
1519        }
1520
1521        // Stream this worktree's diagnostics.
1522        for summary in worktree.diagnostic_summaries {
1523            session.peer.send(
1524                session.connection_id,
1525                proto::UpdateDiagnosticSummary {
1526                    project_id: project_id.to_proto(),
1527                    worktree_id: worktree.id,
1528                    summary: Some(summary),
1529                },
1530            )?;
1531        }
1532
1533        for settings_file in worktree.settings_files {
1534            session.peer.send(
1535                session.connection_id,
1536                proto::UpdateWorktreeSettings {
1537                    project_id: project_id.to_proto(),
1538                    worktree_id: worktree.id,
1539                    path: settings_file.path,
1540                    content: Some(settings_file.content),
1541                },
1542            )?;
1543        }
1544    }
1545
1546    for language_server in &project.language_servers {
1547        session.peer.send(
1548            session.connection_id,
1549            proto::UpdateLanguageServer {
1550                project_id: project_id.to_proto(),
1551                language_server_id: language_server.id,
1552                variant: Some(
1553                    proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1554                        proto::LspDiskBasedDiagnosticsUpdated {},
1555                    ),
1556                ),
1557            },
1558        )?;
1559    }
1560
1561    Ok(())
1562}
1563
1564async fn leave_project(request: proto::LeaveProject, session: Session) -> Result<()> {
1565    let sender_id = session.connection_id;
1566    let project_id = ProjectId::from_proto(request.project_id);
1567
1568    let (room, project) = &*session
1569        .db()
1570        .await
1571        .leave_project(project_id, sender_id)
1572        .await?;
1573    tracing::info!(
1574        %project_id,
1575        host_user_id = %project.host_user_id,
1576        host_connection_id = %project.host_connection_id,
1577        "leave project"
1578    );
1579
1580    project_left(&project, &session);
1581    room_updated(&room, &session.peer);
1582
1583    Ok(())
1584}
1585
1586async fn update_project(
1587    request: proto::UpdateProject,
1588    response: Response<proto::UpdateProject>,
1589    session: Session,
1590) -> Result<()> {
1591    let project_id = ProjectId::from_proto(request.project_id);
1592    let (room, guest_connection_ids) = &*session
1593        .db()
1594        .await
1595        .update_project(project_id, session.connection_id, &request.worktrees)
1596        .await?;
1597    broadcast(
1598        Some(session.connection_id),
1599        guest_connection_ids.iter().copied(),
1600        |connection_id| {
1601            session
1602                .peer
1603                .forward_send(session.connection_id, connection_id, request.clone())
1604        },
1605    );
1606    room_updated(&room, &session.peer);
1607    response.send(proto::Ack {})?;
1608
1609    Ok(())
1610}
1611
1612async fn update_worktree(
1613    request: proto::UpdateWorktree,
1614    response: Response<proto::UpdateWorktree>,
1615    session: Session,
1616) -> Result<()> {
1617    let guest_connection_ids = session
1618        .db()
1619        .await
1620        .update_worktree(&request, session.connection_id)
1621        .await?;
1622
1623    broadcast(
1624        Some(session.connection_id),
1625        guest_connection_ids.iter().copied(),
1626        |connection_id| {
1627            session
1628                .peer
1629                .forward_send(session.connection_id, connection_id, request.clone())
1630        },
1631    );
1632    response.send(proto::Ack {})?;
1633    Ok(())
1634}
1635
1636async fn update_diagnostic_summary(
1637    message: proto::UpdateDiagnosticSummary,
1638    session: Session,
1639) -> Result<()> {
1640    let guest_connection_ids = session
1641        .db()
1642        .await
1643        .update_diagnostic_summary(&message, session.connection_id)
1644        .await?;
1645
1646    broadcast(
1647        Some(session.connection_id),
1648        guest_connection_ids.iter().copied(),
1649        |connection_id| {
1650            session
1651                .peer
1652                .forward_send(session.connection_id, connection_id, message.clone())
1653        },
1654    );
1655
1656    Ok(())
1657}
1658
1659async fn update_worktree_settings(
1660    message: proto::UpdateWorktreeSettings,
1661    session: Session,
1662) -> Result<()> {
1663    let guest_connection_ids = session
1664        .db()
1665        .await
1666        .update_worktree_settings(&message, session.connection_id)
1667        .await?;
1668
1669    broadcast(
1670        Some(session.connection_id),
1671        guest_connection_ids.iter().copied(),
1672        |connection_id| {
1673            session
1674                .peer
1675                .forward_send(session.connection_id, connection_id, message.clone())
1676        },
1677    );
1678
1679    Ok(())
1680}
1681
1682async fn refresh_inlay_hints(request: proto::RefreshInlayHints, session: Session) -> Result<()> {
1683    broadcast_project_message(request.project_id, request, session).await
1684}
1685
1686async fn start_language_server(
1687    request: proto::StartLanguageServer,
1688    session: Session,
1689) -> Result<()> {
1690    let guest_connection_ids = session
1691        .db()
1692        .await
1693        .start_language_server(&request, session.connection_id)
1694        .await?;
1695
1696    broadcast(
1697        Some(session.connection_id),
1698        guest_connection_ids.iter().copied(),
1699        |connection_id| {
1700            session
1701                .peer
1702                .forward_send(session.connection_id, connection_id, request.clone())
1703        },
1704    );
1705    Ok(())
1706}
1707
1708async fn update_language_server(
1709    request: proto::UpdateLanguageServer,
1710    session: Session,
1711) -> Result<()> {
1712    session.executor.record_backtrace();
1713    let project_id = ProjectId::from_proto(request.project_id);
1714    let project_connection_ids = session
1715        .db()
1716        .await
1717        .project_connection_ids(project_id, session.connection_id)
1718        .await?;
1719    broadcast(
1720        Some(session.connection_id),
1721        project_connection_ids.iter().copied(),
1722        |connection_id| {
1723            session
1724                .peer
1725                .forward_send(session.connection_id, connection_id, request.clone())
1726        },
1727    );
1728    Ok(())
1729}
1730
1731async fn forward_project_request<T>(
1732    request: T,
1733    response: Response<T>,
1734    session: Session,
1735) -> Result<()>
1736where
1737    T: EntityMessage + RequestMessage,
1738{
1739    session.executor.record_backtrace();
1740    let project_id = ProjectId::from_proto(request.remote_entity_id());
1741    let host_connection_id = {
1742        let collaborators = session
1743            .db()
1744            .await
1745            .project_collaborators(project_id, session.connection_id)
1746            .await?;
1747        collaborators
1748            .iter()
1749            .find(|collaborator| collaborator.is_host)
1750            .ok_or_else(|| anyhow!("host not found"))?
1751            .connection_id
1752    };
1753
1754    let payload = session
1755        .peer
1756        .forward_request(session.connection_id, host_connection_id, request)
1757        .await?;
1758
1759    response.send(payload)?;
1760    Ok(())
1761}
1762
1763async fn create_buffer_for_peer(
1764    request: proto::CreateBufferForPeer,
1765    session: Session,
1766) -> Result<()> {
1767    session.executor.record_backtrace();
1768    let peer_id = request.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?;
1769    session
1770        .peer
1771        .forward_send(session.connection_id, peer_id.into(), request)?;
1772    Ok(())
1773}
1774
1775async fn update_buffer(
1776    request: proto::UpdateBuffer,
1777    response: Response<proto::UpdateBuffer>,
1778    session: Session,
1779) -> Result<()> {
1780    session.executor.record_backtrace();
1781    let project_id = ProjectId::from_proto(request.project_id);
1782    let mut guest_connection_ids;
1783    let mut host_connection_id = None;
1784    {
1785        let collaborators = session
1786            .db()
1787            .await
1788            .project_collaborators(project_id, session.connection_id)
1789            .await?;
1790        guest_connection_ids = Vec::with_capacity(collaborators.len() - 1);
1791        for collaborator in collaborators.iter() {
1792            if collaborator.is_host {
1793                host_connection_id = Some(collaborator.connection_id);
1794            } else {
1795                guest_connection_ids.push(collaborator.connection_id);
1796            }
1797        }
1798    }
1799    let host_connection_id = host_connection_id.ok_or_else(|| anyhow!("host not found"))?;
1800
1801    session.executor.record_backtrace();
1802    broadcast(
1803        Some(session.connection_id),
1804        guest_connection_ids,
1805        |connection_id| {
1806            session
1807                .peer
1808                .forward_send(session.connection_id, connection_id, request.clone())
1809        },
1810    );
1811    if host_connection_id != session.connection_id {
1812        session
1813            .peer
1814            .forward_request(session.connection_id, host_connection_id, request.clone())
1815            .await?;
1816    }
1817
1818    response.send(proto::Ack {})?;
1819    Ok(())
1820}
1821
1822async fn update_buffer_file(request: proto::UpdateBufferFile, session: Session) -> Result<()> {
1823    let project_id = ProjectId::from_proto(request.project_id);
1824    let project_connection_ids = session
1825        .db()
1826        .await
1827        .project_connection_ids(project_id, session.connection_id)
1828        .await?;
1829
1830    broadcast(
1831        Some(session.connection_id),
1832        project_connection_ids.iter().copied(),
1833        |connection_id| {
1834            session
1835                .peer
1836                .forward_send(session.connection_id, connection_id, request.clone())
1837        },
1838    );
1839    Ok(())
1840}
1841
1842async fn buffer_reloaded(request: proto::BufferReloaded, session: Session) -> Result<()> {
1843    let project_id = ProjectId::from_proto(request.project_id);
1844    let project_connection_ids = session
1845        .db()
1846        .await
1847        .project_connection_ids(project_id, session.connection_id)
1848        .await?;
1849    broadcast(
1850        Some(session.connection_id),
1851        project_connection_ids.iter().copied(),
1852        |connection_id| {
1853            session
1854                .peer
1855                .forward_send(session.connection_id, connection_id, request.clone())
1856        },
1857    );
1858    Ok(())
1859}
1860
1861async fn buffer_saved(request: proto::BufferSaved, session: Session) -> Result<()> {
1862    broadcast_project_message(request.project_id, request, session).await
1863}
1864
1865async fn broadcast_project_message<T: EnvelopedMessage>(
1866    project_id: u64,
1867    request: T,
1868    session: Session,
1869) -> Result<()> {
1870    let project_id = ProjectId::from_proto(project_id);
1871    let project_connection_ids = session
1872        .db()
1873        .await
1874        .project_connection_ids(project_id, session.connection_id)
1875        .await?;
1876    broadcast(
1877        Some(session.connection_id),
1878        project_connection_ids.iter().copied(),
1879        |connection_id| {
1880            session
1881                .peer
1882                .forward_send(session.connection_id, connection_id, request.clone())
1883        },
1884    );
1885    Ok(())
1886}
1887
1888async fn follow(
1889    request: proto::Follow,
1890    response: Response<proto::Follow>,
1891    session: Session,
1892) -> Result<()> {
1893    let room_id = RoomId::from_proto(request.room_id);
1894    let project_id = request.project_id.map(ProjectId::from_proto);
1895    let leader_id = request
1896        .leader_id
1897        .ok_or_else(|| anyhow!("invalid leader id"))?
1898        .into();
1899    let follower_id = session.connection_id;
1900
1901    session
1902        .db()
1903        .await
1904        .check_room_participants(room_id, leader_id, session.connection_id)
1905        .await?;
1906
1907    let mut response_payload = session
1908        .peer
1909        .forward_request(session.connection_id, leader_id, request)
1910        .await?;
1911    response_payload
1912        .views
1913        .retain(|view| view.leader_id != Some(follower_id.into()));
1914    response.send(response_payload)?;
1915
1916    if let Some(project_id) = project_id {
1917        let room = session
1918            .db()
1919            .await
1920            .follow(room_id, project_id, leader_id, follower_id)
1921            .await?;
1922        room_updated(&room, &session.peer);
1923    }
1924
1925    Ok(())
1926}
1927
1928async fn unfollow(request: proto::Unfollow, session: Session) -> Result<()> {
1929    let room_id = RoomId::from_proto(request.room_id);
1930    let project_id = request.project_id.map(ProjectId::from_proto);
1931    let leader_id = request
1932        .leader_id
1933        .ok_or_else(|| anyhow!("invalid leader id"))?
1934        .into();
1935    let follower_id = session.connection_id;
1936
1937    session
1938        .db()
1939        .await
1940        .check_room_participants(room_id, leader_id, session.connection_id)
1941        .await?;
1942
1943    session
1944        .peer
1945        .forward_send(session.connection_id, leader_id, request)?;
1946
1947    if let Some(project_id) = project_id {
1948        let room = session
1949            .db()
1950            .await
1951            .unfollow(room_id, project_id, leader_id, follower_id)
1952            .await?;
1953        room_updated(&room, &session.peer);
1954    }
1955
1956    Ok(())
1957}
1958
1959async fn update_followers(request: proto::UpdateFollowers, session: Session) -> Result<()> {
1960    let room_id = RoomId::from_proto(request.room_id);
1961    let database = session.db.lock().await;
1962
1963    let connection_ids = if let Some(project_id) = request.project_id {
1964        let project_id = ProjectId::from_proto(project_id);
1965        database
1966            .project_connection_ids(project_id, session.connection_id)
1967            .await?
1968    } else {
1969        database
1970            .room_connection_ids(room_id, session.connection_id)
1971            .await?
1972    };
1973
1974    let leader_id = request.variant.as_ref().and_then(|variant| match variant {
1975        proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
1976        proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
1977        proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
1978    });
1979    for follower_peer_id in request.follower_ids.iter().copied() {
1980        let follower_connection_id = follower_peer_id.into();
1981        if Some(follower_peer_id) != leader_id && connection_ids.contains(&follower_connection_id) {
1982            session.peer.forward_send(
1983                session.connection_id,
1984                follower_connection_id,
1985                request.clone(),
1986            )?;
1987        }
1988    }
1989    Ok(())
1990}
1991
1992async fn get_users(
1993    request: proto::GetUsers,
1994    response: Response<proto::GetUsers>,
1995    session: Session,
1996) -> Result<()> {
1997    let user_ids = request
1998        .user_ids
1999        .into_iter()
2000        .map(UserId::from_proto)
2001        .collect();
2002    let users = session
2003        .db()
2004        .await
2005        .get_users_by_ids(user_ids)
2006        .await?
2007        .into_iter()
2008        .map(|user| proto::User {
2009            id: user.id.to_proto(),
2010            avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
2011            github_login: user.github_login,
2012        })
2013        .collect();
2014    response.send(proto::UsersResponse { users })?;
2015    Ok(())
2016}
2017
2018async fn fuzzy_search_users(
2019    request: proto::FuzzySearchUsers,
2020    response: Response<proto::FuzzySearchUsers>,
2021    session: Session,
2022) -> Result<()> {
2023    let query = request.query;
2024    let users = match query.len() {
2025        0 => vec![],
2026        1 | 2 => session
2027            .db()
2028            .await
2029            .get_user_by_github_login(&query)
2030            .await?
2031            .into_iter()
2032            .collect(),
2033        _ => session.db().await.fuzzy_search_users(&query, 10).await?,
2034    };
2035    let users = users
2036        .into_iter()
2037        .filter(|user| user.id != session.user_id)
2038        .map(|user| proto::User {
2039            id: user.id.to_proto(),
2040            avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
2041            github_login: user.github_login,
2042        })
2043        .collect();
2044    response.send(proto::UsersResponse { users })?;
2045    Ok(())
2046}
2047
2048async fn request_contact(
2049    request: proto::RequestContact,
2050    response: Response<proto::RequestContact>,
2051    session: Session,
2052) -> Result<()> {
2053    let requester_id = session.user_id;
2054    let responder_id = UserId::from_proto(request.responder_id);
2055    if requester_id == responder_id {
2056        return Err(anyhow!("cannot add yourself as a contact"))?;
2057    }
2058
2059    session
2060        .db()
2061        .await
2062        .send_contact_request(requester_id, responder_id)
2063        .await?;
2064
2065    // Update outgoing contact requests of requester
2066    let mut update = proto::UpdateContacts::default();
2067    update.outgoing_requests.push(responder_id.to_proto());
2068    for connection_id in session
2069        .connection_pool()
2070        .await
2071        .user_connection_ids(requester_id)
2072    {
2073        session.peer.send(connection_id, update.clone())?;
2074    }
2075
2076    // Update incoming contact requests of responder
2077    let mut update = proto::UpdateContacts::default();
2078    update
2079        .incoming_requests
2080        .push(proto::IncomingContactRequest {
2081            requester_id: requester_id.to_proto(),
2082            should_notify: true,
2083        });
2084    for connection_id in session
2085        .connection_pool()
2086        .await
2087        .user_connection_ids(responder_id)
2088    {
2089        session.peer.send(connection_id, update.clone())?;
2090    }
2091
2092    response.send(proto::Ack {})?;
2093    Ok(())
2094}
2095
2096async fn respond_to_contact_request(
2097    request: proto::RespondToContactRequest,
2098    response: Response<proto::RespondToContactRequest>,
2099    session: Session,
2100) -> Result<()> {
2101    let responder_id = session.user_id;
2102    let requester_id = UserId::from_proto(request.requester_id);
2103    let db = session.db().await;
2104    if request.response == proto::ContactRequestResponse::Dismiss as i32 {
2105        db.dismiss_contact_notification(responder_id, requester_id)
2106            .await?;
2107    } else {
2108        let accept = request.response == proto::ContactRequestResponse::Accept as i32;
2109
2110        db.respond_to_contact_request(responder_id, requester_id, accept)
2111            .await?;
2112        let requester_busy = db.is_user_busy(requester_id).await?;
2113        let responder_busy = db.is_user_busy(responder_id).await?;
2114
2115        let pool = session.connection_pool().await;
2116        // Update responder with new contact
2117        let mut update = proto::UpdateContacts::default();
2118        if accept {
2119            update
2120                .contacts
2121                .push(contact_for_user(requester_id, false, requester_busy, &pool));
2122        }
2123        update
2124            .remove_incoming_requests
2125            .push(requester_id.to_proto());
2126        for connection_id in pool.user_connection_ids(responder_id) {
2127            session.peer.send(connection_id, update.clone())?;
2128        }
2129
2130        // Update requester with new contact
2131        let mut update = proto::UpdateContacts::default();
2132        if accept {
2133            update
2134                .contacts
2135                .push(contact_for_user(responder_id, true, responder_busy, &pool));
2136        }
2137        update
2138            .remove_outgoing_requests
2139            .push(responder_id.to_proto());
2140        for connection_id in pool.user_connection_ids(requester_id) {
2141            session.peer.send(connection_id, update.clone())?;
2142        }
2143    }
2144
2145    response.send(proto::Ack {})?;
2146    Ok(())
2147}
2148
2149async fn remove_contact(
2150    request: proto::RemoveContact,
2151    response: Response<proto::RemoveContact>,
2152    session: Session,
2153) -> Result<()> {
2154    let requester_id = session.user_id;
2155    let responder_id = UserId::from_proto(request.user_id);
2156    let db = session.db().await;
2157    let contact_accepted = db.remove_contact(requester_id, responder_id).await?;
2158
2159    let pool = session.connection_pool().await;
2160    // Update outgoing contact requests of requester
2161    let mut update = proto::UpdateContacts::default();
2162    if contact_accepted {
2163        update.remove_contacts.push(responder_id.to_proto());
2164    } else {
2165        update
2166            .remove_outgoing_requests
2167            .push(responder_id.to_proto());
2168    }
2169    for connection_id in pool.user_connection_ids(requester_id) {
2170        session.peer.send(connection_id, update.clone())?;
2171    }
2172
2173    // Update incoming contact requests of responder
2174    let mut update = proto::UpdateContacts::default();
2175    if contact_accepted {
2176        update.remove_contacts.push(requester_id.to_proto());
2177    } else {
2178        update
2179            .remove_incoming_requests
2180            .push(requester_id.to_proto());
2181    }
2182    for connection_id in pool.user_connection_ids(responder_id) {
2183        session.peer.send(connection_id, update.clone())?;
2184    }
2185
2186    response.send(proto::Ack {})?;
2187    Ok(())
2188}
2189
2190async fn create_channel(
2191    request: proto::CreateChannel,
2192    response: Response<proto::CreateChannel>,
2193    session: Session,
2194) -> Result<()> {
2195    let db = session.db().await;
2196    let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
2197
2198    if let Some(live_kit) = session.live_kit_client.as_ref() {
2199        live_kit.create_room(live_kit_room.clone()).await?;
2200    }
2201
2202    let parent_id = request.parent_id.map(|id| ChannelId::from_proto(id));
2203    let id = db
2204        .create_channel(&request.name, parent_id, &live_kit_room, session.user_id)
2205        .await?;
2206
2207    let channel = proto::Channel {
2208        id: id.to_proto(),
2209        name: request.name,
2210    };
2211
2212    response.send(proto::CreateChannelResponse {
2213        channel: Some(channel.clone()),
2214        parent_id: request.parent_id,
2215    })?;
2216
2217    let Some(parent_id) = parent_id else {
2218        return Ok(());
2219    };
2220
2221    let update = proto::UpdateChannels {
2222        channels: vec![channel],
2223        insert_edge: vec![ChannelEdge {
2224            parent_id: parent_id.to_proto(),
2225            channel_id: id.to_proto(),
2226        }],
2227        ..Default::default()
2228    };
2229
2230    let user_ids_to_notify = db.get_channel_members(parent_id).await?;
2231
2232    let connection_pool = session.connection_pool().await;
2233    for user_id in user_ids_to_notify {
2234        for connection_id in connection_pool.user_connection_ids(user_id) {
2235            if user_id == session.user_id {
2236                continue;
2237            }
2238            session.peer.send(connection_id, update.clone())?;
2239        }
2240    }
2241
2242    Ok(())
2243}
2244
2245async fn delete_channel(
2246    request: proto::DeleteChannel,
2247    response: Response<proto::DeleteChannel>,
2248    session: Session,
2249) -> Result<()> {
2250    let db = session.db().await;
2251
2252    let channel_id = request.channel_id;
2253    let (removed_channels, member_ids) = db
2254        .delete_channel(ChannelId::from_proto(channel_id), session.user_id)
2255        .await?;
2256    response.send(proto::Ack {})?;
2257
2258    // Notify members of removed channels
2259    let mut update = proto::UpdateChannels::default();
2260    update
2261        .delete_channels
2262        .extend(removed_channels.into_iter().map(|id| id.to_proto()));
2263
2264    let connection_pool = session.connection_pool().await;
2265    for member_id in member_ids {
2266        for connection_id in connection_pool.user_connection_ids(member_id) {
2267            session.peer.send(connection_id, update.clone())?;
2268        }
2269    }
2270
2271    Ok(())
2272}
2273
2274async fn invite_channel_member(
2275    request: proto::InviteChannelMember,
2276    response: Response<proto::InviteChannelMember>,
2277    session: Session,
2278) -> Result<()> {
2279    let db = session.db().await;
2280    let channel_id = ChannelId::from_proto(request.channel_id);
2281    let invitee_id = UserId::from_proto(request.user_id);
2282    db.invite_channel_member(channel_id, invitee_id, session.user_id, request.admin)
2283        .await?;
2284
2285    let (channel, _) = db
2286        .get_channel(channel_id, session.user_id)
2287        .await?
2288        .ok_or_else(|| anyhow!("channel not found"))?;
2289
2290    let mut update = proto::UpdateChannels::default();
2291    update.channel_invitations.push(proto::Channel {
2292        id: channel.id.to_proto(),
2293        name: channel.name,
2294    });
2295    for connection_id in session
2296        .connection_pool()
2297        .await
2298        .user_connection_ids(invitee_id)
2299    {
2300        session.peer.send(connection_id, update.clone())?;
2301    }
2302
2303    response.send(proto::Ack {})?;
2304    Ok(())
2305}
2306
2307async fn remove_channel_member(
2308    request: proto::RemoveChannelMember,
2309    response: Response<proto::RemoveChannelMember>,
2310    session: Session,
2311) -> Result<()> {
2312    let db = session.db().await;
2313    let channel_id = ChannelId::from_proto(request.channel_id);
2314    let member_id = UserId::from_proto(request.user_id);
2315
2316    db.remove_channel_member(channel_id, member_id, session.user_id)
2317        .await?;
2318
2319    let mut update = proto::UpdateChannels::default();
2320    update.delete_channels.push(channel_id.to_proto());
2321
2322    for connection_id in session
2323        .connection_pool()
2324        .await
2325        .user_connection_ids(member_id)
2326    {
2327        session.peer.send(connection_id, update.clone())?;
2328    }
2329
2330    response.send(proto::Ack {})?;
2331    Ok(())
2332}
2333
2334async fn set_channel_member_admin(
2335    request: proto::SetChannelMemberAdmin,
2336    response: Response<proto::SetChannelMemberAdmin>,
2337    session: Session,
2338) -> Result<()> {
2339    let db = session.db().await;
2340    let channel_id = ChannelId::from_proto(request.channel_id);
2341    let member_id = UserId::from_proto(request.user_id);
2342    db.set_channel_member_admin(channel_id, session.user_id, member_id, request.admin)
2343        .await?;
2344
2345    let (channel, has_accepted) = db
2346        .get_channel(channel_id, member_id)
2347        .await?
2348        .ok_or_else(|| anyhow!("channel not found"))?;
2349
2350    let mut update = proto::UpdateChannels::default();
2351    if has_accepted {
2352        update.channel_permissions.push(proto::ChannelPermission {
2353            channel_id: channel.id.to_proto(),
2354            is_admin: request.admin,
2355        });
2356    }
2357
2358    for connection_id in session
2359        .connection_pool()
2360        .await
2361        .user_connection_ids(member_id)
2362    {
2363        session.peer.send(connection_id, update.clone())?;
2364    }
2365
2366    response.send(proto::Ack {})?;
2367    Ok(())
2368}
2369
2370async fn rename_channel(
2371    request: proto::RenameChannel,
2372    response: Response<proto::RenameChannel>,
2373    session: Session,
2374) -> Result<()> {
2375    let db = session.db().await;
2376    let channel_id = ChannelId::from_proto(request.channel_id);
2377    let new_name = db
2378        .rename_channel(channel_id, session.user_id, &request.name)
2379        .await?;
2380
2381    let channel = proto::Channel {
2382        id: request.channel_id,
2383        name: new_name,
2384    };
2385    response.send(proto::RenameChannelResponse {
2386        channel: Some(channel.clone()),
2387    })?;
2388    let mut update = proto::UpdateChannels::default();
2389    update.channels.push(channel);
2390
2391    let member_ids = db.get_channel_members(channel_id).await?;
2392
2393    let connection_pool = session.connection_pool().await;
2394    for member_id in member_ids {
2395        for connection_id in connection_pool.user_connection_ids(member_id) {
2396            session.peer.send(connection_id, update.clone())?;
2397        }
2398    }
2399
2400    Ok(())
2401}
2402
2403async fn link_channel(
2404    request: proto::LinkChannel,
2405    response: Response<proto::LinkChannel>,
2406    session: Session,
2407) -> Result<()> {
2408    let db = session.db().await;
2409    let channel_id = ChannelId::from_proto(request.channel_id);
2410    let to = ChannelId::from_proto(request.to);
2411    let channels_to_send = db.link_channel(session.user_id, channel_id, to).await?;
2412
2413    let members = db.get_channel_members(to).await?;
2414    let connection_pool = session.connection_pool().await;
2415    let update = proto::UpdateChannels {
2416        channels: channels_to_send
2417            .channels
2418            .into_iter()
2419            .map(|channel| proto::Channel {
2420                id: channel.id.to_proto(),
2421                name: channel.name,
2422            })
2423            .collect(),
2424        insert_edge: channels_to_send.edges,
2425        ..Default::default()
2426    };
2427    for member_id in members {
2428        for connection_id in connection_pool.user_connection_ids(member_id) {
2429            session.peer.send(connection_id, update.clone())?;
2430        }
2431    }
2432
2433    response.send(Ack {})?;
2434
2435    Ok(())
2436}
2437
2438async fn unlink_channel(
2439    request: proto::UnlinkChannel,
2440    response: Response<proto::UnlinkChannel>,
2441    session: Session,
2442) -> Result<()> {
2443    let db = session.db().await;
2444    let channel_id = ChannelId::from_proto(request.channel_id);
2445    let from = ChannelId::from_proto(request.from);
2446
2447    db.unlink_channel(session.user_id, channel_id, from).await?;
2448
2449    let members = db.get_channel_members(from).await?;
2450
2451    let update = proto::UpdateChannels {
2452        delete_edge: vec![proto::ChannelEdge {
2453            channel_id: channel_id.to_proto(),
2454            parent_id: from.to_proto(),
2455        }],
2456        ..Default::default()
2457    };
2458    let connection_pool = session.connection_pool().await;
2459    for member_id in members {
2460        for connection_id in connection_pool.user_connection_ids(member_id) {
2461            session.peer.send(connection_id, update.clone())?;
2462        }
2463    }
2464
2465    response.send(Ack {})?;
2466
2467    Ok(())
2468}
2469
2470async fn move_channel(
2471    request: proto::MoveChannel,
2472    response: Response<proto::MoveChannel>,
2473    session: Session,
2474) -> Result<()> {
2475    let db = session.db().await;
2476    let channel_id = ChannelId::from_proto(request.channel_id);
2477    let from_parent = ChannelId::from_proto(request.from);
2478    let to = ChannelId::from_proto(request.to);
2479
2480    let channels_to_send = db
2481        .move_channel(session.user_id, channel_id, from_parent, to)
2482        .await?;
2483
2484    if channels_to_send.is_empty() {
2485        response.send(Ack {})?;
2486        return Ok(());
2487    }
2488
2489    let members_from = db.get_channel_members(from_parent).await?;
2490    let members_to = db.get_channel_members(to).await?;
2491
2492    let update = proto::UpdateChannels {
2493        delete_edge: vec![proto::ChannelEdge {
2494            channel_id: channel_id.to_proto(),
2495            parent_id: from_parent.to_proto(),
2496        }],
2497        ..Default::default()
2498    };
2499    let connection_pool = session.connection_pool().await;
2500    for member_id in members_from {
2501        for connection_id in connection_pool.user_connection_ids(member_id) {
2502            session.peer.send(connection_id, update.clone())?;
2503        }
2504    }
2505
2506    let update = proto::UpdateChannels {
2507        channels: channels_to_send
2508            .channels
2509            .into_iter()
2510            .map(|channel| proto::Channel {
2511                id: channel.id.to_proto(),
2512                name: channel.name,
2513            })
2514            .collect(),
2515        insert_edge: channels_to_send.edges,
2516        ..Default::default()
2517    };
2518    for member_id in members_to {
2519        for connection_id in connection_pool.user_connection_ids(member_id) {
2520            session.peer.send(connection_id, update.clone())?;
2521        }
2522    }
2523
2524    response.send(Ack {})?;
2525
2526    Ok(())
2527}
2528
2529async fn get_channel_members(
2530    request: proto::GetChannelMembers,
2531    response: Response<proto::GetChannelMembers>,
2532    session: Session,
2533) -> Result<()> {
2534    let db = session.db().await;
2535    let channel_id = ChannelId::from_proto(request.channel_id);
2536    let members = db
2537        .get_channel_member_details(channel_id, session.user_id)
2538        .await?;
2539    response.send(proto::GetChannelMembersResponse { members })?;
2540    Ok(())
2541}
2542
2543async fn respond_to_channel_invite(
2544    request: proto::RespondToChannelInvite,
2545    response: Response<proto::RespondToChannelInvite>,
2546    session: Session,
2547) -> Result<()> {
2548    let db = session.db().await;
2549    let channel_id = ChannelId::from_proto(request.channel_id);
2550    db.respond_to_channel_invite(channel_id, session.user_id, request.accept)
2551        .await?;
2552
2553    let mut update = proto::UpdateChannels::default();
2554    update
2555        .remove_channel_invitations
2556        .push(channel_id.to_proto());
2557    if request.accept {
2558        let result = db.get_channel_for_user(channel_id, session.user_id).await?;
2559        update
2560            .channels
2561            .extend(
2562                result
2563                    .channels
2564                    .channels
2565                    .into_iter()
2566                    .map(|channel| proto::Channel {
2567                        id: channel.id.to_proto(),
2568                        name: channel.name,
2569                    }),
2570            );
2571        update.insert_edge = result.channels.edges;
2572        update
2573            .channel_participants
2574            .extend(
2575                result
2576                    .channel_participants
2577                    .into_iter()
2578                    .map(|(channel_id, user_ids)| proto::ChannelParticipants {
2579                        channel_id: channel_id.to_proto(),
2580                        participant_user_ids: user_ids.into_iter().map(UserId::to_proto).collect(),
2581                    }),
2582            );
2583        update
2584            .channel_permissions
2585            .extend(
2586                result
2587                    .channels_with_admin_privileges
2588                    .into_iter()
2589                    .map(|channel_id| proto::ChannelPermission {
2590                        channel_id: channel_id.to_proto(),
2591                        is_admin: true,
2592                    }),
2593            );
2594    }
2595    session.peer.send(session.connection_id, update)?;
2596    response.send(proto::Ack {})?;
2597
2598    Ok(())
2599}
2600
2601async fn join_channel(
2602    request: proto::JoinChannel,
2603    response: Response<proto::JoinChannel>,
2604    session: Session,
2605) -> Result<()> {
2606    let channel_id = ChannelId::from_proto(request.channel_id);
2607
2608    let joined_room = {
2609        leave_room_for_session(&session).await?;
2610        let db = session.db().await;
2611
2612        let room_id = db.room_id_for_channel(channel_id).await?;
2613
2614        let joined_room = db
2615            .join_room(room_id, session.user_id, session.connection_id)
2616            .await?;
2617
2618        let live_kit_connection_info = session.live_kit_client.as_ref().and_then(|live_kit| {
2619            let token = live_kit
2620                .room_token(
2621                    &joined_room.room.live_kit_room,
2622                    &session.user_id.to_string(),
2623                )
2624                .trace_err()?;
2625
2626            Some(LiveKitConnectionInfo {
2627                server_url: live_kit.url().into(),
2628                token,
2629            })
2630        });
2631
2632        response.send(proto::JoinRoomResponse {
2633            room: Some(joined_room.room.clone()),
2634            channel_id: joined_room.channel_id.map(|id| id.to_proto()),
2635            live_kit_connection_info,
2636        })?;
2637
2638        room_updated(&joined_room.room, &session.peer);
2639
2640        joined_room.into_inner()
2641    };
2642
2643    channel_updated(
2644        channel_id,
2645        &joined_room.room,
2646        &joined_room.channel_members,
2647        &session.peer,
2648        &*session.connection_pool().await,
2649    );
2650
2651    update_user_contacts(session.user_id, &session).await?;
2652
2653    Ok(())
2654}
2655
2656async fn join_channel_buffer(
2657    request: proto::JoinChannelBuffer,
2658    response: Response<proto::JoinChannelBuffer>,
2659    session: Session,
2660) -> Result<()> {
2661    let db = session.db().await;
2662    let channel_id = ChannelId::from_proto(request.channel_id);
2663
2664    let open_response = db
2665        .join_channel_buffer(channel_id, session.user_id, session.connection_id)
2666        .await?;
2667
2668    let collaborators = open_response.collaborators.clone();
2669    response.send(open_response)?;
2670
2671    let update = UpdateChannelBufferCollaborators {
2672        channel_id: channel_id.to_proto(),
2673        collaborators: collaborators.clone(),
2674    };
2675    channel_buffer_updated(
2676        session.connection_id,
2677        collaborators
2678            .iter()
2679            .filter_map(|collaborator| Some(collaborator.peer_id?.into())),
2680        &update,
2681        &session.peer,
2682    );
2683
2684    Ok(())
2685}
2686
2687async fn update_channel_buffer(
2688    request: proto::UpdateChannelBuffer,
2689    session: Session,
2690) -> Result<()> {
2691    let db = session.db().await;
2692    let channel_id = ChannelId::from_proto(request.channel_id);
2693
2694    let collaborators = db
2695        .update_channel_buffer(channel_id, session.user_id, &request.operations)
2696        .await?;
2697
2698    channel_buffer_updated(
2699        session.connection_id,
2700        collaborators,
2701        &proto::UpdateChannelBuffer {
2702            channel_id: channel_id.to_proto(),
2703            operations: request.operations,
2704        },
2705        &session.peer,
2706    );
2707    Ok(())
2708}
2709
2710async fn rejoin_channel_buffers(
2711    request: proto::RejoinChannelBuffers,
2712    response: Response<proto::RejoinChannelBuffers>,
2713    session: Session,
2714) -> Result<()> {
2715    let db = session.db().await;
2716    let buffers = db
2717        .rejoin_channel_buffers(&request.buffers, session.user_id, session.connection_id)
2718        .await?;
2719
2720    for rejoined_buffer in &buffers {
2721        let collaborators_to_notify = rejoined_buffer
2722            .buffer
2723            .collaborators
2724            .iter()
2725            .filter_map(|c| Some(c.peer_id?.into()));
2726        channel_buffer_updated(
2727            session.connection_id,
2728            collaborators_to_notify,
2729            &proto::UpdateChannelBufferCollaborators {
2730                channel_id: rejoined_buffer.buffer.channel_id,
2731                collaborators: rejoined_buffer.buffer.collaborators.clone(),
2732            },
2733            &session.peer,
2734        );
2735    }
2736
2737    response.send(proto::RejoinChannelBuffersResponse {
2738        buffers: buffers.into_iter().map(|b| b.buffer).collect(),
2739    })?;
2740
2741    Ok(())
2742}
2743
2744async fn leave_channel_buffer(
2745    request: proto::LeaveChannelBuffer,
2746    response: Response<proto::LeaveChannelBuffer>,
2747    session: Session,
2748) -> Result<()> {
2749    let db = session.db().await;
2750    let channel_id = ChannelId::from_proto(request.channel_id);
2751
2752    let left_buffer = db
2753        .leave_channel_buffer(channel_id, session.connection_id)
2754        .await?;
2755
2756    response.send(Ack {})?;
2757
2758    channel_buffer_updated(
2759        session.connection_id,
2760        left_buffer.connections,
2761        &proto::UpdateChannelBufferCollaborators {
2762            channel_id: channel_id.to_proto(),
2763            collaborators: left_buffer.collaborators,
2764        },
2765        &session.peer,
2766    );
2767
2768    Ok(())
2769}
2770
2771fn channel_buffer_updated<T: EnvelopedMessage>(
2772    sender_id: ConnectionId,
2773    collaborators: impl IntoIterator<Item = ConnectionId>,
2774    message: &T,
2775    peer: &Peer,
2776) {
2777    broadcast(Some(sender_id), collaborators.into_iter(), |peer_id| {
2778        peer.send(peer_id.into(), message.clone())
2779    });
2780}
2781
2782async fn send_channel_message(
2783    request: proto::SendChannelMessage,
2784    response: Response<proto::SendChannelMessage>,
2785    session: Session,
2786) -> Result<()> {
2787    // Validate the message body.
2788    let body = request.body.trim().to_string();
2789    if body.len() > MAX_MESSAGE_LEN {
2790        return Err(anyhow!("message is too long"))?;
2791    }
2792    if body.is_empty() {
2793        return Err(anyhow!("message can't be blank"))?;
2794    }
2795
2796    let timestamp = OffsetDateTime::now_utc();
2797    let nonce = request
2798        .nonce
2799        .ok_or_else(|| anyhow!("nonce can't be blank"))?;
2800
2801    let channel_id = ChannelId::from_proto(request.channel_id);
2802    let (message_id, connection_ids) = session
2803        .db()
2804        .await
2805        .create_channel_message(
2806            channel_id,
2807            session.user_id,
2808            &body,
2809            timestamp,
2810            nonce.clone().into(),
2811        )
2812        .await?;
2813    let message = proto::ChannelMessage {
2814        sender_id: session.user_id.to_proto(),
2815        id: message_id.to_proto(),
2816        body,
2817        timestamp: timestamp.unix_timestamp() as u64,
2818        nonce: Some(nonce),
2819    };
2820    broadcast(Some(session.connection_id), connection_ids, |connection| {
2821        session.peer.send(
2822            connection,
2823            proto::ChannelMessageSent {
2824                channel_id: channel_id.to_proto(),
2825                message: Some(message.clone()),
2826            },
2827        )
2828    });
2829    response.send(proto::SendChannelMessageResponse {
2830        message: Some(message),
2831    })?;
2832    Ok(())
2833}
2834
2835async fn remove_channel_message(
2836    request: proto::RemoveChannelMessage,
2837    response: Response<proto::RemoveChannelMessage>,
2838    session: Session,
2839) -> Result<()> {
2840    let channel_id = ChannelId::from_proto(request.channel_id);
2841    let message_id = MessageId::from_proto(request.message_id);
2842    let connection_ids = session
2843        .db()
2844        .await
2845        .remove_channel_message(channel_id, message_id, session.user_id)
2846        .await?;
2847    broadcast(Some(session.connection_id), connection_ids, |connection| {
2848        session.peer.send(connection, request.clone())
2849    });
2850    response.send(proto::Ack {})?;
2851    Ok(())
2852}
2853
2854async fn join_channel_chat(
2855    request: proto::JoinChannelChat,
2856    response: Response<proto::JoinChannelChat>,
2857    session: Session,
2858) -> Result<()> {
2859    let channel_id = ChannelId::from_proto(request.channel_id);
2860
2861    let db = session.db().await;
2862    db.join_channel_chat(channel_id, session.connection_id, session.user_id)
2863        .await?;
2864    let messages = db
2865        .get_channel_messages(channel_id, session.user_id, MESSAGE_COUNT_PER_PAGE, None)
2866        .await?;
2867    response.send(proto::JoinChannelChatResponse {
2868        done: messages.len() < MESSAGE_COUNT_PER_PAGE,
2869        messages,
2870    })?;
2871    Ok(())
2872}
2873
2874async fn leave_channel_chat(request: proto::LeaveChannelChat, session: Session) -> Result<()> {
2875    let channel_id = ChannelId::from_proto(request.channel_id);
2876    session
2877        .db()
2878        .await
2879        .leave_channel_chat(channel_id, session.connection_id, session.user_id)
2880        .await?;
2881    Ok(())
2882}
2883
2884async fn get_channel_messages(
2885    request: proto::GetChannelMessages,
2886    response: Response<proto::GetChannelMessages>,
2887    session: Session,
2888) -> Result<()> {
2889    let channel_id = ChannelId::from_proto(request.channel_id);
2890    let messages = session
2891        .db()
2892        .await
2893        .get_channel_messages(
2894            channel_id,
2895            session.user_id,
2896            MESSAGE_COUNT_PER_PAGE,
2897            Some(MessageId::from_proto(request.before_message_id)),
2898        )
2899        .await?;
2900    response.send(proto::GetChannelMessagesResponse {
2901        done: messages.len() < MESSAGE_COUNT_PER_PAGE,
2902        messages,
2903    })?;
2904    Ok(())
2905}
2906
2907async fn update_diff_base(request: proto::UpdateDiffBase, session: Session) -> Result<()> {
2908    let project_id = ProjectId::from_proto(request.project_id);
2909    let project_connection_ids = session
2910        .db()
2911        .await
2912        .project_connection_ids(project_id, session.connection_id)
2913        .await?;
2914    broadcast(
2915        Some(session.connection_id),
2916        project_connection_ids.iter().copied(),
2917        |connection_id| {
2918            session
2919                .peer
2920                .forward_send(session.connection_id, connection_id, request.clone())
2921        },
2922    );
2923    Ok(())
2924}
2925
2926async fn get_private_user_info(
2927    _request: proto::GetPrivateUserInfo,
2928    response: Response<proto::GetPrivateUserInfo>,
2929    session: Session,
2930) -> Result<()> {
2931    let db = session.db().await;
2932
2933    let metrics_id = db.get_user_metrics_id(session.user_id).await?;
2934    let user = db
2935        .get_user_by_id(session.user_id)
2936        .await?
2937        .ok_or_else(|| anyhow!("user not found"))?;
2938    let flags = db.get_user_flags(session.user_id).await?;
2939
2940    response.send(proto::GetPrivateUserInfoResponse {
2941        metrics_id,
2942        staff: user.admin,
2943        flags,
2944    })?;
2945    Ok(())
2946}
2947
2948fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
2949    match message {
2950        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
2951        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
2952        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
2953        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
2954        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
2955            code: frame.code.into(),
2956            reason: frame.reason,
2957        })),
2958    }
2959}
2960
2961fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
2962    match message {
2963        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
2964        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
2965        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
2966        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
2967        AxumMessage::Close(frame) => {
2968            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
2969                code: frame.code.into(),
2970                reason: frame.reason,
2971            }))
2972        }
2973    }
2974}
2975
2976fn build_initial_channels_update(
2977    channels: ChannelsForUser,
2978    channel_invites: Vec<db::Channel>,
2979) -> proto::UpdateChannels {
2980    let mut update = proto::UpdateChannels::default();
2981
2982    for channel in channels.channels.channels {
2983        update.channels.push(proto::Channel {
2984            id: channel.id.to_proto(),
2985            name: channel.name,
2986        });
2987    }
2988
2989    update.insert_edge = channels.channels.edges;
2990
2991    for (channel_id, participants) in channels.channel_participants {
2992        update
2993            .channel_participants
2994            .push(proto::ChannelParticipants {
2995                channel_id: channel_id.to_proto(),
2996                participant_user_ids: participants.into_iter().map(|id| id.to_proto()).collect(),
2997            });
2998    }
2999
3000    update
3001        .channel_permissions
3002        .extend(
3003            channels
3004                .channels_with_admin_privileges
3005                .into_iter()
3006                .map(|id| proto::ChannelPermission {
3007                    channel_id: id.to_proto(),
3008                    is_admin: true,
3009                }),
3010        );
3011
3012    for channel in channel_invites {
3013        update.channel_invitations.push(proto::Channel {
3014            id: channel.id.to_proto(),
3015            name: channel.name,
3016        });
3017    }
3018
3019    update
3020}
3021
3022fn build_initial_contacts_update(
3023    contacts: Vec<db::Contact>,
3024    pool: &ConnectionPool,
3025) -> proto::UpdateContacts {
3026    let mut update = proto::UpdateContacts::default();
3027
3028    for contact in contacts {
3029        match contact {
3030            db::Contact::Accepted {
3031                user_id,
3032                should_notify,
3033                busy,
3034            } => {
3035                update
3036                    .contacts
3037                    .push(contact_for_user(user_id, should_notify, busy, &pool));
3038            }
3039            db::Contact::Outgoing { user_id } => update.outgoing_requests.push(user_id.to_proto()),
3040            db::Contact::Incoming {
3041                user_id,
3042                should_notify,
3043            } => update
3044                .incoming_requests
3045                .push(proto::IncomingContactRequest {
3046                    requester_id: user_id.to_proto(),
3047                    should_notify,
3048                }),
3049        }
3050    }
3051
3052    update
3053}
3054
3055fn contact_for_user(
3056    user_id: UserId,
3057    should_notify: bool,
3058    busy: bool,
3059    pool: &ConnectionPool,
3060) -> proto::Contact {
3061    proto::Contact {
3062        user_id: user_id.to_proto(),
3063        online: pool.is_user_online(user_id),
3064        busy,
3065        should_notify,
3066    }
3067}
3068
3069fn room_updated(room: &proto::Room, peer: &Peer) {
3070    broadcast(
3071        None,
3072        room.participants
3073            .iter()
3074            .filter_map(|participant| Some(participant.peer_id?.into())),
3075        |peer_id| {
3076            peer.send(
3077                peer_id.into(),
3078                proto::RoomUpdated {
3079                    room: Some(room.clone()),
3080                },
3081            )
3082        },
3083    );
3084}
3085
3086fn channel_updated(
3087    channel_id: ChannelId,
3088    room: &proto::Room,
3089    channel_members: &[UserId],
3090    peer: &Peer,
3091    pool: &ConnectionPool,
3092) {
3093    let participants = room
3094        .participants
3095        .iter()
3096        .map(|p| p.user_id)
3097        .collect::<Vec<_>>();
3098
3099    broadcast(
3100        None,
3101        channel_members
3102            .iter()
3103            .flat_map(|user_id| pool.user_connection_ids(*user_id)),
3104        |peer_id| {
3105            peer.send(
3106                peer_id.into(),
3107                proto::UpdateChannels {
3108                    channel_participants: vec![proto::ChannelParticipants {
3109                        channel_id: channel_id.to_proto(),
3110                        participant_user_ids: participants.clone(),
3111                    }],
3112                    ..Default::default()
3113                },
3114            )
3115        },
3116    );
3117}
3118
3119async fn update_user_contacts(user_id: UserId, session: &Session) -> Result<()> {
3120    let db = session.db().await;
3121
3122    let contacts = db.get_contacts(user_id).await?;
3123    let busy = db.is_user_busy(user_id).await?;
3124
3125    let pool = session.connection_pool().await;
3126    let updated_contact = contact_for_user(user_id, false, busy, &pool);
3127    for contact in contacts {
3128        if let db::Contact::Accepted {
3129            user_id: contact_user_id,
3130            ..
3131        } = contact
3132        {
3133            for contact_conn_id in pool.user_connection_ids(contact_user_id) {
3134                session
3135                    .peer
3136                    .send(
3137                        contact_conn_id,
3138                        proto::UpdateContacts {
3139                            contacts: vec![updated_contact.clone()],
3140                            remove_contacts: Default::default(),
3141                            incoming_requests: Default::default(),
3142                            remove_incoming_requests: Default::default(),
3143                            outgoing_requests: Default::default(),
3144                            remove_outgoing_requests: Default::default(),
3145                        },
3146                    )
3147                    .trace_err();
3148            }
3149        }
3150    }
3151    Ok(())
3152}
3153
3154async fn leave_room_for_session(session: &Session) -> Result<()> {
3155    let mut contacts_to_update = HashSet::default();
3156
3157    let room_id;
3158    let canceled_calls_to_user_ids;
3159    let live_kit_room;
3160    let delete_live_kit_room;
3161    let room;
3162    let channel_members;
3163    let channel_id;
3164
3165    if let Some(mut left_room) = session.db().await.leave_room(session.connection_id).await? {
3166        contacts_to_update.insert(session.user_id);
3167
3168        for project in left_room.left_projects.values() {
3169            project_left(project, session);
3170        }
3171
3172        room_id = RoomId::from_proto(left_room.room.id);
3173        canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids);
3174        live_kit_room = mem::take(&mut left_room.room.live_kit_room);
3175        delete_live_kit_room = left_room.deleted;
3176        room = mem::take(&mut left_room.room);
3177        channel_members = mem::take(&mut left_room.channel_members);
3178        channel_id = left_room.channel_id;
3179
3180        room_updated(&room, &session.peer);
3181    } else {
3182        return Ok(());
3183    }
3184
3185    if let Some(channel_id) = channel_id {
3186        channel_updated(
3187            channel_id,
3188            &room,
3189            &channel_members,
3190            &session.peer,
3191            &*session.connection_pool().await,
3192        );
3193    }
3194
3195    {
3196        let pool = session.connection_pool().await;
3197        for canceled_user_id in canceled_calls_to_user_ids {
3198            for connection_id in pool.user_connection_ids(canceled_user_id) {
3199                session
3200                    .peer
3201                    .send(
3202                        connection_id,
3203                        proto::CallCanceled {
3204                            room_id: room_id.to_proto(),
3205                        },
3206                    )
3207                    .trace_err();
3208            }
3209            contacts_to_update.insert(canceled_user_id);
3210        }
3211    }
3212
3213    for contact_user_id in contacts_to_update {
3214        update_user_contacts(contact_user_id, &session).await?;
3215    }
3216
3217    if let Some(live_kit) = session.live_kit_client.as_ref() {
3218        live_kit
3219            .remove_participant(live_kit_room.clone(), session.user_id.to_string())
3220            .await
3221            .trace_err();
3222
3223        if delete_live_kit_room {
3224            live_kit.delete_room(live_kit_room).await.trace_err();
3225        }
3226    }
3227
3228    Ok(())
3229}
3230
3231async fn leave_channel_buffers_for_session(session: &Session) -> Result<()> {
3232    let left_channel_buffers = session
3233        .db()
3234        .await
3235        .leave_channel_buffers(session.connection_id)
3236        .await?;
3237
3238    for left_buffer in left_channel_buffers {
3239        channel_buffer_updated(
3240            session.connection_id,
3241            left_buffer.connections,
3242            &proto::UpdateChannelBufferCollaborators {
3243                channel_id: left_buffer.channel_id.to_proto(),
3244                collaborators: left_buffer.collaborators,
3245            },
3246            &session.peer,
3247        );
3248    }
3249
3250    Ok(())
3251}
3252
3253fn project_left(project: &db::LeftProject, session: &Session) {
3254    for connection_id in &project.connection_ids {
3255        if project.host_user_id == session.user_id {
3256            session
3257                .peer
3258                .send(
3259                    *connection_id,
3260                    proto::UnshareProject {
3261                        project_id: project.id.to_proto(),
3262                    },
3263                )
3264                .trace_err();
3265        } else {
3266            session
3267                .peer
3268                .send(
3269                    *connection_id,
3270                    proto::RemoveProjectCollaborator {
3271                        project_id: project.id.to_proto(),
3272                        peer_id: Some(session.connection_id.into()),
3273                    },
3274                )
3275                .trace_err();
3276        }
3277    }
3278}
3279
3280pub trait ResultExt {
3281    type Ok;
3282
3283    fn trace_err(self) -> Option<Self::Ok>;
3284}
3285
3286impl<T, E> ResultExt for Result<T, E>
3287where
3288    E: std::fmt::Debug,
3289{
3290    type Ok = T;
3291
3292    fn trace_err(self) -> Option<T> {
3293        match self {
3294            Ok(value) => Some(value),
3295            Err(error) => {
3296                tracing::error!("{:?}", error);
3297                None
3298            }
3299        }
3300    }
3301}