rpc.rs

   1mod connection_pool;
   2
   3use crate::{
   4    auth,
   5    db::{
   6        self, ChannelId, ChannelsForUser, Database, MessageId, ProjectId, RoomId, ServerId, User,
   7        UserId,
   8    },
   9    executor::Executor,
  10    AppState, Result,
  11};
  12use anyhow::anyhow;
  13use async_tungstenite::tungstenite::{
  14    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  15};
  16use axum::{
  17    body::Body,
  18    extract::{
  19        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  20        ConnectInfo, WebSocketUpgrade,
  21    },
  22    headers::{Header, HeaderName},
  23    http::StatusCode,
  24    middleware,
  25    response::IntoResponse,
  26    routing::get,
  27    Extension, Router, TypedHeader,
  28};
  29use collections::{HashMap, HashSet};
  30pub use connection_pool::ConnectionPool;
  31use futures::{
  32    channel::oneshot,
  33    future::{self, BoxFuture},
  34    stream::FuturesUnordered,
  35    FutureExt, SinkExt, StreamExt, TryStreamExt,
  36};
  37use lazy_static::lazy_static;
  38use prometheus::{register_int_gauge, IntGauge};
  39use rpc::{
  40    proto::{
  41        self, Ack, AddChannelBufferCollaborator, AnyTypedEnvelope, ChannelEdge, EntityMessage,
  42        EnvelopedMessage, LiveKitConnectionInfo, RequestMessage,
  43    },
  44    Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
  45};
  46use serde::{Serialize, Serializer};
  47use std::{
  48    any::TypeId,
  49    fmt,
  50    future::Future,
  51    marker::PhantomData,
  52    mem,
  53    net::SocketAddr,
  54    ops::{Deref, DerefMut},
  55    rc::Rc,
  56    sync::{
  57        atomic::{AtomicBool, Ordering::SeqCst},
  58        Arc,
  59    },
  60    time::{Duration, Instant},
  61};
  62use time::OffsetDateTime;
  63use tokio::sync::{watch, Semaphore};
  64use tower::ServiceBuilder;
  65use tracing::{info_span, instrument, Instrument};
  66
/// A 30-second timeout. It is not referenced in the visible part of this file.
/// NOTE(review): presumably the grace period a dropped client has to reconnect — confirm
/// against the use sites.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
/// How long `Server::start` sleeps before it clears resources left behind by stale servers.
pub const CLEANUP_TIMEOUT: Duration = Duration::from_secs(10);

// NOTE(review): presumably the page size used when listing channel messages; not used in
// the visible part of this file — confirm.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
// NOTE(review): presumably the maximum length of a channel message (unit unclear from
// here: bytes vs. characters) — confirm at the use site.
const MAX_MESSAGE_LEN: usize = 1024;
  72
// Prometheus gauges, registered lazily on first access. Registration failure panics
// (`unwrap`), e.g. if a gauge with the same name was already registered.
lazy_static! {
    // Gauge registered as "connections".
    static ref METRIC_CONNECTIONS: IntGauge =
        register_int_gauge!("connections", "number of connections").unwrap();
    // Gauge registered as "shared_projects".
    static ref METRIC_SHARED_PROJECTS: IntGauge = register_int_gauge!(
        "shared_projects",
        "number of open projects with one or more guests"
    )
    .unwrap();
}
  82
/// Type-erased handler stored in `Server::handlers`: takes the incoming envelope and the
/// session of the sending connection and returns a boxed future that processes the message.
type MessageHandler =
    Box<dyn Send + Sync + Fn(Box<dyn AnyTypedEnvelope>, Session) -> BoxFuture<'static, ()>>;
  85
/// Reply handle passed to request handlers for a request of type `R`.
struct Response<R> {
    /// Peer used to deliver the reply.
    peer: Arc<Peer>,
    /// Receipt of the request being answered.
    receipt: Receipt<R>,
    /// Set to `true` by `send`; `Server::add_request_handler` reads it to detect handlers
    /// that returned `Ok` without replying.
    responded: Arc<AtomicBool>,
}
  91
  92impl<R: RequestMessage> Response<R> {
  93    fn send(self, payload: R::Response) -> Result<()> {
  94        self.responded.store(true, SeqCst);
  95        self.peer.respond(self.receipt, payload)?;
  96        Ok(())
  97    }
  98}
  99
/// Per-connection context handed to every message handler. Cloning is cheap: the shared
/// state is held behind `Arc`s.
#[derive(Clone)]
struct Session {
    /// User that owns the connection.
    user_id: UserId,
    /// Identifier of the connection this session belongs to.
    connection_id: ConnectionId,
    /// Database handle guarded by an async mutex; access it through `Session::db`.
    db: Arc<tokio::sync::Mutex<DbHandle>>,
    /// Peer used to send messages and replies.
    peer: Arc<Peer>,
    /// Connection pool shared with the `Server`; access it through
    /// `Session::connection_pool`.
    connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
    /// LiveKit API client, if one is configured.
    live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
    /// Executor used to spawn work and create timers.
    executor: Executor,
}
 110
 111impl Session {
 112    async fn db(&self) -> tokio::sync::MutexGuard<DbHandle> {
 113        #[cfg(test)]
 114        tokio::task::yield_now().await;
 115        let guard = self.db.lock().await;
 116        #[cfg(test)]
 117        tokio::task::yield_now().await;
 118        guard
 119    }
 120
 121    async fn connection_pool(&self) -> ConnectionPoolGuard<'_> {
 122        #[cfg(test)]
 123        tokio::task::yield_now().await;
 124        let guard = self.connection_pool.lock();
 125        ConnectionPoolGuard {
 126            guard,
 127            _not_send: PhantomData,
 128        }
 129    }
 130}
 131
 132impl fmt::Debug for Session {
 133    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 134        f.debug_struct("Session")
 135            .field("user_id", &self.user_id)
 136            .field("connection_id", &self.connection_id)
 137            .finish()
 138    }
 139}
 140
 141struct DbHandle(Arc<Database>);
 142
 143impl Deref for DbHandle {
 144    type Target = Database;
 145
 146    fn deref(&self) -> &Self::Target {
 147        self.0.as_ref()
 148    }
 149}
 150
/// The RPC server: owns the peer, the connection pool and the table of message handlers.
pub struct Server {
    /// Current server id; mutable so that tests can change it via `reset`.
    id: parking_lot::Mutex<ServerId>,
    /// Peer through which all messages are sent and connections are registered.
    peer: Arc<Peer>,
    /// Pool of live connections, shared with every `Session`.
    pub(crate) connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
    /// Application-wide state (database, configuration, LiveKit client).
    app_state: Arc<AppState>,
    /// Executor used for timers and detached tasks.
    executor: Executor,
    /// Handlers keyed by the `TypeId` of the message type they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Signalled by `teardown`; each connection loop subscribes to it and exits on change.
    teardown: watch::Sender<()>,
}
 160
/// Guard over the locked [`ConnectionPool`]. In test builds, dropping it runs the pool's
/// invariant checks.
pub(crate) struct ConnectionPoolGuard<'a> {
    /// The underlying mutex guard.
    guard: parking_lot::MutexGuard<'a, ConnectionPool>,
    /// `Rc` is not `Send`, so this marker makes the guard itself not `Send`.
    _not_send: PhantomData<Rc<()>>,
}
 165
/// Serializable view of the server state, produced by `Server::snapshot`. It holds the
/// connection pool lock for as long as it is alive.
#[derive(Serialize)]
pub struct ServerSnapshot<'a> {
    /// The server's peer.
    peer: &'a Peer,
    /// Locked connection pool; serialized through its `Deref` target.
    #[serde(serialize_with = "serialize_deref")]
    connection_pool: ConnectionPoolGuard<'a>,
}
 172
 173pub fn serialize_deref<S, T, U>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
 174where
 175    S: Serializer,
 176    T: Deref<Target = U>,
 177    U: Serialize,
 178{
 179    Serialize::serialize(value.deref(), serializer)
 180}
 181
 182impl Server {
 183    pub fn new(id: ServerId, app_state: Arc<AppState>, executor: Executor) -> Arc<Self> {
 184        let mut server = Self {
 185            id: parking_lot::Mutex::new(id),
 186            peer: Peer::new(id.0 as u32),
 187            app_state,
 188            executor,
 189            connection_pool: Default::default(),
 190            handlers: Default::default(),
 191            teardown: watch::channel(()).0,
 192        };
 193
 194        server
 195            .add_request_handler(ping)
 196            .add_request_handler(create_room)
 197            .add_request_handler(join_room)
 198            .add_request_handler(rejoin_room)
 199            .add_request_handler(leave_room)
 200            .add_request_handler(call)
 201            .add_request_handler(cancel_call)
 202            .add_message_handler(decline_call)
 203            .add_request_handler(update_participant_location)
 204            .add_request_handler(share_project)
 205            .add_message_handler(unshare_project)
 206            .add_request_handler(join_project)
 207            .add_message_handler(leave_project)
 208            .add_request_handler(update_project)
 209            .add_request_handler(update_worktree)
 210            .add_message_handler(start_language_server)
 211            .add_message_handler(update_language_server)
 212            .add_message_handler(update_diagnostic_summary)
 213            .add_message_handler(update_worktree_settings)
 214            .add_message_handler(refresh_inlay_hints)
 215            .add_request_handler(forward_project_request::<proto::GetHover>)
 216            .add_request_handler(forward_project_request::<proto::GetDefinition>)
 217            .add_request_handler(forward_project_request::<proto::GetTypeDefinition>)
 218            .add_request_handler(forward_project_request::<proto::GetReferences>)
 219            .add_request_handler(forward_project_request::<proto::SearchProject>)
 220            .add_request_handler(forward_project_request::<proto::GetDocumentHighlights>)
 221            .add_request_handler(forward_project_request::<proto::GetProjectSymbols>)
 222            .add_request_handler(forward_project_request::<proto::OpenBufferForSymbol>)
 223            .add_request_handler(forward_project_request::<proto::OpenBufferById>)
 224            .add_request_handler(forward_project_request::<proto::OpenBufferByPath>)
 225            .add_request_handler(forward_project_request::<proto::GetCompletions>)
 226            .add_request_handler(forward_project_request::<proto::ApplyCompletionAdditionalEdits>)
 227            .add_request_handler(forward_project_request::<proto::GetCodeActions>)
 228            .add_request_handler(forward_project_request::<proto::ApplyCodeAction>)
 229            .add_request_handler(forward_project_request::<proto::PrepareRename>)
 230            .add_request_handler(forward_project_request::<proto::PerformRename>)
 231            .add_request_handler(forward_project_request::<proto::ReloadBuffers>)
 232            .add_request_handler(forward_project_request::<proto::SynchronizeBuffers>)
 233            .add_request_handler(forward_project_request::<proto::FormatBuffers>)
 234            .add_request_handler(forward_project_request::<proto::CreateProjectEntry>)
 235            .add_request_handler(forward_project_request::<proto::RenameProjectEntry>)
 236            .add_request_handler(forward_project_request::<proto::CopyProjectEntry>)
 237            .add_request_handler(forward_project_request::<proto::DeleteProjectEntry>)
 238            .add_request_handler(forward_project_request::<proto::ExpandProjectEntry>)
 239            .add_request_handler(forward_project_request::<proto::OnTypeFormatting>)
 240            .add_request_handler(forward_project_request::<proto::InlayHints>)
 241            .add_message_handler(create_buffer_for_peer)
 242            .add_request_handler(update_buffer)
 243            .add_message_handler(update_buffer_file)
 244            .add_message_handler(buffer_reloaded)
 245            .add_message_handler(buffer_saved)
 246            .add_request_handler(forward_project_request::<proto::SaveBuffer>)
 247            .add_request_handler(get_users)
 248            .add_request_handler(fuzzy_search_users)
 249            .add_request_handler(request_contact)
 250            .add_request_handler(remove_contact)
 251            .add_request_handler(respond_to_contact_request)
 252            .add_request_handler(create_channel)
 253            .add_request_handler(delete_channel)
 254            .add_request_handler(invite_channel_member)
 255            .add_request_handler(remove_channel_member)
 256            .add_request_handler(set_channel_member_admin)
 257            .add_request_handler(rename_channel)
 258            .add_request_handler(join_channel_buffer)
 259            .add_request_handler(leave_channel_buffer)
 260            .add_message_handler(update_channel_buffer)
 261            .add_request_handler(rejoin_channel_buffers)
 262            .add_request_handler(get_channel_members)
 263            .add_request_handler(respond_to_channel_invite)
 264            .add_request_handler(join_channel)
 265            .add_request_handler(join_channel_chat)
 266            .add_message_handler(leave_channel_chat)
 267            .add_request_handler(send_channel_message)
 268            .add_request_handler(remove_channel_message)
 269            .add_request_handler(get_channel_messages)
 270            .add_request_handler(link_channel)
 271            .add_request_handler(unlink_channel)
 272            .add_request_handler(move_channel)
 273            .add_request_handler(follow)
 274            .add_message_handler(unfollow)
 275            .add_message_handler(update_followers)
 276            .add_message_handler(update_diff_base)
 277            .add_request_handler(get_private_user_info);
 278
 279        Arc::new(server)
 280    }
 281
 282    pub async fn start(&self) -> Result<()> {
 283        let server_id = *self.id.lock();
 284        let app_state = self.app_state.clone();
 285        let peer = self.peer.clone();
 286        let timeout = self.executor.sleep(CLEANUP_TIMEOUT);
 287        let pool = self.connection_pool.clone();
 288        let live_kit_client = self.app_state.live_kit_client.clone();
 289
 290        let span = info_span!("start server");
 291        self.executor.spawn_detached(
 292            async move {
 293                tracing::info!("waiting for cleanup timeout");
 294                timeout.await;
 295                tracing::info!("cleanup timeout expired, retrieving stale rooms");
 296                if let Some((room_ids, channel_ids)) = app_state
 297                    .db
 298                    .stale_server_resource_ids(&app_state.config.zed_environment, server_id)
 299                    .await
 300                    .trace_err()
 301                {
 302                    tracing::info!(stale_room_count = room_ids.len(), "retrieved stale rooms");
 303                    tracing::info!(
 304                        stale_channel_buffer_count = channel_ids.len(),
 305                        "retrieved stale channel buffers"
 306                    );
 307
 308                    for channel_id in channel_ids {
 309                        if let Some(refreshed_channel_buffer) = app_state
 310                            .db
 311                            .clear_stale_channel_buffer_collaborators(channel_id, server_id)
 312                            .await
 313                            .trace_err()
 314                        {
 315                            for connection_id in refreshed_channel_buffer.connection_ids {
 316                                for message in &refreshed_channel_buffer.removed_collaborators {
 317                                    peer.send(connection_id, message.clone()).trace_err();
 318                                }
 319                            }
 320                        }
 321                    }
 322
 323                    for room_id in room_ids {
 324                        let mut contacts_to_update = HashSet::default();
 325                        let mut canceled_calls_to_user_ids = Vec::new();
 326                        let mut live_kit_room = String::new();
 327                        let mut delete_live_kit_room = false;
 328
 329                        if let Some(mut refreshed_room) = app_state
 330                            .db
 331                            .clear_stale_room_participants(room_id, server_id)
 332                            .await
 333                            .trace_err()
 334                        {
 335                            tracing::info!(
 336                                room_id = room_id.0,
 337                                new_participant_count = refreshed_room.room.participants.len(),
 338                                "refreshed room"
 339                            );
 340                            room_updated(&refreshed_room.room, &peer);
 341                            if let Some(channel_id) = refreshed_room.channel_id {
 342                                channel_updated(
 343                                    channel_id,
 344                                    &refreshed_room.room,
 345                                    &refreshed_room.channel_members,
 346                                    &peer,
 347                                    &*pool.lock(),
 348                                );
 349                            }
 350                            contacts_to_update
 351                                .extend(refreshed_room.stale_participant_user_ids.iter().copied());
 352                            contacts_to_update
 353                                .extend(refreshed_room.canceled_calls_to_user_ids.iter().copied());
 354                            canceled_calls_to_user_ids =
 355                                mem::take(&mut refreshed_room.canceled_calls_to_user_ids);
 356                            live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room);
 357                            delete_live_kit_room = refreshed_room.room.participants.is_empty();
 358                        }
 359
 360                        {
 361                            let pool = pool.lock();
 362                            for canceled_user_id in canceled_calls_to_user_ids {
 363                                for connection_id in pool.user_connection_ids(canceled_user_id) {
 364                                    peer.send(
 365                                        connection_id,
 366                                        proto::CallCanceled {
 367                                            room_id: room_id.to_proto(),
 368                                        },
 369                                    )
 370                                    .trace_err();
 371                                }
 372                            }
 373                        }
 374
 375                        for user_id in contacts_to_update {
 376                            let busy = app_state.db.is_user_busy(user_id).await.trace_err();
 377                            let contacts = app_state.db.get_contacts(user_id).await.trace_err();
 378                            if let Some((busy, contacts)) = busy.zip(contacts) {
 379                                let pool = pool.lock();
 380                                let updated_contact = contact_for_user(user_id, false, busy, &pool);
 381                                for contact in contacts {
 382                                    if let db::Contact::Accepted {
 383                                        user_id: contact_user_id,
 384                                        ..
 385                                    } = contact
 386                                    {
 387                                        for contact_conn_id in
 388                                            pool.user_connection_ids(contact_user_id)
 389                                        {
 390                                            peer.send(
 391                                                contact_conn_id,
 392                                                proto::UpdateContacts {
 393                                                    contacts: vec![updated_contact.clone()],
 394                                                    remove_contacts: Default::default(),
 395                                                    incoming_requests: Default::default(),
 396                                                    remove_incoming_requests: Default::default(),
 397                                                    outgoing_requests: Default::default(),
 398                                                    remove_outgoing_requests: Default::default(),
 399                                                },
 400                                            )
 401                                            .trace_err();
 402                                        }
 403                                    }
 404                                }
 405                            }
 406                        }
 407
 408                        if let Some(live_kit) = live_kit_client.as_ref() {
 409                            if delete_live_kit_room {
 410                                live_kit.delete_room(live_kit_room).await.trace_err();
 411                            }
 412                        }
 413                    }
 414                }
 415
 416                app_state
 417                    .db
 418                    .delete_stale_servers(&app_state.config.zed_environment, server_id)
 419                    .await
 420                    .trace_err();
 421            }
 422            .instrument(span),
 423        );
 424        Ok(())
 425    }
 426
 427    pub fn teardown(&self) {
 428        self.peer.teardown();
 429        self.connection_pool.lock().reset();
 430        let _ = self.teardown.send(());
 431    }
 432
 433    #[cfg(test)]
 434    pub fn reset(&self, id: ServerId) {
 435        self.teardown();
 436        *self.id.lock() = id;
 437        self.peer.reset(id.0 as u32);
 438    }
 439
 440    #[cfg(test)]
 441    pub fn id(&self) -> ServerId {
 442        *self.id.lock()
 443    }
 444
 445    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 446    where
 447        F: 'static + Send + Sync + Fn(TypedEnvelope<M>, Session) -> Fut,
 448        Fut: 'static + Send + Future<Output = Result<()>>,
 449        M: EnvelopedMessage,
 450    {
 451        let prev_handler = self.handlers.insert(
 452            TypeId::of::<M>(),
 453            Box::new(move |envelope, session| {
 454                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 455                let span = info_span!(
 456                    "handle message",
 457                    payload_type = envelope.payload_type_name()
 458                );
 459                span.in_scope(|| {
 460                    tracing::info!(
 461                        payload_type = envelope.payload_type_name(),
 462                        "message received"
 463                    );
 464                });
 465                let start_time = Instant::now();
 466                let future = (handler)(*envelope, session);
 467                async move {
 468                    let result = future.await;
 469                    let duration_ms = start_time.elapsed().as_micros() as f64 / 1000.0;
 470                    match result {
 471                        Err(error) => {
 472                            tracing::error!(%error, ?duration_ms, "error handling message")
 473                        }
 474                        Ok(()) => tracing::info!(?duration_ms, "finished handling message"),
 475                    }
 476                }
 477                .instrument(span)
 478                .boxed()
 479            }),
 480        );
 481        if prev_handler.is_some() {
 482            panic!("registered a handler for the same message twice");
 483        }
 484        self
 485    }
 486
 487    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 488    where
 489        F: 'static + Send + Sync + Fn(M, Session) -> Fut,
 490        Fut: 'static + Send + Future<Output = Result<()>>,
 491        M: EnvelopedMessage,
 492    {
 493        self.add_handler(move |envelope, session| handler(envelope.payload, session));
 494        self
 495    }
 496
 497    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 498    where
 499        F: 'static + Send + Sync + Fn(M, Response<M>, Session) -> Fut,
 500        Fut: Send + Future<Output = Result<()>>,
 501        M: RequestMessage,
 502    {
 503        let handler = Arc::new(handler);
 504        self.add_handler(move |envelope, session| {
 505            let receipt = envelope.receipt();
 506            let handler = handler.clone();
 507            async move {
 508                let peer = session.peer.clone();
 509                let responded = Arc::new(AtomicBool::default());
 510                let response = Response {
 511                    peer: peer.clone(),
 512                    responded: responded.clone(),
 513                    receipt,
 514                };
 515                match (handler)(envelope.payload, response, session).await {
 516                    Ok(()) => {
 517                        if responded.load(std::sync::atomic::Ordering::SeqCst) {
 518                            Ok(())
 519                        } else {
 520                            Err(anyhow!("handler did not send a response"))?
 521                        }
 522                    }
 523                    Err(error) => {
 524                        peer.respond_with_error(
 525                            receipt,
 526                            proto::Error {
 527                                message: error.to_string(),
 528                            },
 529                        )?;
 530                        Err(error)
 531                    }
 532                }
 533            }
 534        })
 535    }
 536
 537    pub fn handle_connection(
 538        self: &Arc<Self>,
 539        connection: Connection,
 540        address: String,
 541        user: User,
 542        mut send_connection_id: Option<oneshot::Sender<ConnectionId>>,
 543        executor: Executor,
 544    ) -> impl Future<Output = Result<()>> {
 545        let this = self.clone();
 546        let user_id = user.id;
 547        let login = user.github_login;
 548        let span = info_span!("handle connection", %user_id, %login, %address);
 549        let mut teardown = self.teardown.subscribe();
 550        async move {
 551            let (connection_id, handle_io, mut incoming_rx) = this
 552                .peer
 553                .add_connection(connection, {
 554                    let executor = executor.clone();
 555                    move |duration| executor.sleep(duration)
 556                });
 557
 558            tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
 559            this.peer.send(connection_id, proto::Hello { peer_id: Some(connection_id.into()) })?;
 560            tracing::info!(%user_id, %login, %connection_id, %address, "sent hello message");
 561
 562            if let Some(send_connection_id) = send_connection_id.take() {
 563                let _ = send_connection_id.send(connection_id);
 564            }
 565
 566            if !user.connected_once {
 567                this.peer.send(connection_id, proto::ShowContacts {})?;
 568                this.app_state.db.set_user_connected_once(user_id, true).await?;
 569            }
 570
 571            let (contacts, channels_for_user, channel_invites) = future::try_join3(
 572                this.app_state.db.get_contacts(user_id),
 573                this.app_state.db.get_channels_for_user(user_id),
 574                this.app_state.db.get_channel_invites_for_user(user_id)
 575            ).await?;
 576
 577            {
 578                let mut pool = this.connection_pool.lock();
 579                pool.add_connection(connection_id, user_id, user.admin);
 580                this.peer.send(connection_id, build_initial_contacts_update(contacts, &pool))?;
 581                this.peer.send(connection_id, build_initial_channels_update(
 582                    channels_for_user,
 583                    channel_invites
 584                ))?;
 585            }
 586
 587            if let Some(incoming_call) = this.app_state.db.incoming_call_for_user(user_id).await? {
 588                this.peer.send(connection_id, incoming_call)?;
 589            }
 590
 591            let session = Session {
 592                user_id,
 593                connection_id,
 594                db: Arc::new(tokio::sync::Mutex::new(DbHandle(this.app_state.db.clone()))),
 595                peer: this.peer.clone(),
 596                connection_pool: this.connection_pool.clone(),
 597                live_kit_client: this.app_state.live_kit_client.clone(),
 598                executor: executor.clone(),
 599            };
 600            update_user_contacts(user_id, &session).await?;
 601
 602            let handle_io = handle_io.fuse();
 603            futures::pin_mut!(handle_io);
 604
 605            // Handlers for foreground messages are pushed into the following `FuturesUnordered`.
 606            // This prevents deadlocks when e.g., client A performs a request to client B and
 607            // client B performs a request to client A. If both clients stop processing further
 608            // messages until their respective request completes, they won't have a chance to
 609            // respond to the other client's request and cause a deadlock.
 610            //
 611            // This arrangement ensures we will attempt to process earlier messages first, but fall
 612            // back to processing messages arrived later in the spirit of making progress.
 613            let mut foreground_message_handlers = FuturesUnordered::new();
 614            let concurrent_handlers = Arc::new(Semaphore::new(256));
 615            loop {
 616                let next_message = async {
 617                    let permit = concurrent_handlers.clone().acquire_owned().await.unwrap();
 618                    let message = incoming_rx.next().await;
 619                    (permit, message)
 620                }.fuse();
 621                futures::pin_mut!(next_message);
 622                futures::select_biased! {
 623                    _ = teardown.changed().fuse() => return Ok(()),
 624                    result = handle_io => {
 625                        if let Err(error) = result {
 626                            tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
 627                        }
 628                        break;
 629                    }
 630                    _ = foreground_message_handlers.next() => {}
 631                    next_message = next_message => {
 632                        let (permit, message) = next_message;
 633                        if let Some(message) = message {
 634                            let type_name = message.payload_type_name();
 635                            let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
 636                            let span_enter = span.enter();
 637                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 638                                let is_background = message.is_background();
 639                                let handle_message = (handler)(message, session.clone());
 640                                drop(span_enter);
 641
 642                                let handle_message = async move {
 643                                    handle_message.await;
 644                                    drop(permit);
 645                                }.instrument(span);
 646                                if is_background {
 647                                    executor.spawn_detached(handle_message);
 648                                } else {
 649                                    foreground_message_handlers.push(handle_message);
 650                                }
 651                            } else {
 652                                tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
 653                            }
 654                        } else {
 655                            tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
 656                            break;
 657                        }
 658                    }
 659                }
 660            }
 661
 662            drop(foreground_message_handlers);
 663            tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
 664            if let Err(error) = connection_lost(session, teardown, executor).await {
 665                tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
 666            }
 667
 668            Ok(())
 669        }.instrument(span)
 670    }
 671
 672    pub async fn invite_code_redeemed(
 673        self: &Arc<Self>,
 674        inviter_id: UserId,
 675        invitee_id: UserId,
 676    ) -> Result<()> {
 677        if let Some(user) = self.app_state.db.get_user_by_id(inviter_id).await? {
 678            if let Some(code) = &user.invite_code {
 679                let pool = self.connection_pool.lock();
 680                let invitee_contact = contact_for_user(invitee_id, true, false, &pool);
 681                for connection_id in pool.user_connection_ids(inviter_id) {
 682                    self.peer.send(
 683                        connection_id,
 684                        proto::UpdateContacts {
 685                            contacts: vec![invitee_contact.clone()],
 686                            ..Default::default()
 687                        },
 688                    )?;
 689                    self.peer.send(
 690                        connection_id,
 691                        proto::UpdateInviteInfo {
 692                            url: format!("{}{}", self.app_state.config.invite_link_prefix, &code),
 693                            count: user.invite_count as u32,
 694                        },
 695                    )?;
 696                }
 697            }
 698        }
 699        Ok(())
 700    }
 701
 702    pub async fn invite_count_updated(self: &Arc<Self>, user_id: UserId) -> Result<()> {
 703        if let Some(user) = self.app_state.db.get_user_by_id(user_id).await? {
 704            if let Some(invite_code) = &user.invite_code {
 705                let pool = self.connection_pool.lock();
 706                for connection_id in pool.user_connection_ids(user_id) {
 707                    self.peer.send(
 708                        connection_id,
 709                        proto::UpdateInviteInfo {
 710                            url: format!(
 711                                "{}{}",
 712                                self.app_state.config.invite_link_prefix, invite_code
 713                            ),
 714                            count: user.invite_count as u32,
 715                        },
 716                    )?;
 717                }
 718            }
 719        }
 720        Ok(())
 721    }
 722
 723    pub async fn snapshot<'a>(self: &'a Arc<Self>) -> ServerSnapshot<'a> {
 724        ServerSnapshot {
 725            connection_pool: ConnectionPoolGuard {
 726                guard: self.connection_pool.lock(),
 727                _not_send: PhantomData,
 728            },
 729            peer: &self.peer,
 730        }
 731    }
 732}
 733
 734impl<'a> Deref for ConnectionPoolGuard<'a> {
 735    type Target = ConnectionPool;
 736
 737    fn deref(&self) -> &Self::Target {
 738        &*self.guard
 739    }
 740}
 741
 742impl<'a> DerefMut for ConnectionPoolGuard<'a> {
 743    fn deref_mut(&mut self) -> &mut Self::Target {
 744        &mut *self.guard
 745    }
 746}
 747
 748impl<'a> Drop for ConnectionPoolGuard<'a> {
 749    fn drop(&mut self) {
 750        #[cfg(test)]
 751        self.check_invariants();
 752    }
 753}
 754
 755fn broadcast<F>(
 756    sender_id: Option<ConnectionId>,
 757    receiver_ids: impl IntoIterator<Item = ConnectionId>,
 758    mut f: F,
 759) where
 760    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 761{
 762    for receiver_id in receiver_ids {
 763        if Some(receiver_id) != sender_id {
 764            if let Err(error) = f(receiver_id) {
 765                tracing::error!("failed to send to {:?} {}", receiver_id, error);
 766            }
 767        }
 768    }
 769}
 770
lazy_static! {
    /// Name of the `x-zed-protocol-version` HTTP header, returned by
    /// `ProtocolVersion`'s `Header::name` implementation.
    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
}
 774
/// Typed value of the `x-zed-protocol-version` request header: the protocol
/// version announced by a client, as a `u32`.
pub struct ProtocolVersion(u32);
 776
 777impl Header for ProtocolVersion {
 778    fn name() -> &'static HeaderName {
 779        &ZED_PROTOCOL_VERSION
 780    }
 781
 782    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
 783    where
 784        Self: Sized,
 785        I: Iterator<Item = &'i axum::http::HeaderValue>,
 786    {
 787        let version = values
 788            .next()
 789            .ok_or_else(axum::headers::Error::invalid)?
 790            .to_str()
 791            .map_err(|_| axum::headers::Error::invalid())?
 792            .parse()
 793            .map_err(|_| axum::headers::Error::invalid())?;
 794        Ok(Self(version))
 795    }
 796
 797    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
 798        values.extend([self.0.to_string().parse().unwrap()]);
 799    }
 800}
 801
 802pub fn routes(server: Arc<Server>) -> Router<Body> {
 803    Router::new()
 804        .route("/rpc", get(handle_websocket_request))
 805        .layer(
 806            ServiceBuilder::new()
 807                .layer(Extension(server.app_state.clone()))
 808                .layer(middleware::from_fn(auth::validate_header)),
 809        )
 810        .route("/metrics", get(handle_metrics))
 811        .layer(Extension(server))
 812}
 813
 814pub async fn handle_websocket_request(
 815    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
 816    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
 817    Extension(server): Extension<Arc<Server>>,
 818    Extension(user): Extension<User>,
 819    ws: WebSocketUpgrade,
 820) -> axum::response::Response {
 821    if protocol_version != rpc::PROTOCOL_VERSION {
 822        return (
 823            StatusCode::UPGRADE_REQUIRED,
 824            "client must be upgraded".to_string(),
 825        )
 826            .into_response();
 827    }
 828    let socket_address = socket_address.to_string();
 829    ws.on_upgrade(move |socket| {
 830        use util::ResultExt;
 831        let socket = socket
 832            .map_ok(to_tungstenite_message)
 833            .err_into()
 834            .with(|message| async move { Ok(to_axum_message(message)) });
 835        let connection = Connection::new(Box::pin(socket));
 836        async move {
 837            server
 838                .handle_connection(connection, socket_address, user, None, Executor::Production)
 839                .await
 840                .log_err();
 841        }
 842    })
 843}
 844
 845pub async fn handle_metrics(Extension(server): Extension<Arc<Server>>) -> Result<String> {
 846    let connections = server
 847        .connection_pool
 848        .lock()
 849        .connections()
 850        .filter(|connection| !connection.admin)
 851        .count();
 852
 853    METRIC_CONNECTIONS.set(connections as _);
 854
 855    let shared_projects = server.app_state.db.project_count_excluding_admins().await?;
 856    METRIC_SHARED_PROJECTS.set(shared_projects as _);
 857
 858    let encoder = prometheus::TextEncoder::new();
 859    let metric_families = prometheus::gather();
 860    let encoded_metrics = encoder
 861        .encode_to_string(&metric_families)
 862        .map_err(|err| anyhow!("{}", err))?;
 863    Ok(encoded_metrics)
 864}
 865
/// Cleans up after `session`'s connection has been closed.
///
/// The connection is disconnected from the peer and removed from the connection
/// pool right away, and the database is told that the connection was lost (a
/// failure there is only traced). The function then waits for whichever of these
/// completes first:
///
/// * `RECONNECT_TIMEOUT` elapses on `executor`: the session leaves its room and
///   its channel buffers, a pending call is declined if the pool reports the user
///   as no longer online, and the user's contacts are updated.
/// * `teardown` changes: nothing further is done.
///
/// # Errors
///
/// Returns an error if removing the connection from the pool fails or if
/// `update_user_contacts` fails. Other failures are traced and ignored.
#[instrument(err, skip(executor))]
async fn connection_lost(
    session: Session,
    mut teardown: watch::Receiver<()>,
    executor: Executor,
) -> Result<()> {
    session.peer.disconnect(session.connection_id);
    session
        .connection_pool()
        .await
        .remove_connection(session.connection_id)?;

    // Best effort: a database failure here must not prevent the cleanup below.
    session
        .db()
        .await
        .connection_lost(session.connection_id)
        .await
        .trace_err();

    // `select_biased!` polls its branches in the order written, so the timeout
    // branch is checked before the teardown branch.
    futures::select_biased! {
        _ = executor.sleep(RECONNECT_TIMEOUT).fuse() => {
            log::info!("connection lost, removing all resources for user:{}, connection:{:?}", session.user_id, session.connection_id);
            leave_room_for_session(&session).await.trace_err();
            leave_channel_buffers_for_session(&session)
                .await
                .trace_err();

            // Only decline on the user's behalf when the pool no longer
            // reports them as online.
            if !session
                .connection_pool()
                .await
                .is_user_online(session.user_id)
            {
                let db = session.db().await;
                if let Some(room) = db.decline_call(None, session.user_id).await.trace_err().flatten() {
                    room_updated(&room, &session.peer);
                }
            }

            update_user_contacts(session.user_id, &session).await?;
        }
        _ = teardown.changed().fuse() => {}
    }

    Ok(())
}
 911
 912async fn ping(_: proto::Ping, response: Response<proto::Ping>, _session: Session) -> Result<()> {
 913    response.send(proto::Ack {})?;
 914    Ok(())
 915}
 916
 917async fn create_room(
 918    _request: proto::CreateRoom,
 919    response: Response<proto::CreateRoom>,
 920    session: Session,
 921) -> Result<()> {
 922    let live_kit_room = nanoid::nanoid!(30);
 923
 924    let live_kit_connection_info = {
 925        let live_kit_room = live_kit_room.clone();
 926        let live_kit = session.live_kit_client.as_ref();
 927
 928        util::async_iife!({
 929            let live_kit = live_kit?;
 930
 931            live_kit
 932                .create_room(live_kit_room.clone())
 933                .await
 934                .trace_err()?;
 935
 936            let token = live_kit
 937                .room_token(&live_kit_room, &session.user_id.to_string())
 938                .trace_err()?;
 939
 940            Some(proto::LiveKitConnectionInfo {
 941                server_url: live_kit.url().into(),
 942                token,
 943            })
 944        })
 945    }
 946    .await;
 947
 948    let room = session
 949        .db()
 950        .await
 951        .create_room(session.user_id, session.connection_id, &live_kit_room)
 952        .await?;
 953
 954    response.send(proto::CreateRoomResponse {
 955        room: Some(room.clone()),
 956        live_kit_connection_info,
 957    })?;
 958
 959    update_user_contacts(session.user_id, &session).await?;
 960    Ok(())
 961}
 962
 963async fn join_room(
 964    request: proto::JoinRoom,
 965    response: Response<proto::JoinRoom>,
 966    session: Session,
 967) -> Result<()> {
 968    let room_id = RoomId::from_proto(request.id);
 969    let joined_room = {
 970        let room = session
 971            .db()
 972            .await
 973            .join_room(room_id, session.user_id, session.connection_id)
 974            .await?;
 975        room_updated(&room.room, &session.peer);
 976        room.into_inner()
 977    };
 978
 979    if let Some(channel_id) = joined_room.channel_id {
 980        channel_updated(
 981            channel_id,
 982            &joined_room.room,
 983            &joined_room.channel_members,
 984            &session.peer,
 985            &*session.connection_pool().await,
 986        )
 987    }
 988
 989    for connection_id in session
 990        .connection_pool()
 991        .await
 992        .user_connection_ids(session.user_id)
 993    {
 994        session
 995            .peer
 996            .send(
 997                connection_id,
 998                proto::CallCanceled {
 999                    room_id: room_id.to_proto(),
1000                },
1001            )
1002            .trace_err();
1003    }
1004
1005    let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
1006        if let Some(token) = live_kit
1007            .room_token(
1008                &joined_room.room.live_kit_room,
1009                &session.user_id.to_string(),
1010            )
1011            .trace_err()
1012        {
1013            Some(proto::LiveKitConnectionInfo {
1014                server_url: live_kit.url().into(),
1015                token,
1016            })
1017        } else {
1018            None
1019        }
1020    } else {
1021        None
1022    };
1023
1024    response.send(proto::JoinRoomResponse {
1025        room: Some(joined_room.room),
1026        channel_id: joined_room.channel_id.map(|id| id.to_proto()),
1027        live_kit_connection_info,
1028    })?;
1029
1030    update_user_contacts(session.user_id, &session).await?;
1031    Ok(())
1032}
1033
1034async fn rejoin_room(
1035    request: proto::RejoinRoom,
1036    response: Response<proto::RejoinRoom>,
1037    session: Session,
1038) -> Result<()> {
1039    let room;
1040    let channel_id;
1041    let channel_members;
1042    {
1043        let mut rejoined_room = session
1044            .db()
1045            .await
1046            .rejoin_room(request, session.user_id, session.connection_id)
1047            .await?;
1048
1049        response.send(proto::RejoinRoomResponse {
1050            room: Some(rejoined_room.room.clone()),
1051            reshared_projects: rejoined_room
1052                .reshared_projects
1053                .iter()
1054                .map(|project| proto::ResharedProject {
1055                    id: project.id.to_proto(),
1056                    collaborators: project
1057                        .collaborators
1058                        .iter()
1059                        .map(|collaborator| collaborator.to_proto())
1060                        .collect(),
1061                })
1062                .collect(),
1063            rejoined_projects: rejoined_room
1064                .rejoined_projects
1065                .iter()
1066                .map(|rejoined_project| proto::RejoinedProject {
1067                    id: rejoined_project.id.to_proto(),
1068                    worktrees: rejoined_project
1069                        .worktrees
1070                        .iter()
1071                        .map(|worktree| proto::WorktreeMetadata {
1072                            id: worktree.id,
1073                            root_name: worktree.root_name.clone(),
1074                            visible: worktree.visible,
1075                            abs_path: worktree.abs_path.clone(),
1076                        })
1077                        .collect(),
1078                    collaborators: rejoined_project
1079                        .collaborators
1080                        .iter()
1081                        .map(|collaborator| collaborator.to_proto())
1082                        .collect(),
1083                    language_servers: rejoined_project.language_servers.clone(),
1084                })
1085                .collect(),
1086        })?;
1087        room_updated(&rejoined_room.room, &session.peer);
1088
1089        for project in &rejoined_room.reshared_projects {
1090            for collaborator in &project.collaborators {
1091                session
1092                    .peer
1093                    .send(
1094                        collaborator.connection_id,
1095                        proto::UpdateProjectCollaborator {
1096                            project_id: project.id.to_proto(),
1097                            old_peer_id: Some(project.old_connection_id.into()),
1098                            new_peer_id: Some(session.connection_id.into()),
1099                        },
1100                    )
1101                    .trace_err();
1102            }
1103
1104            broadcast(
1105                Some(session.connection_id),
1106                project
1107                    .collaborators
1108                    .iter()
1109                    .map(|collaborator| collaborator.connection_id),
1110                |connection_id| {
1111                    session.peer.forward_send(
1112                        session.connection_id,
1113                        connection_id,
1114                        proto::UpdateProject {
1115                            project_id: project.id.to_proto(),
1116                            worktrees: project.worktrees.clone(),
1117                        },
1118                    )
1119                },
1120            );
1121        }
1122
1123        for project in &rejoined_room.rejoined_projects {
1124            for collaborator in &project.collaborators {
1125                session
1126                    .peer
1127                    .send(
1128                        collaborator.connection_id,
1129                        proto::UpdateProjectCollaborator {
1130                            project_id: project.id.to_proto(),
1131                            old_peer_id: Some(project.old_connection_id.into()),
1132                            new_peer_id: Some(session.connection_id.into()),
1133                        },
1134                    )
1135                    .trace_err();
1136            }
1137        }
1138
1139        for project in &mut rejoined_room.rejoined_projects {
1140            for worktree in mem::take(&mut project.worktrees) {
1141                #[cfg(any(test, feature = "test-support"))]
1142                const MAX_CHUNK_SIZE: usize = 2;
1143                #[cfg(not(any(test, feature = "test-support")))]
1144                const MAX_CHUNK_SIZE: usize = 256;
1145
1146                // Stream this worktree's entries.
1147                let message = proto::UpdateWorktree {
1148                    project_id: project.id.to_proto(),
1149                    worktree_id: worktree.id,
1150                    abs_path: worktree.abs_path.clone(),
1151                    root_name: worktree.root_name,
1152                    updated_entries: worktree.updated_entries,
1153                    removed_entries: worktree.removed_entries,
1154                    scan_id: worktree.scan_id,
1155                    is_last_update: worktree.completed_scan_id == worktree.scan_id,
1156                    updated_repositories: worktree.updated_repositories,
1157                    removed_repositories: worktree.removed_repositories,
1158                };
1159                for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1160                    session.peer.send(session.connection_id, update.clone())?;
1161                }
1162
1163                // Stream this worktree's diagnostics.
1164                for summary in worktree.diagnostic_summaries {
1165                    session.peer.send(
1166                        session.connection_id,
1167                        proto::UpdateDiagnosticSummary {
1168                            project_id: project.id.to_proto(),
1169                            worktree_id: worktree.id,
1170                            summary: Some(summary),
1171                        },
1172                    )?;
1173                }
1174
1175                for settings_file in worktree.settings_files {
1176                    session.peer.send(
1177                        session.connection_id,
1178                        proto::UpdateWorktreeSettings {
1179                            project_id: project.id.to_proto(),
1180                            worktree_id: worktree.id,
1181                            path: settings_file.path,
1182                            content: Some(settings_file.content),
1183                        },
1184                    )?;
1185                }
1186            }
1187
1188            for language_server in &project.language_servers {
1189                session.peer.send(
1190                    session.connection_id,
1191                    proto::UpdateLanguageServer {
1192                        project_id: project.id.to_proto(),
1193                        language_server_id: language_server.id,
1194                        variant: Some(
1195                            proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1196                                proto::LspDiskBasedDiagnosticsUpdated {},
1197                            ),
1198                        ),
1199                    },
1200                )?;
1201            }
1202        }
1203
1204        let rejoined_room = rejoined_room.into_inner();
1205
1206        room = rejoined_room.room;
1207        channel_id = rejoined_room.channel_id;
1208        channel_members = rejoined_room.channel_members;
1209    }
1210
1211    if let Some(channel_id) = channel_id {
1212        channel_updated(
1213            channel_id,
1214            &room,
1215            &channel_members,
1216            &session.peer,
1217            &*session.connection_pool().await,
1218        );
1219    }
1220
1221    update_user_contacts(session.user_id, &session).await?;
1222    Ok(())
1223}
1224
1225async fn leave_room(
1226    _: proto::LeaveRoom,
1227    response: Response<proto::LeaveRoom>,
1228    session: Session,
1229) -> Result<()> {
1230    leave_room_for_session(&session).await?;
1231    response.send(proto::Ack {})?;
1232    Ok(())
1233}
1234
1235async fn call(
1236    request: proto::Call,
1237    response: Response<proto::Call>,
1238    session: Session,
1239) -> Result<()> {
1240    let room_id = RoomId::from_proto(request.room_id);
1241    let calling_user_id = session.user_id;
1242    let calling_connection_id = session.connection_id;
1243    let called_user_id = UserId::from_proto(request.called_user_id);
1244    let initial_project_id = request.initial_project_id.map(ProjectId::from_proto);
1245    if !session
1246        .db()
1247        .await
1248        .has_contact(calling_user_id, called_user_id)
1249        .await?
1250    {
1251        return Err(anyhow!("cannot call a user who isn't a contact"))?;
1252    }
1253
1254    let incoming_call = {
1255        let (room, incoming_call) = &mut *session
1256            .db()
1257            .await
1258            .call(
1259                room_id,
1260                calling_user_id,
1261                calling_connection_id,
1262                called_user_id,
1263                initial_project_id,
1264            )
1265            .await?;
1266        room_updated(&room, &session.peer);
1267        mem::take(incoming_call)
1268    };
1269    update_user_contacts(called_user_id, &session).await?;
1270
1271    let mut calls = session
1272        .connection_pool()
1273        .await
1274        .user_connection_ids(called_user_id)
1275        .map(|connection_id| session.peer.request(connection_id, incoming_call.clone()))
1276        .collect::<FuturesUnordered<_>>();
1277
1278    while let Some(call_response) = calls.next().await {
1279        match call_response.as_ref() {
1280            Ok(_) => {
1281                response.send(proto::Ack {})?;
1282                return Ok(());
1283            }
1284            Err(_) => {
1285                call_response.trace_err();
1286            }
1287        }
1288    }
1289
1290    {
1291        let room = session
1292            .db()
1293            .await
1294            .call_failed(room_id, called_user_id)
1295            .await?;
1296        room_updated(&room, &session.peer);
1297    }
1298    update_user_contacts(called_user_id, &session).await?;
1299
1300    Err(anyhow!("failed to ring user"))?
1301}
1302
1303async fn cancel_call(
1304    request: proto::CancelCall,
1305    response: Response<proto::CancelCall>,
1306    session: Session,
1307) -> Result<()> {
1308    let called_user_id = UserId::from_proto(request.called_user_id);
1309    let room_id = RoomId::from_proto(request.room_id);
1310    {
1311        let room = session
1312            .db()
1313            .await
1314            .cancel_call(room_id, session.connection_id, called_user_id)
1315            .await?;
1316        room_updated(&room, &session.peer);
1317    }
1318
1319    for connection_id in session
1320        .connection_pool()
1321        .await
1322        .user_connection_ids(called_user_id)
1323    {
1324        session
1325            .peer
1326            .send(
1327                connection_id,
1328                proto::CallCanceled {
1329                    room_id: room_id.to_proto(),
1330                },
1331            )
1332            .trace_err();
1333    }
1334    response.send(proto::Ack {})?;
1335
1336    update_user_contacts(called_user_id, &session).await?;
1337    Ok(())
1338}
1339
1340async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<()> {
1341    let room_id = RoomId::from_proto(message.room_id);
1342    {
1343        let room = session
1344            .db()
1345            .await
1346            .decline_call(Some(room_id), session.user_id)
1347            .await?
1348            .ok_or_else(|| anyhow!("failed to decline call"))?;
1349        room_updated(&room, &session.peer);
1350    }
1351
1352    for connection_id in session
1353        .connection_pool()
1354        .await
1355        .user_connection_ids(session.user_id)
1356    {
1357        session
1358            .peer
1359            .send(
1360                connection_id,
1361                proto::CallCanceled {
1362                    room_id: room_id.to_proto(),
1363                },
1364            )
1365            .trace_err();
1366    }
1367    update_user_contacts(session.user_id, &session).await?;
1368    Ok(())
1369}
1370
1371async fn update_participant_location(
1372    request: proto::UpdateParticipantLocation,
1373    response: Response<proto::UpdateParticipantLocation>,
1374    session: Session,
1375) -> Result<()> {
1376    let room_id = RoomId::from_proto(request.room_id);
1377    let location = request
1378        .location
1379        .ok_or_else(|| anyhow!("invalid location"))?;
1380
1381    let db = session.db().await;
1382    let room = db
1383        .update_room_participant_location(room_id, session.connection_id, location)
1384        .await?;
1385
1386    room_updated(&room, &session.peer);
1387    response.send(proto::Ack {})?;
1388    Ok(())
1389}
1390
1391async fn share_project(
1392    request: proto::ShareProject,
1393    response: Response<proto::ShareProject>,
1394    session: Session,
1395) -> Result<()> {
1396    let (project_id, room) = &*session
1397        .db()
1398        .await
1399        .share_project(
1400            RoomId::from_proto(request.room_id),
1401            session.connection_id,
1402            &request.worktrees,
1403        )
1404        .await?;
1405    response.send(proto::ShareProjectResponse {
1406        project_id: project_id.to_proto(),
1407    })?;
1408    room_updated(&room, &session.peer);
1409
1410    Ok(())
1411}
1412
1413async fn unshare_project(message: proto::UnshareProject, session: Session) -> Result<()> {
1414    let project_id = ProjectId::from_proto(message.project_id);
1415
1416    let (room, guest_connection_ids) = &*session
1417        .db()
1418        .await
1419        .unshare_project(project_id, session.connection_id)
1420        .await?;
1421
1422    broadcast(
1423        Some(session.connection_id),
1424        guest_connection_ids.iter().copied(),
1425        |conn_id| session.peer.send(conn_id, message.clone()),
1426    );
1427    room_updated(&room, &session.peer);
1428
1429    Ok(())
1430}
1431
1432async fn join_project(
1433    request: proto::JoinProject,
1434    response: Response<proto::JoinProject>,
1435    session: Session,
1436) -> Result<()> {
1437    let project_id = ProjectId::from_proto(request.project_id);
1438    let guest_user_id = session.user_id;
1439
1440    tracing::info!(%project_id, "join project");
1441
1442    let (project, replica_id) = &mut *session
1443        .db()
1444        .await
1445        .join_project(project_id, session.connection_id)
1446        .await?;
1447
1448    let collaborators = project
1449        .collaborators
1450        .iter()
1451        .filter(|collaborator| collaborator.connection_id != session.connection_id)
1452        .map(|collaborator| collaborator.to_proto())
1453        .collect::<Vec<_>>();
1454
1455    let worktrees = project
1456        .worktrees
1457        .iter()
1458        .map(|(id, worktree)| proto::WorktreeMetadata {
1459            id: *id,
1460            root_name: worktree.root_name.clone(),
1461            visible: worktree.visible,
1462            abs_path: worktree.abs_path.clone(),
1463        })
1464        .collect::<Vec<_>>();
1465
1466    for collaborator in &collaborators {
1467        session
1468            .peer
1469            .send(
1470                collaborator.peer_id.unwrap().into(),
1471                proto::AddProjectCollaborator {
1472                    project_id: project_id.to_proto(),
1473                    collaborator: Some(proto::Collaborator {
1474                        peer_id: Some(session.connection_id.into()),
1475                        replica_id: replica_id.0 as u32,
1476                        user_id: guest_user_id.to_proto(),
1477                    }),
1478                },
1479            )
1480            .trace_err();
1481    }
1482
1483    // First, we send the metadata associated with each worktree.
1484    response.send(proto::JoinProjectResponse {
1485        worktrees: worktrees.clone(),
1486        replica_id: replica_id.0 as u32,
1487        collaborators: collaborators.clone(),
1488        language_servers: project.language_servers.clone(),
1489    })?;
1490
1491    for (worktree_id, worktree) in mem::take(&mut project.worktrees) {
1492        #[cfg(any(test, feature = "test-support"))]
1493        const MAX_CHUNK_SIZE: usize = 2;
1494        #[cfg(not(any(test, feature = "test-support")))]
1495        const MAX_CHUNK_SIZE: usize = 256;
1496
1497        // Stream this worktree's entries.
1498        let message = proto::UpdateWorktree {
1499            project_id: project_id.to_proto(),
1500            worktree_id,
1501            abs_path: worktree.abs_path.clone(),
1502            root_name: worktree.root_name,
1503            updated_entries: worktree.entries,
1504            removed_entries: Default::default(),
1505            scan_id: worktree.scan_id,
1506            is_last_update: worktree.scan_id == worktree.completed_scan_id,
1507            updated_repositories: worktree.repository_entries.into_values().collect(),
1508            removed_repositories: Default::default(),
1509        };
1510        for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1511            session.peer.send(session.connection_id, update.clone())?;
1512        }
1513
1514        // Stream this worktree's diagnostics.
1515        for summary in worktree.diagnostic_summaries {
1516            session.peer.send(
1517                session.connection_id,
1518                proto::UpdateDiagnosticSummary {
1519                    project_id: project_id.to_proto(),
1520                    worktree_id: worktree.id,
1521                    summary: Some(summary),
1522                },
1523            )?;
1524        }
1525
1526        for settings_file in worktree.settings_files {
1527            session.peer.send(
1528                session.connection_id,
1529                proto::UpdateWorktreeSettings {
1530                    project_id: project_id.to_proto(),
1531                    worktree_id: worktree.id,
1532                    path: settings_file.path,
1533                    content: Some(settings_file.content),
1534                },
1535            )?;
1536        }
1537    }
1538
1539    for language_server in &project.language_servers {
1540        session.peer.send(
1541            session.connection_id,
1542            proto::UpdateLanguageServer {
1543                project_id: project_id.to_proto(),
1544                language_server_id: language_server.id,
1545                variant: Some(
1546                    proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1547                        proto::LspDiskBasedDiagnosticsUpdated {},
1548                    ),
1549                ),
1550            },
1551        )?;
1552    }
1553
1554    Ok(())
1555}
1556
1557async fn leave_project(request: proto::LeaveProject, session: Session) -> Result<()> {
1558    let sender_id = session.connection_id;
1559    let project_id = ProjectId::from_proto(request.project_id);
1560
1561    let (room, project) = &*session
1562        .db()
1563        .await
1564        .leave_project(project_id, sender_id)
1565        .await?;
1566    tracing::info!(
1567        %project_id,
1568        host_user_id = %project.host_user_id,
1569        host_connection_id = %project.host_connection_id,
1570        "leave project"
1571    );
1572
1573    project_left(&project, &session);
1574    room_updated(&room, &session.peer);
1575
1576    Ok(())
1577}
1578
1579async fn update_project(
1580    request: proto::UpdateProject,
1581    response: Response<proto::UpdateProject>,
1582    session: Session,
1583) -> Result<()> {
1584    let project_id = ProjectId::from_proto(request.project_id);
1585    let (room, guest_connection_ids) = &*session
1586        .db()
1587        .await
1588        .update_project(project_id, session.connection_id, &request.worktrees)
1589        .await?;
1590    broadcast(
1591        Some(session.connection_id),
1592        guest_connection_ids.iter().copied(),
1593        |connection_id| {
1594            session
1595                .peer
1596                .forward_send(session.connection_id, connection_id, request.clone())
1597        },
1598    );
1599    room_updated(&room, &session.peer);
1600    response.send(proto::Ack {})?;
1601
1602    Ok(())
1603}
1604
1605async fn update_worktree(
1606    request: proto::UpdateWorktree,
1607    response: Response<proto::UpdateWorktree>,
1608    session: Session,
1609) -> Result<()> {
1610    let guest_connection_ids = session
1611        .db()
1612        .await
1613        .update_worktree(&request, session.connection_id)
1614        .await?;
1615
1616    broadcast(
1617        Some(session.connection_id),
1618        guest_connection_ids.iter().copied(),
1619        |connection_id| {
1620            session
1621                .peer
1622                .forward_send(session.connection_id, connection_id, request.clone())
1623        },
1624    );
1625    response.send(proto::Ack {})?;
1626    Ok(())
1627}
1628
1629async fn update_diagnostic_summary(
1630    message: proto::UpdateDiagnosticSummary,
1631    session: Session,
1632) -> Result<()> {
1633    let guest_connection_ids = session
1634        .db()
1635        .await
1636        .update_diagnostic_summary(&message, session.connection_id)
1637        .await?;
1638
1639    broadcast(
1640        Some(session.connection_id),
1641        guest_connection_ids.iter().copied(),
1642        |connection_id| {
1643            session
1644                .peer
1645                .forward_send(session.connection_id, connection_id, message.clone())
1646        },
1647    );
1648
1649    Ok(())
1650}
1651
1652async fn update_worktree_settings(
1653    message: proto::UpdateWorktreeSettings,
1654    session: Session,
1655) -> Result<()> {
1656    let guest_connection_ids = session
1657        .db()
1658        .await
1659        .update_worktree_settings(&message, session.connection_id)
1660        .await?;
1661
1662    broadcast(
1663        Some(session.connection_id),
1664        guest_connection_ids.iter().copied(),
1665        |connection_id| {
1666            session
1667                .peer
1668                .forward_send(session.connection_id, connection_id, message.clone())
1669        },
1670    );
1671
1672    Ok(())
1673}
1674
1675async fn refresh_inlay_hints(request: proto::RefreshInlayHints, session: Session) -> Result<()> {
1676    broadcast_project_message(request.project_id, request, session).await
1677}
1678
1679async fn start_language_server(
1680    request: proto::StartLanguageServer,
1681    session: Session,
1682) -> Result<()> {
1683    let guest_connection_ids = session
1684        .db()
1685        .await
1686        .start_language_server(&request, session.connection_id)
1687        .await?;
1688
1689    broadcast(
1690        Some(session.connection_id),
1691        guest_connection_ids.iter().copied(),
1692        |connection_id| {
1693            session
1694                .peer
1695                .forward_send(session.connection_id, connection_id, request.clone())
1696        },
1697    );
1698    Ok(())
1699}
1700
1701async fn update_language_server(
1702    request: proto::UpdateLanguageServer,
1703    session: Session,
1704) -> Result<()> {
1705    session.executor.record_backtrace();
1706    let project_id = ProjectId::from_proto(request.project_id);
1707    let project_connection_ids = session
1708        .db()
1709        .await
1710        .project_connection_ids(project_id, session.connection_id)
1711        .await?;
1712    broadcast(
1713        Some(session.connection_id),
1714        project_connection_ids.iter().copied(),
1715        |connection_id| {
1716            session
1717                .peer
1718                .forward_send(session.connection_id, connection_id, request.clone())
1719        },
1720    );
1721    Ok(())
1722}
1723
1724async fn forward_project_request<T>(
1725    request: T,
1726    response: Response<T>,
1727    session: Session,
1728) -> Result<()>
1729where
1730    T: EntityMessage + RequestMessage,
1731{
1732    session.executor.record_backtrace();
1733    let project_id = ProjectId::from_proto(request.remote_entity_id());
1734    let host_connection_id = {
1735        let collaborators = session
1736            .db()
1737            .await
1738            .project_collaborators(project_id, session.connection_id)
1739            .await?;
1740        collaborators
1741            .iter()
1742            .find(|collaborator| collaborator.is_host)
1743            .ok_or_else(|| anyhow!("host not found"))?
1744            .connection_id
1745    };
1746
1747    let payload = session
1748        .peer
1749        .forward_request(session.connection_id, host_connection_id, request)
1750        .await?;
1751
1752    response.send(payload)?;
1753    Ok(())
1754}
1755
1756async fn create_buffer_for_peer(
1757    request: proto::CreateBufferForPeer,
1758    session: Session,
1759) -> Result<()> {
1760    session.executor.record_backtrace();
1761    let peer_id = request.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?;
1762    session
1763        .peer
1764        .forward_send(session.connection_id, peer_id.into(), request)?;
1765    Ok(())
1766}
1767
1768async fn update_buffer(
1769    request: proto::UpdateBuffer,
1770    response: Response<proto::UpdateBuffer>,
1771    session: Session,
1772) -> Result<()> {
1773    session.executor.record_backtrace();
1774    let project_id = ProjectId::from_proto(request.project_id);
1775    let mut guest_connection_ids;
1776    let mut host_connection_id = None;
1777    {
1778        let collaborators = session
1779            .db()
1780            .await
1781            .project_collaborators(project_id, session.connection_id)
1782            .await?;
1783        guest_connection_ids = Vec::with_capacity(collaborators.len() - 1);
1784        for collaborator in collaborators.iter() {
1785            if collaborator.is_host {
1786                host_connection_id = Some(collaborator.connection_id);
1787            } else {
1788                guest_connection_ids.push(collaborator.connection_id);
1789            }
1790        }
1791    }
1792    let host_connection_id = host_connection_id.ok_or_else(|| anyhow!("host not found"))?;
1793
1794    session.executor.record_backtrace();
1795    broadcast(
1796        Some(session.connection_id),
1797        guest_connection_ids,
1798        |connection_id| {
1799            session
1800                .peer
1801                .forward_send(session.connection_id, connection_id, request.clone())
1802        },
1803    );
1804    if host_connection_id != session.connection_id {
1805        session
1806            .peer
1807            .forward_request(session.connection_id, host_connection_id, request.clone())
1808            .await?;
1809    }
1810
1811    response.send(proto::Ack {})?;
1812    Ok(())
1813}
1814
1815async fn update_buffer_file(request: proto::UpdateBufferFile, session: Session) -> Result<()> {
1816    let project_id = ProjectId::from_proto(request.project_id);
1817    let project_connection_ids = session
1818        .db()
1819        .await
1820        .project_connection_ids(project_id, session.connection_id)
1821        .await?;
1822
1823    broadcast(
1824        Some(session.connection_id),
1825        project_connection_ids.iter().copied(),
1826        |connection_id| {
1827            session
1828                .peer
1829                .forward_send(session.connection_id, connection_id, request.clone())
1830        },
1831    );
1832    Ok(())
1833}
1834
1835async fn buffer_reloaded(request: proto::BufferReloaded, session: Session) -> Result<()> {
1836    let project_id = ProjectId::from_proto(request.project_id);
1837    let project_connection_ids = session
1838        .db()
1839        .await
1840        .project_connection_ids(project_id, session.connection_id)
1841        .await?;
1842    broadcast(
1843        Some(session.connection_id),
1844        project_connection_ids.iter().copied(),
1845        |connection_id| {
1846            session
1847                .peer
1848                .forward_send(session.connection_id, connection_id, request.clone())
1849        },
1850    );
1851    Ok(())
1852}
1853
1854async fn buffer_saved(request: proto::BufferSaved, session: Session) -> Result<()> {
1855    broadcast_project_message(request.project_id, request, session).await
1856}
1857
1858async fn broadcast_project_message<T: EnvelopedMessage>(
1859    project_id: u64,
1860    request: T,
1861    session: Session,
1862) -> Result<()> {
1863    let project_id = ProjectId::from_proto(project_id);
1864    let project_connection_ids = session
1865        .db()
1866        .await
1867        .project_connection_ids(project_id, session.connection_id)
1868        .await?;
1869    broadcast(
1870        Some(session.connection_id),
1871        project_connection_ids.iter().copied(),
1872        |connection_id| {
1873            session
1874                .peer
1875                .forward_send(session.connection_id, connection_id, request.clone())
1876        },
1877    );
1878    Ok(())
1879}
1880
1881async fn follow(
1882    request: proto::Follow,
1883    response: Response<proto::Follow>,
1884    session: Session,
1885) -> Result<()> {
1886    let project_id = ProjectId::from_proto(request.project_id);
1887    let leader_id = request
1888        .leader_id
1889        .ok_or_else(|| anyhow!("invalid leader id"))?
1890        .into();
1891    let follower_id = session.connection_id;
1892
1893    {
1894        let project_connection_ids = session
1895            .db()
1896            .await
1897            .project_connection_ids(project_id, session.connection_id)
1898            .await?;
1899
1900        if !project_connection_ids.contains(&leader_id) {
1901            Err(anyhow!("no such peer"))?;
1902        }
1903    }
1904
1905    let mut response_payload = session
1906        .peer
1907        .forward_request(session.connection_id, leader_id, request)
1908        .await?;
1909    response_payload
1910        .views
1911        .retain(|view| view.leader_id != Some(follower_id.into()));
1912    response.send(response_payload)?;
1913
1914    let room = session
1915        .db()
1916        .await
1917        .follow(project_id, leader_id, follower_id)
1918        .await?;
1919    room_updated(&room, &session.peer);
1920
1921    Ok(())
1922}
1923
1924async fn unfollow(request: proto::Unfollow, session: Session) -> Result<()> {
1925    let project_id = ProjectId::from_proto(request.project_id);
1926    let leader_id = request
1927        .leader_id
1928        .ok_or_else(|| anyhow!("invalid leader id"))?
1929        .into();
1930    let follower_id = session.connection_id;
1931
1932    if !session
1933        .db()
1934        .await
1935        .project_connection_ids(project_id, session.connection_id)
1936        .await?
1937        .contains(&leader_id)
1938    {
1939        Err(anyhow!("no such peer"))?;
1940    }
1941
1942    session
1943        .peer
1944        .forward_send(session.connection_id, leader_id, request)?;
1945
1946    let room = session
1947        .db()
1948        .await
1949        .unfollow(project_id, leader_id, follower_id)
1950        .await?;
1951    room_updated(&room, &session.peer);
1952
1953    Ok(())
1954}
1955
1956async fn update_followers(request: proto::UpdateFollowers, session: Session) -> Result<()> {
1957    let project_id = ProjectId::from_proto(request.project_id);
1958    let project_connection_ids = session
1959        .db
1960        .lock()
1961        .await
1962        .project_connection_ids(project_id, session.connection_id)
1963        .await?;
1964
1965    let leader_id = request.variant.as_ref().and_then(|variant| match variant {
1966        proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
1967        proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
1968        proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
1969    });
1970    for follower_peer_id in request.follower_ids.iter().copied() {
1971        let follower_connection_id = follower_peer_id.into();
1972        if project_connection_ids.contains(&follower_connection_id)
1973            && Some(follower_peer_id) != leader_id
1974        {
1975            session.peer.forward_send(
1976                session.connection_id,
1977                follower_connection_id,
1978                request.clone(),
1979            )?;
1980        }
1981    }
1982    Ok(())
1983}
1984
1985async fn get_users(
1986    request: proto::GetUsers,
1987    response: Response<proto::GetUsers>,
1988    session: Session,
1989) -> Result<()> {
1990    let user_ids = request
1991        .user_ids
1992        .into_iter()
1993        .map(UserId::from_proto)
1994        .collect();
1995    let users = session
1996        .db()
1997        .await
1998        .get_users_by_ids(user_ids)
1999        .await?
2000        .into_iter()
2001        .map(|user| proto::User {
2002            id: user.id.to_proto(),
2003            avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
2004            github_login: user.github_login,
2005        })
2006        .collect();
2007    response.send(proto::UsersResponse { users })?;
2008    Ok(())
2009}
2010
2011async fn fuzzy_search_users(
2012    request: proto::FuzzySearchUsers,
2013    response: Response<proto::FuzzySearchUsers>,
2014    session: Session,
2015) -> Result<()> {
2016    let query = request.query;
2017    let users = match query.len() {
2018        0 => vec![],
2019        1 | 2 => session
2020            .db()
2021            .await
2022            .get_user_by_github_login(&query)
2023            .await?
2024            .into_iter()
2025            .collect(),
2026        _ => session.db().await.fuzzy_search_users(&query, 10).await?,
2027    };
2028    let users = users
2029        .into_iter()
2030        .filter(|user| user.id != session.user_id)
2031        .map(|user| proto::User {
2032            id: user.id.to_proto(),
2033            avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
2034            github_login: user.github_login,
2035        })
2036        .collect();
2037    response.send(proto::UsersResponse { users })?;
2038    Ok(())
2039}
2040
2041async fn request_contact(
2042    request: proto::RequestContact,
2043    response: Response<proto::RequestContact>,
2044    session: Session,
2045) -> Result<()> {
2046    let requester_id = session.user_id;
2047    let responder_id = UserId::from_proto(request.responder_id);
2048    if requester_id == responder_id {
2049        return Err(anyhow!("cannot add yourself as a contact"))?;
2050    }
2051
2052    session
2053        .db()
2054        .await
2055        .send_contact_request(requester_id, responder_id)
2056        .await?;
2057
2058    // Update outgoing contact requests of requester
2059    let mut update = proto::UpdateContacts::default();
2060    update.outgoing_requests.push(responder_id.to_proto());
2061    for connection_id in session
2062        .connection_pool()
2063        .await
2064        .user_connection_ids(requester_id)
2065    {
2066        session.peer.send(connection_id, update.clone())?;
2067    }
2068
2069    // Update incoming contact requests of responder
2070    let mut update = proto::UpdateContacts::default();
2071    update
2072        .incoming_requests
2073        .push(proto::IncomingContactRequest {
2074            requester_id: requester_id.to_proto(),
2075            should_notify: true,
2076        });
2077    for connection_id in session
2078        .connection_pool()
2079        .await
2080        .user_connection_ids(responder_id)
2081    {
2082        session.peer.send(connection_id, update.clone())?;
2083    }
2084
2085    response.send(proto::Ack {})?;
2086    Ok(())
2087}
2088
2089async fn respond_to_contact_request(
2090    request: proto::RespondToContactRequest,
2091    response: Response<proto::RespondToContactRequest>,
2092    session: Session,
2093) -> Result<()> {
2094    let responder_id = session.user_id;
2095    let requester_id = UserId::from_proto(request.requester_id);
2096    let db = session.db().await;
2097    if request.response == proto::ContactRequestResponse::Dismiss as i32 {
2098        db.dismiss_contact_notification(responder_id, requester_id)
2099            .await?;
2100    } else {
2101        let accept = request.response == proto::ContactRequestResponse::Accept as i32;
2102
2103        db.respond_to_contact_request(responder_id, requester_id, accept)
2104            .await?;
2105        let requester_busy = db.is_user_busy(requester_id).await?;
2106        let responder_busy = db.is_user_busy(responder_id).await?;
2107
2108        let pool = session.connection_pool().await;
2109        // Update responder with new contact
2110        let mut update = proto::UpdateContacts::default();
2111        if accept {
2112            update
2113                .contacts
2114                .push(contact_for_user(requester_id, false, requester_busy, &pool));
2115        }
2116        update
2117            .remove_incoming_requests
2118            .push(requester_id.to_proto());
2119        for connection_id in pool.user_connection_ids(responder_id) {
2120            session.peer.send(connection_id, update.clone())?;
2121        }
2122
2123        // Update requester with new contact
2124        let mut update = proto::UpdateContacts::default();
2125        if accept {
2126            update
2127                .contacts
2128                .push(contact_for_user(responder_id, true, responder_busy, &pool));
2129        }
2130        update
2131            .remove_outgoing_requests
2132            .push(responder_id.to_proto());
2133        for connection_id in pool.user_connection_ids(requester_id) {
2134            session.peer.send(connection_id, update.clone())?;
2135        }
2136    }
2137
2138    response.send(proto::Ack {})?;
2139    Ok(())
2140}
2141
2142async fn remove_contact(
2143    request: proto::RemoveContact,
2144    response: Response<proto::RemoveContact>,
2145    session: Session,
2146) -> Result<()> {
2147    let requester_id = session.user_id;
2148    let responder_id = UserId::from_proto(request.user_id);
2149    let db = session.db().await;
2150    let contact_accepted = db.remove_contact(requester_id, responder_id).await?;
2151
2152    let pool = session.connection_pool().await;
2153    // Update outgoing contact requests of requester
2154    let mut update = proto::UpdateContacts::default();
2155    if contact_accepted {
2156        update.remove_contacts.push(responder_id.to_proto());
2157    } else {
2158        update
2159            .remove_outgoing_requests
2160            .push(responder_id.to_proto());
2161    }
2162    for connection_id in pool.user_connection_ids(requester_id) {
2163        session.peer.send(connection_id, update.clone())?;
2164    }
2165
2166    // Update incoming contact requests of responder
2167    let mut update = proto::UpdateContacts::default();
2168    if contact_accepted {
2169        update.remove_contacts.push(requester_id.to_proto());
2170    } else {
2171        update
2172            .remove_incoming_requests
2173            .push(requester_id.to_proto());
2174    }
2175    for connection_id in pool.user_connection_ids(responder_id) {
2176        session.peer.send(connection_id, update.clone())?;
2177    }
2178
2179    response.send(proto::Ack {})?;
2180    Ok(())
2181}
2182
2183async fn create_channel(
2184    request: proto::CreateChannel,
2185    response: Response<proto::CreateChannel>,
2186    session: Session,
2187) -> Result<()> {
2188    let db = session.db().await;
2189    let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
2190
2191    if let Some(live_kit) = session.live_kit_client.as_ref() {
2192        live_kit.create_room(live_kit_room.clone()).await?;
2193    }
2194
2195    let parent_id = request.parent_id.map(|id| ChannelId::from_proto(id));
2196    let id = db
2197        .create_channel(&request.name, parent_id, &live_kit_room, session.user_id)
2198        .await?;
2199
2200    let channel = proto::Channel {
2201        id: id.to_proto(),
2202        name: request.name,
2203    };
2204
2205    response.send(proto::CreateChannelResponse {
2206        channel: Some(channel.clone()),
2207        parent_id: request.parent_id,
2208    })?;
2209
2210    let Some(parent_id) = parent_id else {
2211        return Ok(());
2212    };
2213
2214    let update = proto::UpdateChannels {
2215        channels: vec![channel],
2216        insert_edge: vec![ChannelEdge {
2217            parent_id: parent_id.to_proto(),
2218            channel_id: id.to_proto(),
2219        }],
2220        ..Default::default()
2221    };
2222
2223    let user_ids_to_notify = db.get_channel_members(parent_id).await?;
2224
2225    let connection_pool = session.connection_pool().await;
2226    for user_id in user_ids_to_notify {
2227        for connection_id in connection_pool.user_connection_ids(user_id) {
2228            if user_id == session.user_id {
2229                continue;
2230            }
2231            session.peer.send(connection_id, update.clone())?;
2232        }
2233    }
2234
2235    Ok(())
2236}
2237
2238async fn delete_channel(
2239    request: proto::DeleteChannel,
2240    response: Response<proto::DeleteChannel>,
2241    session: Session,
2242) -> Result<()> {
2243    let db = session.db().await;
2244
2245    let channel_id = request.channel_id;
2246    let (removed_channels, member_ids) = db
2247        .delete_channel(ChannelId::from_proto(channel_id), session.user_id)
2248        .await?;
2249    response.send(proto::Ack {})?;
2250
2251    // Notify members of removed channels
2252    let mut update = proto::UpdateChannels::default();
2253    update
2254        .delete_channels
2255        .extend(removed_channels.into_iter().map(|id| id.to_proto()));
2256
2257    let connection_pool = session.connection_pool().await;
2258    for member_id in member_ids {
2259        for connection_id in connection_pool.user_connection_ids(member_id) {
2260            session.peer.send(connection_id, update.clone())?;
2261        }
2262    }
2263
2264    Ok(())
2265}
2266
2267async fn invite_channel_member(
2268    request: proto::InviteChannelMember,
2269    response: Response<proto::InviteChannelMember>,
2270    session: Session,
2271) -> Result<()> {
2272    let db = session.db().await;
2273    let channel_id = ChannelId::from_proto(request.channel_id);
2274    let invitee_id = UserId::from_proto(request.user_id);
2275    db.invite_channel_member(channel_id, invitee_id, session.user_id, request.admin)
2276        .await?;
2277
2278    let (channel, _) = db
2279        .get_channel(channel_id, session.user_id)
2280        .await?
2281        .ok_or_else(|| anyhow!("channel not found"))?;
2282
2283    let mut update = proto::UpdateChannels::default();
2284    update.channel_invitations.push(proto::Channel {
2285        id: channel.id.to_proto(),
2286        name: channel.name,
2287    });
2288    for connection_id in session
2289        .connection_pool()
2290        .await
2291        .user_connection_ids(invitee_id)
2292    {
2293        session.peer.send(connection_id, update.clone())?;
2294    }
2295
2296    response.send(proto::Ack {})?;
2297    Ok(())
2298}
2299
2300async fn remove_channel_member(
2301    request: proto::RemoveChannelMember,
2302    response: Response<proto::RemoveChannelMember>,
2303    session: Session,
2304) -> Result<()> {
2305    let db = session.db().await;
2306    let channel_id = ChannelId::from_proto(request.channel_id);
2307    let member_id = UserId::from_proto(request.user_id);
2308
2309    db.remove_channel_member(channel_id, member_id, session.user_id)
2310        .await?;
2311
2312    let mut update = proto::UpdateChannels::default();
2313    update.delete_channels.push(channel_id.to_proto());
2314
2315    for connection_id in session
2316        .connection_pool()
2317        .await
2318        .user_connection_ids(member_id)
2319    {
2320        session.peer.send(connection_id, update.clone())?;
2321    }
2322
2323    response.send(proto::Ack {})?;
2324    Ok(())
2325}
2326
2327async fn set_channel_member_admin(
2328    request: proto::SetChannelMemberAdmin,
2329    response: Response<proto::SetChannelMemberAdmin>,
2330    session: Session,
2331) -> Result<()> {
2332    let db = session.db().await;
2333    let channel_id = ChannelId::from_proto(request.channel_id);
2334    let member_id = UserId::from_proto(request.user_id);
2335    db.set_channel_member_admin(channel_id, session.user_id, member_id, request.admin)
2336        .await?;
2337
2338    let (channel, has_accepted) = db
2339        .get_channel(channel_id, member_id)
2340        .await?
2341        .ok_or_else(|| anyhow!("channel not found"))?;
2342
2343    let mut update = proto::UpdateChannels::default();
2344    if has_accepted {
2345        update.channel_permissions.push(proto::ChannelPermission {
2346            channel_id: channel.id.to_proto(),
2347            is_admin: request.admin,
2348        });
2349    }
2350
2351    for connection_id in session
2352        .connection_pool()
2353        .await
2354        .user_connection_ids(member_id)
2355    {
2356        session.peer.send(connection_id, update.clone())?;
2357    }
2358
2359    response.send(proto::Ack {})?;
2360    Ok(())
2361}
2362
2363async fn rename_channel(
2364    request: proto::RenameChannel,
2365    response: Response<proto::RenameChannel>,
2366    session: Session,
2367) -> Result<()> {
2368    let db = session.db().await;
2369    let channel_id = ChannelId::from_proto(request.channel_id);
2370    let new_name = db
2371        .rename_channel(channel_id, session.user_id, &request.name)
2372        .await?;
2373
2374    let channel = proto::Channel {
2375        id: request.channel_id,
2376        name: new_name,
2377    };
2378    response.send(proto::RenameChannelResponse {
2379        channel: Some(channel.clone()),
2380    })?;
2381    let mut update = proto::UpdateChannels::default();
2382    update.channels.push(channel);
2383
2384    let member_ids = db.get_channel_members(channel_id).await?;
2385
2386    let connection_pool = session.connection_pool().await;
2387    for member_id in member_ids {
2388        for connection_id in connection_pool.user_connection_ids(member_id) {
2389            session.peer.send(connection_id, update.clone())?;
2390        }
2391    }
2392
2393    Ok(())
2394}
2395
2396async fn link_channel(
2397    request: proto::LinkChannel,
2398    response: Response<proto::LinkChannel>,
2399    session: Session,
2400) -> Result<()> {
2401    let db = session.db().await;
2402    let channel_id = ChannelId::from_proto(request.channel_id);
2403    let to = ChannelId::from_proto(request.to);
2404    let channels_to_send = db.link_channel(session.user_id, channel_id, to).await?;
2405
2406    let members = db.get_channel_members(to).await?;
2407    let connection_pool = session.connection_pool().await;
2408    let update = proto::UpdateChannels {
2409        channels: channels_to_send
2410            .channels
2411            .into_iter()
2412            .map(|channel| proto::Channel {
2413                id: channel.id.to_proto(),
2414                name: channel.name,
2415            })
2416            .collect(),
2417        insert_edge: channels_to_send.edges,
2418        ..Default::default()
2419    };
2420    for member_id in members {
2421        for connection_id in connection_pool.user_connection_ids(member_id) {
2422            session.peer.send(connection_id, update.clone())?;
2423        }
2424    }
2425
2426    response.send(Ack {})?;
2427
2428    Ok(())
2429}
2430
2431async fn unlink_channel(
2432    request: proto::UnlinkChannel,
2433    response: Response<proto::UnlinkChannel>,
2434    session: Session,
2435) -> Result<()> {
2436    let db = session.db().await;
2437    let channel_id = ChannelId::from_proto(request.channel_id);
2438    let from = ChannelId::from_proto(request.from);
2439
2440    db.unlink_channel(session.user_id, channel_id, from).await?;
2441
2442    let members = db.get_channel_members(from).await?;
2443
2444    let update = proto::UpdateChannels {
2445        delete_edge: vec![proto::ChannelEdge {
2446            channel_id: channel_id.to_proto(),
2447            parent_id: from.to_proto(),
2448        }],
2449        ..Default::default()
2450    };
2451    let connection_pool = session.connection_pool().await;
2452    for member_id in members {
2453        for connection_id in connection_pool.user_connection_ids(member_id) {
2454            session.peer.send(connection_id, update.clone())?;
2455        }
2456    }
2457
2458    response.send(Ack {})?;
2459
2460    Ok(())
2461}
2462
2463async fn move_channel(
2464    request: proto::MoveChannel,
2465    response: Response<proto::MoveChannel>,
2466    session: Session,
2467) -> Result<()> {
2468    let db = session.db().await;
2469    let channel_id = ChannelId::from_proto(request.channel_id);
2470    let from_parent = ChannelId::from_proto(request.from);
2471    let to = ChannelId::from_proto(request.to);
2472
2473    let channels_to_send = db
2474        .move_channel(session.user_id, channel_id, from_parent, to)
2475        .await?;
2476
2477    if channels_to_send.is_empty() {
2478        response.send(Ack {})?;
2479        return Ok(());
2480    }
2481
2482    let members_from = db.get_channel_members(from_parent).await?;
2483    let members_to = db.get_channel_members(to).await?;
2484
2485    let update = proto::UpdateChannels {
2486        delete_edge: vec![proto::ChannelEdge {
2487            channel_id: channel_id.to_proto(),
2488            parent_id: from_parent.to_proto(),
2489        }],
2490        ..Default::default()
2491    };
2492    let connection_pool = session.connection_pool().await;
2493    for member_id in members_from {
2494        for connection_id in connection_pool.user_connection_ids(member_id) {
2495            session.peer.send(connection_id, update.clone())?;
2496        }
2497    }
2498
2499    let update = proto::UpdateChannels {
2500        channels: channels_to_send
2501            .channels
2502            .into_iter()
2503            .map(|channel| proto::Channel {
2504                id: channel.id.to_proto(),
2505                name: channel.name,
2506            })
2507            .collect(),
2508        insert_edge: channels_to_send.edges,
2509        ..Default::default()
2510    };
2511    for member_id in members_to {
2512        for connection_id in connection_pool.user_connection_ids(member_id) {
2513            session.peer.send(connection_id, update.clone())?;
2514        }
2515    }
2516
2517    response.send(Ack {})?;
2518
2519    Ok(())
2520}
2521
2522async fn get_channel_members(
2523    request: proto::GetChannelMembers,
2524    response: Response<proto::GetChannelMembers>,
2525    session: Session,
2526) -> Result<()> {
2527    let db = session.db().await;
2528    let channel_id = ChannelId::from_proto(request.channel_id);
2529    let members = db
2530        .get_channel_member_details(channel_id, session.user_id)
2531        .await?;
2532    response.send(proto::GetChannelMembersResponse { members })?;
2533    Ok(())
2534}
2535
2536async fn respond_to_channel_invite(
2537    request: proto::RespondToChannelInvite,
2538    response: Response<proto::RespondToChannelInvite>,
2539    session: Session,
2540) -> Result<()> {
2541    let db = session.db().await;
2542    let channel_id = ChannelId::from_proto(request.channel_id);
2543    db.respond_to_channel_invite(channel_id, session.user_id, request.accept)
2544        .await?;
2545
2546    let mut update = proto::UpdateChannels::default();
2547    update
2548        .remove_channel_invitations
2549        .push(channel_id.to_proto());
2550    if request.accept {
2551        let result = db.get_channel_for_user(channel_id, session.user_id).await?;
2552        update
2553            .channels
2554            .extend(
2555                result
2556                    .channels
2557                    .channels
2558                    .into_iter()
2559                    .map(|channel| proto::Channel {
2560                        id: channel.id.to_proto(),
2561                        name: channel.name,
2562                    }),
2563            );
2564        update.insert_edge = result.channels.edges;
2565        update
2566            .channel_participants
2567            .extend(
2568                result
2569                    .channel_participants
2570                    .into_iter()
2571                    .map(|(channel_id, user_ids)| proto::ChannelParticipants {
2572                        channel_id: channel_id.to_proto(),
2573                        participant_user_ids: user_ids.into_iter().map(UserId::to_proto).collect(),
2574                    }),
2575            );
2576        update
2577            .channel_permissions
2578            .extend(
2579                result
2580                    .channels_with_admin_privileges
2581                    .into_iter()
2582                    .map(|channel_id| proto::ChannelPermission {
2583                        channel_id: channel_id.to_proto(),
2584                        is_admin: true,
2585                    }),
2586            );
2587    }
2588    session.peer.send(session.connection_id, update)?;
2589    response.send(proto::Ack {})?;
2590
2591    Ok(())
2592}
2593
2594async fn join_channel(
2595    request: proto::JoinChannel,
2596    response: Response<proto::JoinChannel>,
2597    session: Session,
2598) -> Result<()> {
2599    let channel_id = ChannelId::from_proto(request.channel_id);
2600
2601    let joined_room = {
2602        leave_room_for_session(&session).await?;
2603        let db = session.db().await;
2604
2605        let room_id = db.room_id_for_channel(channel_id).await?;
2606
2607        let joined_room = db
2608            .join_room(room_id, session.user_id, session.connection_id)
2609            .await?;
2610
2611        let live_kit_connection_info = session.live_kit_client.as_ref().and_then(|live_kit| {
2612            let token = live_kit
2613                .room_token(
2614                    &joined_room.room.live_kit_room,
2615                    &session.user_id.to_string(),
2616                )
2617                .trace_err()?;
2618
2619            Some(LiveKitConnectionInfo {
2620                server_url: live_kit.url().into(),
2621                token,
2622            })
2623        });
2624
2625        response.send(proto::JoinRoomResponse {
2626            room: Some(joined_room.room.clone()),
2627            channel_id: joined_room.channel_id.map(|id| id.to_proto()),
2628            live_kit_connection_info,
2629        })?;
2630
2631        room_updated(&joined_room.room, &session.peer);
2632
2633        joined_room.into_inner()
2634    };
2635
2636    channel_updated(
2637        channel_id,
2638        &joined_room.room,
2639        &joined_room.channel_members,
2640        &session.peer,
2641        &*session.connection_pool().await,
2642    );
2643
2644    update_user_contacts(session.user_id, &session).await?;
2645
2646    Ok(())
2647}
2648
2649async fn join_channel_buffer(
2650    request: proto::JoinChannelBuffer,
2651    response: Response<proto::JoinChannelBuffer>,
2652    session: Session,
2653) -> Result<()> {
2654    let db = session.db().await;
2655    let channel_id = ChannelId::from_proto(request.channel_id);
2656
2657    let open_response = db
2658        .join_channel_buffer(channel_id, session.user_id, session.connection_id)
2659        .await?;
2660
2661    let replica_id = open_response.replica_id;
2662    let collaborators = open_response.collaborators.clone();
2663
2664    response.send(open_response)?;
2665
2666    let update = AddChannelBufferCollaborator {
2667        channel_id: channel_id.to_proto(),
2668        collaborator: Some(proto::Collaborator {
2669            user_id: session.user_id.to_proto(),
2670            peer_id: Some(session.connection_id.into()),
2671            replica_id,
2672        }),
2673    };
2674    channel_buffer_updated(
2675        session.connection_id,
2676        collaborators
2677            .iter()
2678            .filter_map(|collaborator| Some(collaborator.peer_id?.into())),
2679        &update,
2680        &session.peer,
2681    );
2682
2683    Ok(())
2684}
2685
2686async fn update_channel_buffer(
2687    request: proto::UpdateChannelBuffer,
2688    session: Session,
2689) -> Result<()> {
2690    let db = session.db().await;
2691    let channel_id = ChannelId::from_proto(request.channel_id);
2692
2693    let collaborators = db
2694        .update_channel_buffer(channel_id, session.user_id, &request.operations)
2695        .await?;
2696
2697    channel_buffer_updated(
2698        session.connection_id,
2699        collaborators,
2700        &proto::UpdateChannelBuffer {
2701            channel_id: channel_id.to_proto(),
2702            operations: request.operations,
2703        },
2704        &session.peer,
2705    );
2706    Ok(())
2707}
2708
2709async fn rejoin_channel_buffers(
2710    request: proto::RejoinChannelBuffers,
2711    response: Response<proto::RejoinChannelBuffers>,
2712    session: Session,
2713) -> Result<()> {
2714    let db = session.db().await;
2715    let buffers = db
2716        .rejoin_channel_buffers(&request.buffers, session.user_id, session.connection_id)
2717        .await?;
2718
2719    for buffer in &buffers {
2720        let collaborators_to_notify = buffer
2721            .buffer
2722            .collaborators
2723            .iter()
2724            .filter_map(|c| Some(c.peer_id?.into()));
2725        channel_buffer_updated(
2726            session.connection_id,
2727            collaborators_to_notify,
2728            &proto::UpdateChannelBufferCollaborator {
2729                channel_id: buffer.buffer.channel_id,
2730                old_peer_id: Some(buffer.old_connection_id.into()),
2731                new_peer_id: Some(session.connection_id.into()),
2732            },
2733            &session.peer,
2734        );
2735    }
2736
2737    response.send(proto::RejoinChannelBuffersResponse {
2738        buffers: buffers.into_iter().map(|b| b.buffer).collect(),
2739    })?;
2740
2741    Ok(())
2742}
2743
2744async fn leave_channel_buffer(
2745    request: proto::LeaveChannelBuffer,
2746    response: Response<proto::LeaveChannelBuffer>,
2747    session: Session,
2748) -> Result<()> {
2749    let db = session.db().await;
2750    let channel_id = ChannelId::from_proto(request.channel_id);
2751
2752    let collaborators_to_notify = db
2753        .leave_channel_buffer(channel_id, session.connection_id)
2754        .await?;
2755
2756    response.send(Ack {})?;
2757
2758    channel_buffer_updated(
2759        session.connection_id,
2760        collaborators_to_notify,
2761        &proto::RemoveChannelBufferCollaborator {
2762            channel_id: channel_id.to_proto(),
2763            peer_id: Some(session.connection_id.into()),
2764        },
2765        &session.peer,
2766    );
2767
2768    Ok(())
2769}
2770
2771fn channel_buffer_updated<T: EnvelopedMessage>(
2772    sender_id: ConnectionId,
2773    collaborators: impl IntoIterator<Item = ConnectionId>,
2774    message: &T,
2775    peer: &Peer,
2776) {
2777    broadcast(Some(sender_id), collaborators.into_iter(), |peer_id| {
2778        peer.send(peer_id.into(), message.clone())
2779    });
2780}
2781
2782async fn send_channel_message(
2783    request: proto::SendChannelMessage,
2784    response: Response<proto::SendChannelMessage>,
2785    session: Session,
2786) -> Result<()> {
2787    // Validate the message body.
2788    let body = request.body.trim().to_string();
2789    if body.len() > MAX_MESSAGE_LEN {
2790        return Err(anyhow!("message is too long"))?;
2791    }
2792    if body.is_empty() {
2793        return Err(anyhow!("message can't be blank"))?;
2794    }
2795
2796    let timestamp = OffsetDateTime::now_utc();
2797    let nonce = request
2798        .nonce
2799        .ok_or_else(|| anyhow!("nonce can't be blank"))?;
2800
2801    let channel_id = ChannelId::from_proto(request.channel_id);
2802    let (message_id, connection_ids) = session
2803        .db()
2804        .await
2805        .create_channel_message(
2806            channel_id,
2807            session.user_id,
2808            &body,
2809            timestamp,
2810            nonce.clone().into(),
2811        )
2812        .await?;
2813    let message = proto::ChannelMessage {
2814        sender_id: session.user_id.to_proto(),
2815        id: message_id.to_proto(),
2816        body,
2817        timestamp: timestamp.unix_timestamp() as u64,
2818        nonce: Some(nonce),
2819    };
2820    broadcast(Some(session.connection_id), connection_ids, |connection| {
2821        session.peer.send(
2822            connection,
2823            proto::ChannelMessageSent {
2824                channel_id: channel_id.to_proto(),
2825                message: Some(message.clone()),
2826            },
2827        )
2828    });
2829    response.send(proto::SendChannelMessageResponse {
2830        message: Some(message),
2831    })?;
2832    Ok(())
2833}
2834
2835async fn remove_channel_message(
2836    request: proto::RemoveChannelMessage,
2837    response: Response<proto::RemoveChannelMessage>,
2838    session: Session,
2839) -> Result<()> {
2840    let channel_id = ChannelId::from_proto(request.channel_id);
2841    let message_id = MessageId::from_proto(request.message_id);
2842    let connection_ids = session
2843        .db()
2844        .await
2845        .remove_channel_message(channel_id, message_id, session.user_id)
2846        .await?;
2847    broadcast(Some(session.connection_id), connection_ids, |connection| {
2848        session.peer.send(connection, request.clone())
2849    });
2850    response.send(proto::Ack {})?;
2851    Ok(())
2852}
2853
2854async fn join_channel_chat(
2855    request: proto::JoinChannelChat,
2856    response: Response<proto::JoinChannelChat>,
2857    session: Session,
2858) -> Result<()> {
2859    let channel_id = ChannelId::from_proto(request.channel_id);
2860
2861    let db = session.db().await;
2862    db.join_channel_chat(channel_id, session.connection_id, session.user_id)
2863        .await?;
2864    let messages = db
2865        .get_channel_messages(channel_id, session.user_id, MESSAGE_COUNT_PER_PAGE, None)
2866        .await?;
2867    response.send(proto::JoinChannelChatResponse {
2868        done: messages.len() < MESSAGE_COUNT_PER_PAGE,
2869        messages,
2870    })?;
2871    Ok(())
2872}
2873
2874async fn leave_channel_chat(request: proto::LeaveChannelChat, session: Session) -> Result<()> {
2875    let channel_id = ChannelId::from_proto(request.channel_id);
2876    session
2877        .db()
2878        .await
2879        .leave_channel_chat(channel_id, session.connection_id, session.user_id)
2880        .await?;
2881    Ok(())
2882}
2883
2884async fn get_channel_messages(
2885    request: proto::GetChannelMessages,
2886    response: Response<proto::GetChannelMessages>,
2887    session: Session,
2888) -> Result<()> {
2889    let channel_id = ChannelId::from_proto(request.channel_id);
2890    let messages = session
2891        .db()
2892        .await
2893        .get_channel_messages(
2894            channel_id,
2895            session.user_id,
2896            MESSAGE_COUNT_PER_PAGE,
2897            Some(MessageId::from_proto(request.before_message_id)),
2898        )
2899        .await?;
2900    response.send(proto::GetChannelMessagesResponse {
2901        done: messages.len() < MESSAGE_COUNT_PER_PAGE,
2902        messages,
2903    })?;
2904    Ok(())
2905}
2906
2907async fn update_diff_base(request: proto::UpdateDiffBase, session: Session) -> Result<()> {
2908    let project_id = ProjectId::from_proto(request.project_id);
2909    let project_connection_ids = session
2910        .db()
2911        .await
2912        .project_connection_ids(project_id, session.connection_id)
2913        .await?;
2914    broadcast(
2915        Some(session.connection_id),
2916        project_connection_ids.iter().copied(),
2917        |connection_id| {
2918            session
2919                .peer
2920                .forward_send(session.connection_id, connection_id, request.clone())
2921        },
2922    );
2923    Ok(())
2924}
2925
2926async fn get_private_user_info(
2927    _request: proto::GetPrivateUserInfo,
2928    response: Response<proto::GetPrivateUserInfo>,
2929    session: Session,
2930) -> Result<()> {
2931    let db = session.db().await;
2932
2933    let metrics_id = db.get_user_metrics_id(session.user_id).await?;
2934    let user = db
2935        .get_user_by_id(session.user_id)
2936        .await?
2937        .ok_or_else(|| anyhow!("user not found"))?;
2938    let flags = db.get_user_flags(session.user_id).await?;
2939
2940    response.send(proto::GetPrivateUserInfoResponse {
2941        metrics_id,
2942        staff: user.admin,
2943        flags,
2944    })?;
2945    Ok(())
2946}
2947
2948fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
2949    match message {
2950        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
2951        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
2952        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
2953        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
2954        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
2955            code: frame.code.into(),
2956            reason: frame.reason,
2957        })),
2958    }
2959}
2960
2961fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
2962    match message {
2963        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
2964        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
2965        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
2966        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
2967        AxumMessage::Close(frame) => {
2968            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
2969                code: frame.code.into(),
2970                reason: frame.reason,
2971            }))
2972        }
2973    }
2974}
2975
2976fn build_initial_channels_update(
2977    channels: ChannelsForUser,
2978    channel_invites: Vec<db::Channel>,
2979) -> proto::UpdateChannels {
2980    let mut update = proto::UpdateChannels::default();
2981
2982    for channel in channels.channels.channels {
2983        update.channels.push(proto::Channel {
2984            id: channel.id.to_proto(),
2985            name: channel.name,
2986        });
2987    }
2988
2989    update.insert_edge = channels.channels.edges;
2990
2991    for (channel_id, participants) in channels.channel_participants {
2992        update
2993            .channel_participants
2994            .push(proto::ChannelParticipants {
2995                channel_id: channel_id.to_proto(),
2996                participant_user_ids: participants.into_iter().map(|id| id.to_proto()).collect(),
2997            });
2998    }
2999
3000    update
3001        .channel_permissions
3002        .extend(
3003            channels
3004                .channels_with_admin_privileges
3005                .into_iter()
3006                .map(|id| proto::ChannelPermission {
3007                    channel_id: id.to_proto(),
3008                    is_admin: true,
3009                }),
3010        );
3011
3012    for channel in channel_invites {
3013        update.channel_invitations.push(proto::Channel {
3014            id: channel.id.to_proto(),
3015            name: channel.name,
3016        });
3017    }
3018
3019    update
3020}
3021
3022fn build_initial_contacts_update(
3023    contacts: Vec<db::Contact>,
3024    pool: &ConnectionPool,
3025) -> proto::UpdateContacts {
3026    let mut update = proto::UpdateContacts::default();
3027
3028    for contact in contacts {
3029        match contact {
3030            db::Contact::Accepted {
3031                user_id,
3032                should_notify,
3033                busy,
3034            } => {
3035                update
3036                    .contacts
3037                    .push(contact_for_user(user_id, should_notify, busy, &pool));
3038            }
3039            db::Contact::Outgoing { user_id } => update.outgoing_requests.push(user_id.to_proto()),
3040            db::Contact::Incoming {
3041                user_id,
3042                should_notify,
3043            } => update
3044                .incoming_requests
3045                .push(proto::IncomingContactRequest {
3046                    requester_id: user_id.to_proto(),
3047                    should_notify,
3048                }),
3049        }
3050    }
3051
3052    update
3053}
3054
3055fn contact_for_user(
3056    user_id: UserId,
3057    should_notify: bool,
3058    busy: bool,
3059    pool: &ConnectionPool,
3060) -> proto::Contact {
3061    proto::Contact {
3062        user_id: user_id.to_proto(),
3063        online: pool.is_user_online(user_id),
3064        busy,
3065        should_notify,
3066    }
3067}
3068
3069fn room_updated(room: &proto::Room, peer: &Peer) {
3070    broadcast(
3071        None,
3072        room.participants
3073            .iter()
3074            .filter_map(|participant| Some(participant.peer_id?.into())),
3075        |peer_id| {
3076            peer.send(
3077                peer_id.into(),
3078                proto::RoomUpdated {
3079                    room: Some(room.clone()),
3080                },
3081            )
3082        },
3083    );
3084}
3085
3086fn channel_updated(
3087    channel_id: ChannelId,
3088    room: &proto::Room,
3089    channel_members: &[UserId],
3090    peer: &Peer,
3091    pool: &ConnectionPool,
3092) {
3093    let participants = room
3094        .participants
3095        .iter()
3096        .map(|p| p.user_id)
3097        .collect::<Vec<_>>();
3098
3099    broadcast(
3100        None,
3101        channel_members
3102            .iter()
3103            .flat_map(|user_id| pool.user_connection_ids(*user_id)),
3104        |peer_id| {
3105            peer.send(
3106                peer_id.into(),
3107                proto::UpdateChannels {
3108                    channel_participants: vec![proto::ChannelParticipants {
3109                        channel_id: channel_id.to_proto(),
3110                        participant_user_ids: participants.clone(),
3111                    }],
3112                    ..Default::default()
3113                },
3114            )
3115        },
3116    );
3117}
3118
3119async fn update_user_contacts(user_id: UserId, session: &Session) -> Result<()> {
3120    let db = session.db().await;
3121
3122    let contacts = db.get_contacts(user_id).await?;
3123    let busy = db.is_user_busy(user_id).await?;
3124
3125    let pool = session.connection_pool().await;
3126    let updated_contact = contact_for_user(user_id, false, busy, &pool);
3127    for contact in contacts {
3128        if let db::Contact::Accepted {
3129            user_id: contact_user_id,
3130            ..
3131        } = contact
3132        {
3133            for contact_conn_id in pool.user_connection_ids(contact_user_id) {
3134                session
3135                    .peer
3136                    .send(
3137                        contact_conn_id,
3138                        proto::UpdateContacts {
3139                            contacts: vec![updated_contact.clone()],
3140                            remove_contacts: Default::default(),
3141                            incoming_requests: Default::default(),
3142                            remove_incoming_requests: Default::default(),
3143                            outgoing_requests: Default::default(),
3144                            remove_outgoing_requests: Default::default(),
3145                        },
3146                    )
3147                    .trace_err();
3148            }
3149        }
3150    }
3151    Ok(())
3152}
3153
3154async fn leave_room_for_session(session: &Session) -> Result<()> {
3155    let mut contacts_to_update = HashSet::default();
3156
3157    let room_id;
3158    let canceled_calls_to_user_ids;
3159    let live_kit_room;
3160    let delete_live_kit_room;
3161    let room;
3162    let channel_members;
3163    let channel_id;
3164
3165    if let Some(mut left_room) = session.db().await.leave_room(session.connection_id).await? {
3166        contacts_to_update.insert(session.user_id);
3167
3168        for project in left_room.left_projects.values() {
3169            project_left(project, session);
3170        }
3171
3172        room_id = RoomId::from_proto(left_room.room.id);
3173        canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids);
3174        live_kit_room = mem::take(&mut left_room.room.live_kit_room);
3175        delete_live_kit_room = left_room.deleted;
3176        room = mem::take(&mut left_room.room);
3177        channel_members = mem::take(&mut left_room.channel_members);
3178        channel_id = left_room.channel_id;
3179
3180        room_updated(&room, &session.peer);
3181    } else {
3182        return Ok(());
3183    }
3184
3185    if let Some(channel_id) = channel_id {
3186        channel_updated(
3187            channel_id,
3188            &room,
3189            &channel_members,
3190            &session.peer,
3191            &*session.connection_pool().await,
3192        );
3193    }
3194
3195    {
3196        let pool = session.connection_pool().await;
3197        for canceled_user_id in canceled_calls_to_user_ids {
3198            for connection_id in pool.user_connection_ids(canceled_user_id) {
3199                session
3200                    .peer
3201                    .send(
3202                        connection_id,
3203                        proto::CallCanceled {
3204                            room_id: room_id.to_proto(),
3205                        },
3206                    )
3207                    .trace_err();
3208            }
3209            contacts_to_update.insert(canceled_user_id);
3210        }
3211    }
3212
3213    for contact_user_id in contacts_to_update {
3214        update_user_contacts(contact_user_id, &session).await?;
3215    }
3216
3217    if let Some(live_kit) = session.live_kit_client.as_ref() {
3218        live_kit
3219            .remove_participant(live_kit_room.clone(), session.user_id.to_string())
3220            .await
3221            .trace_err();
3222
3223        if delete_live_kit_room {
3224            live_kit.delete_room(live_kit_room).await.trace_err();
3225        }
3226    }
3227
3228    Ok(())
3229}
3230
3231async fn leave_channel_buffers_for_session(session: &Session) -> Result<()> {
3232    let left_channel_buffers = session
3233        .db()
3234        .await
3235        .leave_channel_buffers(session.connection_id)
3236        .await?;
3237
3238    for (channel_id, connections) in left_channel_buffers {
3239        channel_buffer_updated(
3240            session.connection_id,
3241            connections,
3242            &proto::RemoveChannelBufferCollaborator {
3243                channel_id: channel_id.to_proto(),
3244                peer_id: Some(session.connection_id.into()),
3245            },
3246            &session.peer,
3247        );
3248    }
3249
3250    Ok(())
3251}
3252
3253fn project_left(project: &db::LeftProject, session: &Session) {
3254    for connection_id in &project.connection_ids {
3255        if project.host_user_id == session.user_id {
3256            session
3257                .peer
3258                .send(
3259                    *connection_id,
3260                    proto::UnshareProject {
3261                        project_id: project.id.to_proto(),
3262                    },
3263                )
3264                .trace_err();
3265        } else {
3266            session
3267                .peer
3268                .send(
3269                    *connection_id,
3270                    proto::RemoveProjectCollaborator {
3271                        project_id: project.id.to_proto(),
3272                        peer_id: Some(session.connection_id.into()),
3273                    },
3274                )
3275                .trace_err();
3276        }
3277    }
3278}
3279
/// Extension trait for turning a fallible value into an `Option`.
///
/// Implementors consume `self` and return `Some` with the success value, or
/// `None` when there is none.
pub trait ResultExt {
    /// The success value produced by [`ResultExt::trace_err`].
    type Ok;

    /// Consumes `self`, returning the success value if there is one.
    fn trace_err(self) -> Option<Self::Ok>;
}
3285
3286impl<T, E> ResultExt for Result<T, E>
3287where
3288    E: std::fmt::Debug,
3289{
3290    type Ok = T;
3291
3292    fn trace_err(self) -> Option<T> {
3293        match self {
3294            Ok(value) => Some(value),
3295            Err(error) => {
3296                tracing::error!("{:?}", error);
3297                None
3298            }
3299        }
3300    }
3301}