rpc.rs

   1mod connection_pool;
   2
   3use crate::{
   4    auth::{self, Impersonator},
   5    db::{
   6        self, BufferId, Channel, ChannelId, ChannelRole, ChannelsForUser, CreatedChannelMessage,
   7        Database, InviteMemberResult, MembershipUpdated, MessageId, NotificationId, Project,
   8        ProjectId, RemoveChannelMemberResult, ReplicaId, RespondToChannelInvite, RoomId, ServerId,
   9        User, UserId,
  10    },
  11    executor::Executor,
  12    AppState, Error, Result,
  13};
  14use anyhow::anyhow;
  15use async_tungstenite::tungstenite::{
  16    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  17};
  18use axum::{
  19    body::Body,
  20    extract::{
  21        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  22        ConnectInfo, WebSocketUpgrade,
  23    },
  24    headers::{Header, HeaderName},
  25    http::StatusCode,
  26    middleware,
  27    response::IntoResponse,
  28    routing::get,
  29    Extension, Router, TypedHeader,
  30};
  31use collections::{HashMap, HashSet};
  32pub use connection_pool::{ConnectionPool, ZedVersion};
  33use futures::{
  34    channel::oneshot,
  35    future::{self, BoxFuture},
  36    stream::FuturesUnordered,
  37    FutureExt, SinkExt, StreamExt, TryStreamExt,
  38};
  39use prometheus::{register_int_gauge, IntGauge};
  40use rpc::{
  41    proto::{
  42        self, Ack, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, LiveKitConnectionInfo,
  43        RequestMessage, ShareProject, UpdateChannelBufferCollaborators,
  44    },
  45    Connection, ConnectionId, ErrorCode, ErrorCodeExt, ErrorExt, Peer, Receipt, TypedEnvelope,
  46};
  47use serde::{Serialize, Serializer};
  48use std::{
  49    any::TypeId,
  50    fmt,
  51    future::Future,
  52    marker::PhantomData,
  53    mem,
  54    net::SocketAddr,
  55    ops::{Deref, DerefMut},
  56    rc::Rc,
  57    sync::{
  58        atomic::{AtomicBool, Ordering::SeqCst},
  59        Arc, OnceLock,
  60    },
  61    time::{Duration, Instant},
  62};
  63use time::OffsetDateTime;
  64use tokio::sync::{watch, Semaphore};
  65use tower::ServiceBuilder;
  66use tracing::{field, info_span, instrument, Instrument};
  67use util::SemanticVersion;
  68
// NOTE(review): not referenced in this part of the file — presumably the window a
// disconnected client is given to reconnect; confirm against its users.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);

// How long `Server::start` sleeps before it clears resources left behind by stale servers.
// Kubernetes gives terminated pods 10s to shut down gracefully. After they're gone, we can
// clean up old resources.
pub const CLEANUP_TIMEOUT: Duration = Duration::from_secs(15);

// NOTE(review): the three limits below are not referenced in this part of the file —
// presumably page sizes and a maximum length used by the chat and notification handlers;
// confirm against their users.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
const MAX_MESSAGE_LEN: usize = 1024;
const NOTIFICATION_COUNT_PER_PAGE: usize = 50;
  77
/// Type-erased message handler as stored in `Server::handlers`.
///
/// It takes the boxed envelope together with the `Session` of the connection the
/// message arrived on, and returns a boxed future that resolves to `()`.
type MessageHandler =
    Box<dyn Send + Sync + Fn(Box<dyn AnyTypedEnvelope>, Session) -> BoxFuture<'static, ()>>;
  80
  81struct Response<R> {
  82    peer: Arc<Peer>,
  83    receipt: Receipt<R>,
  84    responded: Arc<AtomicBool>,
  85}
  86
  87impl<R: RequestMessage> Response<R> {
  88    fn send(self, payload: R::Response) -> Result<()> {
  89        self.responded.store(true, SeqCst);
  90        self.peer.respond(self.receipt, payload)?;
  91        Ok(())
  92    }
  93}
  94
/// Per-connection state; a clone is passed to every message handler invoked for
/// the connection.
#[derive(Clone)]
struct Session {
    /// Id of the user the connection was opened for.
    user_id: UserId,
    /// Id the peer assigned to this connection.
    connection_id: ConnectionId,
    /// Database handle behind an async mutex; locked through `Session::db`.
    db: Arc<tokio::sync::Mutex<DbHandle>>,
    /// The server's RPC peer.
    peer: Arc<Peer>,
    /// The server's connection pool; locked through `Session::connection_pool`.
    connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
    /// LiveKit API client cloned from the application state, if one is configured.
    live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
    /// Executor the connection was started with; not read in this part of the file.
    _executor: Executor,
}
 105
 106impl Session {
 107    async fn db(&self) -> tokio::sync::MutexGuard<DbHandle> {
 108        #[cfg(test)]
 109        tokio::task::yield_now().await;
 110        let guard = self.db.lock().await;
 111        #[cfg(test)]
 112        tokio::task::yield_now().await;
 113        guard
 114    }
 115
 116    async fn connection_pool(&self) -> ConnectionPoolGuard<'_> {
 117        #[cfg(test)]
 118        tokio::task::yield_now().await;
 119        let guard = self.connection_pool.lock();
 120        ConnectionPoolGuard {
 121            guard,
 122            _not_send: PhantomData,
 123        }
 124    }
 125}
 126
 127impl fmt::Debug for Session {
 128    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 129        f.debug_struct("Session")
 130            .field("user_id", &self.user_id)
 131            .field("connection_id", &self.connection_id)
 132            .finish()
 133    }
 134}
 135
 136struct DbHandle(Arc<Database>);
 137
 138impl Deref for DbHandle {
 139    type Target = Database;
 140
 141    fn deref(&self) -> &Self::Target {
 142        self.0.as_ref()
 143    }
 144}
 145
/// RPC server state shared by all connections.
pub struct Server {
    /// This server's id; behind a mutex so the test-only `reset` can replace it.
    id: parking_lot::Mutex<ServerId>,
    /// RPC peer that owns the connections and sends/receives messages.
    peer: Arc<Peer>,
    /// Pool of currently registered connections, shared with every `Session`.
    pub(crate) connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
    /// Application state (database, configuration, optional LiveKit client).
    app_state: Arc<AppState>,
    /// Executor used for timers and detached background tasks.
    executor: Executor,
    /// Registered handler for each message type, keyed by the message's `TypeId`.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Watch channel set to `true` by `teardown` (and back to `false` by the
    /// test-only `reset`); connection tasks subscribe to it.
    teardown: watch::Sender<bool>,
}
 155
/// Wrapper around a locked `ConnectionPool` guard.
pub(crate) struct ConnectionPoolGuard<'a> {
    /// The held lock on the connection pool.
    guard: parking_lot::MutexGuard<'a, ConnectionPool>,
    /// `Rc<()>` is not `Send`, so this marker makes the whole guard `!Send`.
    _not_send: PhantomData<Rc<()>>,
}
 160
/// Serializable view of the server: a borrowed peer plus a locked connection pool.
#[derive(Serialize)]
pub struct ServerSnapshot<'a> {
    /// The server's RPC peer.
    peer: &'a Peer,
    /// Locked connection pool; serialized through its `Deref` target by
    /// `serialize_deref` rather than as the guard itself.
    #[serde(serialize_with = "serialize_deref")]
    connection_pool: ConnectionPoolGuard<'a>,
}
 167
 168pub fn serialize_deref<S, T, U>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
 169where
 170    S: Serializer,
 171    T: Deref<Target = U>,
 172    U: Serialize,
 173{
 174    Serialize::serialize(value.deref(), serializer)
 175}
 176
 177impl Server {
 178    pub fn new(id: ServerId, app_state: Arc<AppState>, executor: Executor) -> Arc<Self> {
 179        let mut server = Self {
 180            id: parking_lot::Mutex::new(id),
 181            peer: Peer::new(id.0 as u32),
 182            app_state,
 183            executor,
 184            connection_pool: Default::default(),
 185            handlers: Default::default(),
 186            teardown: watch::channel(false).0,
 187        };
 188
 189        server
 190            .add_request_handler(ping)
 191            .add_request_handler(create_room)
 192            .add_request_handler(join_room)
 193            .add_request_handler(rejoin_room)
 194            .add_request_handler(leave_room)
 195            .add_request_handler(set_room_participant_role)
 196            .add_request_handler(call)
 197            .add_request_handler(cancel_call)
 198            .add_message_handler(decline_call)
 199            .add_request_handler(update_participant_location)
 200            .add_request_handler(share_project)
 201            .add_message_handler(unshare_project)
 202            .add_request_handler(join_project)
 203            .add_request_handler(join_hosted_project)
 204            .add_message_handler(leave_project)
 205            .add_request_handler(update_project)
 206            .add_request_handler(update_worktree)
 207            .add_message_handler(start_language_server)
 208            .add_message_handler(update_language_server)
 209            .add_message_handler(update_diagnostic_summary)
 210            .add_message_handler(update_worktree_settings)
 211            .add_request_handler(forward_read_only_project_request::<proto::GetHover>)
 212            .add_request_handler(forward_read_only_project_request::<proto::GetDefinition>)
 213            .add_request_handler(forward_read_only_project_request::<proto::GetTypeDefinition>)
 214            .add_request_handler(forward_read_only_project_request::<proto::GetReferences>)
 215            .add_request_handler(forward_read_only_project_request::<proto::SearchProject>)
 216            .add_request_handler(forward_read_only_project_request::<proto::GetDocumentHighlights>)
 217            .add_request_handler(forward_read_only_project_request::<proto::GetProjectSymbols>)
 218            .add_request_handler(forward_read_only_project_request::<proto::OpenBufferForSymbol>)
 219            .add_request_handler(forward_read_only_project_request::<proto::OpenBufferById>)
 220            .add_request_handler(forward_read_only_project_request::<proto::SynchronizeBuffers>)
 221            .add_request_handler(forward_read_only_project_request::<proto::InlayHints>)
 222            .add_request_handler(forward_read_only_project_request::<proto::OpenBufferByPath>)
 223            .add_request_handler(forward_mutating_project_request::<proto::GetCompletions>)
 224            .add_request_handler(
 225                forward_mutating_project_request::<proto::ApplyCompletionAdditionalEdits>,
 226            )
 227            .add_request_handler(
 228                forward_mutating_project_request::<proto::ResolveCompletionDocumentation>,
 229            )
 230            .add_request_handler(forward_mutating_project_request::<proto::GetCodeActions>)
 231            .add_request_handler(forward_mutating_project_request::<proto::ApplyCodeAction>)
 232            .add_request_handler(forward_mutating_project_request::<proto::PrepareRename>)
 233            .add_request_handler(forward_mutating_project_request::<proto::PerformRename>)
 234            .add_request_handler(forward_mutating_project_request::<proto::ReloadBuffers>)
 235            .add_request_handler(forward_mutating_project_request::<proto::FormatBuffers>)
 236            .add_request_handler(forward_mutating_project_request::<proto::CreateProjectEntry>)
 237            .add_request_handler(forward_mutating_project_request::<proto::RenameProjectEntry>)
 238            .add_request_handler(forward_mutating_project_request::<proto::CopyProjectEntry>)
 239            .add_request_handler(forward_mutating_project_request::<proto::DeleteProjectEntry>)
 240            .add_request_handler(forward_mutating_project_request::<proto::ExpandProjectEntry>)
 241            .add_request_handler(forward_mutating_project_request::<proto::OnTypeFormatting>)
 242            .add_request_handler(forward_mutating_project_request::<proto::SaveBuffer>)
 243            .add_message_handler(create_buffer_for_peer)
 244            .add_request_handler(update_buffer)
 245            .add_message_handler(broadcast_project_message_from_host::<proto::RefreshInlayHints>)
 246            .add_message_handler(broadcast_project_message_from_host::<proto::UpdateBufferFile>)
 247            .add_message_handler(broadcast_project_message_from_host::<proto::BufferReloaded>)
 248            .add_message_handler(broadcast_project_message_from_host::<proto::BufferSaved>)
 249            .add_message_handler(broadcast_project_message_from_host::<proto::UpdateDiffBase>)
 250            .add_request_handler(get_users)
 251            .add_request_handler(fuzzy_search_users)
 252            .add_request_handler(request_contact)
 253            .add_request_handler(remove_contact)
 254            .add_request_handler(respond_to_contact_request)
 255            .add_request_handler(create_channel)
 256            .add_request_handler(delete_channel)
 257            .add_request_handler(invite_channel_member)
 258            .add_request_handler(remove_channel_member)
 259            .add_request_handler(set_channel_member_role)
 260            .add_request_handler(set_channel_visibility)
 261            .add_request_handler(rename_channel)
 262            .add_request_handler(join_channel_buffer)
 263            .add_request_handler(leave_channel_buffer)
 264            .add_message_handler(update_channel_buffer)
 265            .add_request_handler(rejoin_channel_buffers)
 266            .add_request_handler(get_channel_members)
 267            .add_request_handler(respond_to_channel_invite)
 268            .add_request_handler(join_channel)
 269            .add_request_handler(join_channel_chat)
 270            .add_message_handler(leave_channel_chat)
 271            .add_request_handler(send_channel_message)
 272            .add_request_handler(remove_channel_message)
 273            .add_request_handler(get_channel_messages)
 274            .add_request_handler(get_channel_messages_by_id)
 275            .add_request_handler(get_notifications)
 276            .add_request_handler(mark_notification_as_read)
 277            .add_request_handler(move_channel)
 278            .add_request_handler(follow)
 279            .add_message_handler(unfollow)
 280            .add_message_handler(update_followers)
 281            .add_request_handler(get_private_user_info)
 282            .add_message_handler(acknowledge_channel_message)
 283            .add_message_handler(acknowledge_buffer_version);
 284
 285        Arc::new(server)
 286    }
 287
    /// Spawns a detached background task that cleans up after stale servers and
    /// returns `Ok(())` right away.
    ///
    /// The task sleeps for `CLEANUP_TIMEOUT`, asks the database for the stale room
    /// and channel ids, clears stale channel-buffer collaborators and room
    /// participants, notifies the affected connections, and finally deletes the
    /// stale server records. Every fallible step goes through `trace_err()` and the
    /// task carries on when a step yields nothing, so no error reaches the caller.
    pub async fn start(&self) -> Result<()> {
        // Everything the detached task needs is cloned/copied up front because the
        // task outlives this borrow of `self`.
        let server_id = *self.id.lock();
        let app_state = self.app_state.clone();
        let peer = self.peer.clone();
        let timeout = self.executor.sleep(CLEANUP_TIMEOUT);
        let pool = self.connection_pool.clone();
        let live_kit_client = self.app_state.live_kit_client.clone();

        let span = info_span!("start server");
        self.executor.spawn_detached(
            async move {
                tracing::info!("waiting for cleanup timeout");
                timeout.await;
                tracing::info!("cleanup timeout expired, retrieving stale rooms");
                if let Some((room_ids, channel_ids)) = app_state
                    .db
                    .stale_server_resource_ids(&app_state.config.zed_environment, server_id)
                    .await
                    .trace_err()
                {
                    tracing::info!(stale_room_count = room_ids.len(), "retrieved stale rooms");
                    tracing::info!(
                        stale_channel_buffer_count = channel_ids.len(),
                        "retrieved stale channel buffers"
                    );

                    // Channel buffers: drop stale collaborators, then send the refreshed
                    // collaborator list to every connection the database reports.
                    for channel_id in channel_ids {
                        if let Some(refreshed_channel_buffer) = app_state
                            .db
                            .clear_stale_channel_buffer_collaborators(channel_id, server_id)
                            .await
                            .trace_err()
                        {
                            for connection_id in refreshed_channel_buffer.connection_ids {
                                peer.send(
                                    connection_id,
                                    proto::UpdateChannelBufferCollaborators {
                                        channel_id: channel_id.to_proto(),
                                        collaborators: refreshed_channel_buffer
                                            .collaborators
                                            .clone(),
                                    },
                                )
                                .trace_err();
                            }
                        }
                    }

                    for room_id in room_ids {
                        // Per-room results; they keep these defaults when clearing the
                        // room's stale participants yields nothing.
                        let mut contacts_to_update = HashSet::default();
                        let mut canceled_calls_to_user_ids = Vec::new();
                        let mut live_kit_room = String::new();
                        let mut delete_live_kit_room = false;

                        if let Some(mut refreshed_room) = app_state
                            .db
                            .clear_stale_room_participants(room_id, server_id)
                            .await
                            .trace_err()
                        {
                            tracing::info!(
                                room_id = room_id.0,
                                new_participant_count = refreshed_room.room.participants.len(),
                                "refreshed room"
                            );
                            room_updated(&refreshed_room.room, &peer);
                            if let Some(channel) = refreshed_room.channel.as_ref() {
                                channel_updated(channel, &refreshed_room.room, &peer, &pool.lock());
                            }
                            contacts_to_update
                                .extend(refreshed_room.stale_participant_user_ids.iter().copied());
                            contacts_to_update
                                .extend(refreshed_room.canceled_calls_to_user_ids.iter().copied());
                            canceled_calls_to_user_ids =
                                mem::take(&mut refreshed_room.canceled_calls_to_user_ids);
                            live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room);
                            // The LiveKit room is only deleted once nobody is left in it.
                            delete_live_kit_room = refreshed_room.room.participants.is_empty();
                        }

                        // Scoped so the pool lock is released before the awaits below.
                        {
                            let pool = pool.lock();
                            for canceled_user_id in canceled_calls_to_user_ids {
                                for connection_id in pool.user_connection_ids(canceled_user_id) {
                                    peer.send(
                                        connection_id,
                                        proto::CallCanceled {
                                            room_id: room_id.to_proto(),
                                        },
                                    )
                                    .trace_err();
                                }
                            }
                        }

                        // Send each affected user's refreshed contact entry to the
                        // connections of their accepted contacts.
                        for user_id in contacts_to_update {
                            let busy = app_state.db.is_user_busy(user_id).await.trace_err();
                            let contacts = app_state.db.get_contacts(user_id).await.trace_err();
                            if let Some((busy, contacts)) = busy.zip(contacts) {
                                // Locked only after both database calls have completed.
                                let pool = pool.lock();
                                let updated_contact = contact_for_user(user_id, busy, &pool);
                                for contact in contacts {
                                    if let db::Contact::Accepted {
                                        user_id: contact_user_id,
                                        ..
                                    } = contact
                                    {
                                        for contact_conn_id in
                                            pool.user_connection_ids(contact_user_id)
                                        {
                                            peer.send(
                                                contact_conn_id,
                                                proto::UpdateContacts {
                                                    contacts: vec![updated_contact.clone()],
                                                    remove_contacts: Default::default(),
                                                    incoming_requests: Default::default(),
                                                    remove_incoming_requests: Default::default(),
                                                    outgoing_requests: Default::default(),
                                                    remove_outgoing_requests: Default::default(),
                                                },
                                            )
                                            .trace_err();
                                        }
                                    }
                                }
                            }
                        }

                        if let Some(live_kit) = live_kit_client.as_ref() {
                            if delete_live_kit_room {
                                live_kit.delete_room(live_kit_room).await.trace_err();
                            }
                        }
                    }
                }

                // Runs even when the stale resource ids could not be retrieved.
                app_state
                    .db
                    .delete_stale_servers(&app_state.config.zed_environment, server_id)
                    .await
                    .trace_err();
            }
            .instrument(span),
        );
        Ok(())
    }
 433
 434    pub fn teardown(&self) {
 435        self.peer.teardown();
 436        self.connection_pool.lock().reset();
 437        let _ = self.teardown.send(true);
 438    }
 439
 440    #[cfg(test)]
 441    pub fn reset(&self, id: ServerId) {
 442        self.teardown();
 443        *self.id.lock() = id;
 444        self.peer.reset(id.0 as u32);
 445        let _ = self.teardown.send(false);
 446    }
 447
 448    #[cfg(test)]
 449    pub fn id(&self) -> ServerId {
 450        *self.id.lock()
 451    }
 452
 453    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 454    where
 455        F: 'static + Send + Sync + Fn(TypedEnvelope<M>, Session) -> Fut,
 456        Fut: 'static + Send + Future<Output = Result<()>>,
 457        M: EnvelopedMessage,
 458    {
 459        let prev_handler = self.handlers.insert(
 460            TypeId::of::<M>(),
 461            Box::new(move |envelope, session| {
 462                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 463                let received_at = envelope.received_at;
 464                    tracing::info!(
 465                        "message received"
 466                    );
 467                let start_time = Instant::now();
 468                let future = (handler)(*envelope, session);
 469                async move {
 470                    let result = future.await;
 471                    let total_duration_ms = received_at.elapsed().as_micros() as f64 / 1000.0;
 472                    let processing_duration_ms = start_time.elapsed().as_micros() as f64 / 1000.0;
 473                    let queue_duration_ms = total_duration_ms - processing_duration_ms;
 474                    match result {
 475                        Err(error) => {
 476                            tracing::error!(%error, total_duration_ms, processing_duration_ms, queue_duration_ms, "error handling message")
 477                        }
 478                        Ok(()) => tracing::info!(total_duration_ms, processing_duration_ms, queue_duration_ms, "finished handling message"),
 479                    }
 480                }
 481                .boxed()
 482            }),
 483        );
 484        if prev_handler.is_some() {
 485            panic!("registered a handler for the same message twice");
 486        }
 487        self
 488    }
 489
 490    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 491    where
 492        F: 'static + Send + Sync + Fn(M, Session) -> Fut,
 493        Fut: 'static + Send + Future<Output = Result<()>>,
 494        M: EnvelopedMessage,
 495    {
 496        self.add_handler(move |envelope, session| handler(envelope.payload, session));
 497        self
 498    }
 499
 500    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 501    where
 502        F: 'static + Send + Sync + Fn(M, Response<M>, Session) -> Fut,
 503        Fut: Send + Future<Output = Result<()>>,
 504        M: RequestMessage,
 505    {
 506        let handler = Arc::new(handler);
 507        self.add_handler(move |envelope, session| {
 508            let receipt = envelope.receipt();
 509            let handler = handler.clone();
 510            async move {
 511                let peer = session.peer.clone();
 512                let responded = Arc::new(AtomicBool::default());
 513                let response = Response {
 514                    peer: peer.clone(),
 515                    responded: responded.clone(),
 516                    receipt,
 517                };
 518                match (handler)(envelope.payload, response, session).await {
 519                    Ok(()) => {
 520                        if responded.load(std::sync::atomic::Ordering::SeqCst) {
 521                            Ok(())
 522                        } else {
 523                            Err(anyhow!("handler did not send a response"))?
 524                        }
 525                    }
 526                    Err(error) => {
 527                        let proto_err = match &error {
 528                            Error::Internal(err) => err.to_proto(),
 529                            _ => ErrorCode::Internal.message(format!("{}", error)).to_proto(),
 530                        };
 531                        peer.respond_with_error(receipt, proto_err)?;
 532                        Err(error)
 533                    }
 534                }
 535            }
 536        })
 537    }
 538
    /// Returns a future that serves one client connection until it ends.
    ///
    /// The future registers `connection` with the peer, sends the initial client
    /// update, and then dispatches incoming messages to the registered handlers
    /// until the I/O future completes, the incoming stream ends, or the server's
    /// teardown channel changes. Except in the teardown case, `connection_lost` is
    /// awaited afterwards to sign the session out.
    ///
    /// * `address`, `user` and `impersonator` are recorded on the tracing span.
    /// * `send_connection_id`, when present, is passed on to
    ///   `send_initial_client_update`.
    /// * `executor` provides the peer's timers and runs background message handlers.
    #[allow(clippy::too_many_arguments)]
    pub fn handle_connection(
        self: &Arc<Self>,
        connection: Connection,
        address: String,
        user: User,
        zed_version: ZedVersion,
        impersonator: Option<User>,
        send_connection_id: Option<oneshot::Sender<ConnectionId>>,
        executor: Executor,
    ) -> impl Future<Output = ()> {
        let this = self.clone();
        let user_id = user.id;
        let login = user.github_login.clone();
        // `impersonator` and `connection_id` start out empty and are recorded later.
        let span = info_span!("handle connection", %user_id, %login, %address, impersonator = field::Empty, connection_id = field::Empty);
        if let Some(impersonator) = impersonator {
            span.record("impersonator", &impersonator.github_login);
        }
        // Subscribed before the future is returned, i.e. when `handle_connection` is called.
        let mut teardown = self.teardown.subscribe();
        async move {
            // Refuse the connection if teardown has already been signalled.
            if *teardown.borrow() {
                tracing::error!("server is tearing down");
                return
            }
            let (connection_id, handle_io, mut incoming_rx) = this
                .peer
                .add_connection(connection, {
                    let executor = executor.clone();
                    move |duration| executor.sleep(duration)
                });
            tracing::Span::current().record("connection_id", format!("{}", connection_id));
            tracing::info!("connection opened");

            let session = Session {
                user_id,
                connection_id,
                db: Arc::new(tokio::sync::Mutex::new(DbHandle(this.app_state.db.clone()))),
                peer: this.peer.clone(),
                connection_pool: this.connection_pool.clone(),
                live_kit_client: this.app_state.live_kit_client.clone(),
                _executor: executor.clone()
            };

            // On failure the future ends here, without reaching `connection_lost`.
            if let Err(error) = this.send_initial_client_update(connection_id, user, zed_version, send_connection_id, &session).await {
                tracing::error!(?error, "failed to send initial client update");
                return;
            }

            let handle_io = handle_io.fuse();
            futures::pin_mut!(handle_io);

            // Handlers for foreground messages are pushed into the following `FuturesUnordered`.
            // This prevents deadlocks when e.g., client A performs a request to client B and
            // client B performs a request to client A. If both clients stop processing further
            // messages until their respective request completes, they won't have a chance to
            // respond to the other client's request and cause a deadlock.
            //
            // This arrangement ensures we will attempt to process earlier messages first, but fall
            // back to processing messages arrived later in the spirit of making progress.
            let mut foreground_message_handlers = FuturesUnordered::new();
            // A permit is taken before each message is read and released when its handler
            // finishes, so at most 256 handlers for this connection are in flight at once.
            let concurrent_handlers = Arc::new(Semaphore::new(256));
            loop {
                let next_message = async {
                    let permit = concurrent_handlers.clone().acquire_owned().await.unwrap();
                    let message = incoming_rx.next().await;
                    (permit, message)
                }.fuse();
                futures::pin_mut!(next_message);
                // Biased: the branches are polled in the order written.
                futures::select_biased! {
                    // Teardown returns immediately and skips `connection_lost` below.
                    _ = teardown.changed().fuse() => return,
                    result = handle_io => {
                        if let Err(error) = result {
                            tracing::error!(?error, "error handling I/O");
                        }
                        break;
                    }
                    // Drives the pending foreground handlers; their output is `()`.
                    _ = foreground_message_handlers.next() => {}
                    next_message = next_message => {
                        let (permit, message) = next_message;
                        if let Some(message) = message {
                            let type_name = message.payload_type_name();
                            // note: we copy all the fields from the parent span so we can query them in the logs.
                            // (https://github.com/tokio-rs/tracing/issues/2670).
                            let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
                            let span_enter = span.enter();
                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                let is_background = message.is_background();
                                let handle_message = (handler)(message, session.clone());
                                drop(span_enter);

                                // The permit is held until the handler's future completes.
                                let handle_message = async move {
                                    handle_message.await;
                                    drop(permit);
                                }.instrument(span);
                                if is_background {
                                    executor.spawn_detached(handle_message);
                                } else {
                                    foreground_message_handlers.push(handle_message);
                                }
                            } else {
                                tracing::error!("no message handler");
                            }
                        } else {
                            tracing::info!("connection closed");
                            break;
                        }
                    }
                }
            }

            // Pending foreground handlers are dropped before signing out.
            drop(foreground_message_handlers);
            tracing::info!("signing out");
            if let Err(error) = connection_lost(session, teardown, executor).await {
                tracing::error!(?error, "error signing out");
            }

        }.instrument(span)
    }
 657
 658    async fn send_initial_client_update(
 659        &self,
 660        connection_id: ConnectionId,
 661        user: User,
 662        zed_version: ZedVersion,
 663        mut send_connection_id: Option<oneshot::Sender<ConnectionId>>,
 664        session: &Session,
 665    ) -> Result<()> {
 666        self.peer.send(
 667            connection_id,
 668            proto::Hello {
 669                peer_id: Some(connection_id.into()),
 670            },
 671        )?;
 672        tracing::info!("sent hello message");
 673
 674        if let Some(send_connection_id) = send_connection_id.take() {
 675            let _ = send_connection_id.send(connection_id);
 676        }
 677
 678        if !user.connected_once {
 679            self.peer.send(connection_id, proto::ShowContacts {})?;
 680            self.app_state
 681                .db
 682                .set_user_connected_once(user.id, true)
 683                .await?;
 684        }
 685
 686        let (contacts, channels_for_user, channel_invites) = future::try_join3(
 687            self.app_state.db.get_contacts(user.id),
 688            self.app_state.db.get_channels_for_user(user.id),
 689            self.app_state.db.get_channel_invites_for_user(user.id),
 690        )
 691        .await?;
 692
 693        {
 694            let mut pool = self.connection_pool.lock();
 695            pool.add_connection(connection_id, user.id, user.admin, zed_version);
 696            for membership in &channels_for_user.channel_memberships {
 697                pool.subscribe_to_channel(user.id, membership.channel_id, membership.role)
 698            }
 699            self.peer.send(
 700                connection_id,
 701                build_initial_contacts_update(contacts, &pool),
 702            )?;
 703            self.peer.send(
 704                connection_id,
 705                build_update_user_channels(&channels_for_user),
 706            )?;
 707            self.peer.send(
 708                connection_id,
 709                build_channels_update(channels_for_user, channel_invites),
 710            )?;
 711        }
 712
 713        if let Some(incoming_call) = self.app_state.db.incoming_call_for_user(user.id).await? {
 714            self.peer.send(connection_id, incoming_call)?;
 715        }
 716
 717        update_user_contacts(user.id, &session).await?;
 718        Ok(())
 719    }
 720
 721    pub async fn invite_code_redeemed(
 722        self: &Arc<Self>,
 723        inviter_id: UserId,
 724        invitee_id: UserId,
 725    ) -> Result<()> {
 726        if let Some(user) = self.app_state.db.get_user_by_id(inviter_id).await? {
 727            if let Some(code) = &user.invite_code {
 728                let pool = self.connection_pool.lock();
 729                let invitee_contact = contact_for_user(invitee_id, false, &pool);
 730                for connection_id in pool.user_connection_ids(inviter_id) {
 731                    self.peer.send(
 732                        connection_id,
 733                        proto::UpdateContacts {
 734                            contacts: vec![invitee_contact.clone()],
 735                            ..Default::default()
 736                        },
 737                    )?;
 738                    self.peer.send(
 739                        connection_id,
 740                        proto::UpdateInviteInfo {
 741                            url: format!("{}{}", self.app_state.config.invite_link_prefix, &code),
 742                            count: user.invite_count as u32,
 743                        },
 744                    )?;
 745                }
 746            }
 747        }
 748        Ok(())
 749    }
 750
 751    pub async fn invite_count_updated(self: &Arc<Self>, user_id: UserId) -> Result<()> {
 752        if let Some(user) = self.app_state.db.get_user_by_id(user_id).await? {
 753            if let Some(invite_code) = &user.invite_code {
 754                let pool = self.connection_pool.lock();
 755                for connection_id in pool.user_connection_ids(user_id) {
 756                    self.peer.send(
 757                        connection_id,
 758                        proto::UpdateInviteInfo {
 759                            url: format!(
 760                                "{}{}",
 761                                self.app_state.config.invite_link_prefix, invite_code
 762                            ),
 763                            count: user.invite_count as u32,
 764                        },
 765                    )?;
 766                }
 767            }
 768        }
 769        Ok(())
 770    }
 771
 772    pub async fn snapshot<'a>(self: &'a Arc<Self>) -> ServerSnapshot<'a> {
 773        ServerSnapshot {
 774            connection_pool: ConnectionPoolGuard {
 775                guard: self.connection_pool.lock(),
 776                _not_send: PhantomData,
 777            },
 778            peer: &self.peer,
 779        }
 780    }
 781}
 782
 783impl<'a> Deref for ConnectionPoolGuard<'a> {
 784    type Target = ConnectionPool;
 785
 786    fn deref(&self) -> &Self::Target {
 787        &self.guard
 788    }
 789}
 790
 791impl<'a> DerefMut for ConnectionPoolGuard<'a> {
 792    fn deref_mut(&mut self) -> &mut Self::Target {
 793        &mut self.guard
 794    }
 795}
 796
 797impl<'a> Drop for ConnectionPoolGuard<'a> {
 798    fn drop(&mut self) {
 799        #[cfg(test)]
 800        self.check_invariants();
 801    }
 802}
 803
 804fn broadcast<F>(
 805    sender_id: Option<ConnectionId>,
 806    receiver_ids: impl IntoIterator<Item = ConnectionId>,
 807    mut f: F,
 808) where
 809    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 810{
 811    for receiver_id in receiver_ids {
 812        if Some(receiver_id) != sender_id {
 813            if let Err(error) = f(receiver_id) {
 814                tracing::error!("failed to send to {:?} {}", receiver_id, error);
 815            }
 816        }
 817    }
 818}
 819
 820pub struct ProtocolVersion(u32);
 821
 822impl Header for ProtocolVersion {
 823    fn name() -> &'static HeaderName {
 824        static ZED_PROTOCOL_VERSION: OnceLock<HeaderName> = OnceLock::new();
 825        ZED_PROTOCOL_VERSION.get_or_init(|| HeaderName::from_static("x-zed-protocol-version"))
 826    }
 827
 828    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
 829    where
 830        Self: Sized,
 831        I: Iterator<Item = &'i axum::http::HeaderValue>,
 832    {
 833        let version = values
 834            .next()
 835            .ok_or_else(axum::headers::Error::invalid)?
 836            .to_str()
 837            .map_err(|_| axum::headers::Error::invalid())?
 838            .parse()
 839            .map_err(|_| axum::headers::Error::invalid())?;
 840        Ok(Self(version))
 841    }
 842
 843    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
 844        values.extend([self.0.to_string().parse().unwrap()]);
 845    }
 846}
 847
 848pub struct AppVersionHeader(SemanticVersion);
 849impl Header for AppVersionHeader {
 850    fn name() -> &'static HeaderName {
 851        static ZED_APP_VERSION: OnceLock<HeaderName> = OnceLock::new();
 852        ZED_APP_VERSION.get_or_init(|| HeaderName::from_static("x-zed-app-version"))
 853    }
 854
 855    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
 856    where
 857        Self: Sized,
 858        I: Iterator<Item = &'i axum::http::HeaderValue>,
 859    {
 860        let version = values
 861            .next()
 862            .ok_or_else(axum::headers::Error::invalid)?
 863            .to_str()
 864            .map_err(|_| axum::headers::Error::invalid())?
 865            .parse()
 866            .map_err(|_| axum::headers::Error::invalid())?;
 867        Ok(Self(version))
 868    }
 869
 870    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
 871        values.extend([self.0.to_string().parse().unwrap()]);
 872    }
 873}
 874
 875pub fn routes(server: Arc<Server>) -> Router<(), Body> {
 876    Router::new()
 877        .route("/rpc", get(handle_websocket_request))
 878        .layer(
 879            ServiceBuilder::new()
 880                .layer(Extension(server.app_state.clone()))
 881                .layer(middleware::from_fn(auth::validate_header)),
 882        )
 883        .route("/metrics", get(handle_metrics))
 884        .layer(Extension(server))
 885}
 886
 887pub async fn handle_websocket_request(
 888    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
 889    app_version_header: Option<TypedHeader<AppVersionHeader>>,
 890    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
 891    Extension(server): Extension<Arc<Server>>,
 892    Extension(user): Extension<User>,
 893    Extension(impersonator): Extension<Impersonator>,
 894    ws: WebSocketUpgrade,
 895) -> axum::response::Response {
 896    if protocol_version != rpc::PROTOCOL_VERSION {
 897        return (
 898            StatusCode::UPGRADE_REQUIRED,
 899            "client must be upgraded".to_string(),
 900        )
 901            .into_response();
 902    }
 903
 904    let Some(version) = app_version_header.map(|header| ZedVersion(header.0 .0)) else {
 905        return (
 906            StatusCode::UPGRADE_REQUIRED,
 907            "no version header found".to_string(),
 908        )
 909            .into_response();
 910    };
 911
 912    if !version.can_collaborate() {
 913        return (
 914            StatusCode::UPGRADE_REQUIRED,
 915            "client must be upgraded".to_string(),
 916        )
 917            .into_response();
 918    }
 919
 920    let socket_address = socket_address.to_string();
 921    ws.on_upgrade(move |socket| {
 922        let socket = socket
 923            .map_ok(to_tungstenite_message)
 924            .err_into()
 925            .with(|message| async move { Ok(to_axum_message(message)) });
 926        let connection = Connection::new(Box::pin(socket));
 927        async move {
 928            server
 929                .handle_connection(
 930                    connection,
 931                    socket_address,
 932                    user,
 933                    version,
 934                    impersonator.0,
 935                    None,
 936                    Executor::Production,
 937                )
 938                .await;
 939        }
 940    })
 941}
 942
 943pub async fn handle_metrics(Extension(server): Extension<Arc<Server>>) -> Result<String> {
 944    static CONNECTIONS_METRIC: OnceLock<IntGauge> = OnceLock::new();
 945    let connections_metric = CONNECTIONS_METRIC
 946        .get_or_init(|| register_int_gauge!("connections", "number of connections").unwrap());
 947
 948    let connections = server
 949        .connection_pool
 950        .lock()
 951        .connections()
 952        .filter(|connection| !connection.admin)
 953        .count();
 954    connections_metric.set(connections as _);
 955
 956    static SHARED_PROJECTS_METRIC: OnceLock<IntGauge> = OnceLock::new();
 957    let shared_projects_metric = SHARED_PROJECTS_METRIC.get_or_init(|| {
 958        register_int_gauge!(
 959            "shared_projects",
 960            "number of open projects with one or more guests"
 961        )
 962        .unwrap()
 963    });
 964
 965    let shared_projects = server.app_state.db.project_count_excluding_admins().await?;
 966    shared_projects_metric.set(shared_projects as _);
 967
 968    let encoder = prometheus::TextEncoder::new();
 969    let metric_families = prometheus::gather();
 970    let encoded_metrics = encoder
 971        .encode_to_string(&metric_families)
 972        .map_err(|err| anyhow!("{}", err))?;
 973    Ok(encoded_metrics)
 974}
 975
 976#[instrument(err, skip(executor))]
 977async fn connection_lost(
 978    session: Session,
 979    mut teardown: watch::Receiver<bool>,
 980    executor: Executor,
 981) -> Result<()> {
 982    session.peer.disconnect(session.connection_id);
 983    session
 984        .connection_pool()
 985        .await
 986        .remove_connection(session.connection_id)?;
 987
 988    session
 989        .db()
 990        .await
 991        .connection_lost(session.connection_id)
 992        .await
 993        .trace_err();
 994
 995    futures::select_biased! {
 996        _ = executor.sleep(RECONNECT_TIMEOUT).fuse() => {
 997            log::info!("connection lost, removing all resources for user:{}, connection:{:?}", session.user_id, session.connection_id);
 998            leave_room_for_session(&session).await.trace_err();
 999            leave_channel_buffers_for_session(&session)
1000                .await
1001                .trace_err();
1002
1003            if !session
1004                .connection_pool()
1005                .await
1006                .is_user_online(session.user_id)
1007            {
1008                let db = session.db().await;
1009                if let Some(room) = db.decline_call(None, session.user_id).await.trace_err().flatten() {
1010                    room_updated(&room, &session.peer);
1011                }
1012            }
1013
1014            update_user_contacts(session.user_id, &session).await?;
1015        }
1016        _ = teardown.changed().fuse() => {}
1017    }
1018
1019    Ok(())
1020}
1021
1022/// Acknowledges a ping from a client, used to keep the connection alive.
1023async fn ping(_: proto::Ping, response: Response<proto::Ping>, _session: Session) -> Result<()> {
1024    response.send(proto::Ack {})?;
1025    Ok(())
1026}
1027
1028/// Creates a new room for calling (outside of channels)
1029async fn create_room(
1030    _request: proto::CreateRoom,
1031    response: Response<proto::CreateRoom>,
1032    session: Session,
1033) -> Result<()> {
1034    let live_kit_room = nanoid::nanoid!(30);
1035
1036    let live_kit_connection_info = {
1037        let live_kit_room = live_kit_room.clone();
1038        let live_kit = session.live_kit_client.as_ref();
1039
1040        util::async_maybe!({
1041            let live_kit = live_kit?;
1042
1043            let token = live_kit
1044                .room_token(&live_kit_room, &session.user_id.to_string())
1045                .trace_err()?;
1046
1047            Some(proto::LiveKitConnectionInfo {
1048                server_url: live_kit.url().into(),
1049                token,
1050                can_publish: true,
1051            })
1052        })
1053    }
1054    .await;
1055
1056    let room = session
1057        .db()
1058        .await
1059        .create_room(session.user_id, session.connection_id, &live_kit_room)
1060        .await?;
1061
1062    response.send(proto::CreateRoomResponse {
1063        room: Some(room.clone()),
1064        live_kit_connection_info,
1065    })?;
1066
1067    update_user_contacts(session.user_id, &session).await?;
1068    Ok(())
1069}
1070
1071/// Join a room from an invitation. Equivalent to joining a channel if there is one.
1072async fn join_room(
1073    request: proto::JoinRoom,
1074    response: Response<proto::JoinRoom>,
1075    session: Session,
1076) -> Result<()> {
1077    let room_id = RoomId::from_proto(request.id);
1078
1079    let channel_id = session.db().await.channel_id_for_room(room_id).await?;
1080
1081    if let Some(channel_id) = channel_id {
1082        return join_channel_internal(channel_id, Box::new(response), session).await;
1083    }
1084
1085    let joined_room = {
1086        let room = session
1087            .db()
1088            .await
1089            .join_room(room_id, session.user_id, session.connection_id)
1090            .await?;
1091        room_updated(&room.room, &session.peer);
1092        room.into_inner()
1093    };
1094
1095    for connection_id in session
1096        .connection_pool()
1097        .await
1098        .user_connection_ids(session.user_id)
1099    {
1100        session
1101            .peer
1102            .send(
1103                connection_id,
1104                proto::CallCanceled {
1105                    room_id: room_id.to_proto(),
1106                },
1107            )
1108            .trace_err();
1109    }
1110
1111    let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
1112        if let Some(token) = live_kit
1113            .room_token(
1114                &joined_room.room.live_kit_room,
1115                &session.user_id.to_string(),
1116            )
1117            .trace_err()
1118        {
1119            Some(proto::LiveKitConnectionInfo {
1120                server_url: live_kit.url().into(),
1121                token,
1122                can_publish: true,
1123            })
1124        } else {
1125            None
1126        }
1127    } else {
1128        None
1129    };
1130
1131    response.send(proto::JoinRoomResponse {
1132        room: Some(joined_room.room),
1133        channel_id: None,
1134        live_kit_connection_info,
1135    })?;
1136
1137    update_user_contacts(session.user_id, &session).await?;
1138    Ok(())
1139}
1140
1141/// Rejoin room is used to reconnect to a room after connection errors.
1142async fn rejoin_room(
1143    request: proto::RejoinRoom,
1144    response: Response<proto::RejoinRoom>,
1145    session: Session,
1146) -> Result<()> {
1147    let room;
1148    let channel;
1149    {
1150        let mut rejoined_room = session
1151            .db()
1152            .await
1153            .rejoin_room(request, session.user_id, session.connection_id)
1154            .await?;
1155
1156        response.send(proto::RejoinRoomResponse {
1157            room: Some(rejoined_room.room.clone()),
1158            reshared_projects: rejoined_room
1159                .reshared_projects
1160                .iter()
1161                .map(|project| proto::ResharedProject {
1162                    id: project.id.to_proto(),
1163                    collaborators: project
1164                        .collaborators
1165                        .iter()
1166                        .map(|collaborator| collaborator.to_proto())
1167                        .collect(),
1168                })
1169                .collect(),
1170            rejoined_projects: rejoined_room
1171                .rejoined_projects
1172                .iter()
1173                .map(|rejoined_project| proto::RejoinedProject {
1174                    id: rejoined_project.id.to_proto(),
1175                    worktrees: rejoined_project
1176                        .worktrees
1177                        .iter()
1178                        .map(|worktree| proto::WorktreeMetadata {
1179                            id: worktree.id,
1180                            root_name: worktree.root_name.clone(),
1181                            visible: worktree.visible,
1182                            abs_path: worktree.abs_path.clone(),
1183                        })
1184                        .collect(),
1185                    collaborators: rejoined_project
1186                        .collaborators
1187                        .iter()
1188                        .map(|collaborator| collaborator.to_proto())
1189                        .collect(),
1190                    language_servers: rejoined_project.language_servers.clone(),
1191                })
1192                .collect(),
1193        })?;
1194        room_updated(&rejoined_room.room, &session.peer);
1195
1196        for project in &rejoined_room.reshared_projects {
1197            for collaborator in &project.collaborators {
1198                session
1199                    .peer
1200                    .send(
1201                        collaborator.connection_id,
1202                        proto::UpdateProjectCollaborator {
1203                            project_id: project.id.to_proto(),
1204                            old_peer_id: Some(project.old_connection_id.into()),
1205                            new_peer_id: Some(session.connection_id.into()),
1206                        },
1207                    )
1208                    .trace_err();
1209            }
1210
1211            broadcast(
1212                Some(session.connection_id),
1213                project
1214                    .collaborators
1215                    .iter()
1216                    .map(|collaborator| collaborator.connection_id),
1217                |connection_id| {
1218                    session.peer.forward_send(
1219                        session.connection_id,
1220                        connection_id,
1221                        proto::UpdateProject {
1222                            project_id: project.id.to_proto(),
1223                            worktrees: project.worktrees.clone(),
1224                        },
1225                    )
1226                },
1227            );
1228        }
1229
1230        for project in &rejoined_room.rejoined_projects {
1231            for collaborator in &project.collaborators {
1232                session
1233                    .peer
1234                    .send(
1235                        collaborator.connection_id,
1236                        proto::UpdateProjectCollaborator {
1237                            project_id: project.id.to_proto(),
1238                            old_peer_id: Some(project.old_connection_id.into()),
1239                            new_peer_id: Some(session.connection_id.into()),
1240                        },
1241                    )
1242                    .trace_err();
1243            }
1244        }
1245
1246        for project in &mut rejoined_room.rejoined_projects {
1247            for worktree in mem::take(&mut project.worktrees) {
1248                #[cfg(any(test, feature = "test-support"))]
1249                const MAX_CHUNK_SIZE: usize = 2;
1250                #[cfg(not(any(test, feature = "test-support")))]
1251                const MAX_CHUNK_SIZE: usize = 256;
1252
1253                // Stream this worktree's entries.
1254                let message = proto::UpdateWorktree {
1255                    project_id: project.id.to_proto(),
1256                    worktree_id: worktree.id,
1257                    abs_path: worktree.abs_path.clone(),
1258                    root_name: worktree.root_name,
1259                    updated_entries: worktree.updated_entries,
1260                    removed_entries: worktree.removed_entries,
1261                    scan_id: worktree.scan_id,
1262                    is_last_update: worktree.completed_scan_id == worktree.scan_id,
1263                    updated_repositories: worktree.updated_repositories,
1264                    removed_repositories: worktree.removed_repositories,
1265                };
1266                for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1267                    session.peer.send(session.connection_id, update.clone())?;
1268                }
1269
1270                // Stream this worktree's diagnostics.
1271                for summary in worktree.diagnostic_summaries {
1272                    session.peer.send(
1273                        session.connection_id,
1274                        proto::UpdateDiagnosticSummary {
1275                            project_id: project.id.to_proto(),
1276                            worktree_id: worktree.id,
1277                            summary: Some(summary),
1278                        },
1279                    )?;
1280                }
1281
1282                for settings_file in worktree.settings_files {
1283                    session.peer.send(
1284                        session.connection_id,
1285                        proto::UpdateWorktreeSettings {
1286                            project_id: project.id.to_proto(),
1287                            worktree_id: worktree.id,
1288                            path: settings_file.path,
1289                            content: Some(settings_file.content),
1290                        },
1291                    )?;
1292                }
1293            }
1294
1295            for language_server in &project.language_servers {
1296                session.peer.send(
1297                    session.connection_id,
1298                    proto::UpdateLanguageServer {
1299                        project_id: project.id.to_proto(),
1300                        language_server_id: language_server.id,
1301                        variant: Some(
1302                            proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1303                                proto::LspDiskBasedDiagnosticsUpdated {},
1304                            ),
1305                        ),
1306                    },
1307                )?;
1308            }
1309        }
1310
1311        let rejoined_room = rejoined_room.into_inner();
1312
1313        room = rejoined_room.room;
1314        channel = rejoined_room.channel;
1315    }
1316
1317    if let Some(channel) = channel {
1318        channel_updated(
1319            &channel,
1320            &room,
1321            &session.peer,
1322            &*session.connection_pool().await,
1323        );
1324    }
1325
1326    update_user_contacts(session.user_id, &session).await?;
1327    Ok(())
1328}
1329
1330/// leave room disconnects from the room.
1331async fn leave_room(
1332    _: proto::LeaveRoom,
1333    response: Response<proto::LeaveRoom>,
1334    session: Session,
1335) -> Result<()> {
1336    leave_room_for_session(&session).await?;
1337    response.send(proto::Ack {})?;
1338    Ok(())
1339}
1340
1341/// Updates the permissions of someone else in the room.
1342async fn set_room_participant_role(
1343    request: proto::SetRoomParticipantRole,
1344    response: Response<proto::SetRoomParticipantRole>,
1345    session: Session,
1346) -> Result<()> {
1347    let user_id = UserId::from_proto(request.user_id);
1348    let role = ChannelRole::from(request.role());
1349
1350    let (live_kit_room, can_publish) = {
1351        let room = session
1352            .db()
1353            .await
1354            .set_room_participant_role(
1355                session.user_id,
1356                RoomId::from_proto(request.room_id),
1357                user_id,
1358                role,
1359            )
1360            .await?;
1361
1362        let live_kit_room = room.live_kit_room.clone();
1363        let can_publish = ChannelRole::from(request.role()).can_use_microphone();
1364        room_updated(&room, &session.peer);
1365        (live_kit_room, can_publish)
1366    };
1367
1368    if let Some(live_kit) = session.live_kit_client.as_ref() {
1369        live_kit
1370            .update_participant(
1371                live_kit_room.clone(),
1372                request.user_id.to_string(),
1373                live_kit_server::proto::ParticipantPermission {
1374                    can_subscribe: true,
1375                    can_publish,
1376                    can_publish_data: can_publish,
1377                    hidden: false,
1378                    recorder: false,
1379                },
1380            )
1381            .await
1382            .trace_err();
1383    }
1384
1385    response.send(proto::Ack {})?;
1386    Ok(())
1387}
1388
1389/// Call someone else into the current room
1390async fn call(
1391    request: proto::Call,
1392    response: Response<proto::Call>,
1393    session: Session,
1394) -> Result<()> {
1395    let room_id = RoomId::from_proto(request.room_id);
1396    let calling_user_id = session.user_id;
1397    let calling_connection_id = session.connection_id;
1398    let called_user_id = UserId::from_proto(request.called_user_id);
1399    let initial_project_id = request.initial_project_id.map(ProjectId::from_proto);
1400    if !session
1401        .db()
1402        .await
1403        .has_contact(calling_user_id, called_user_id)
1404        .await?
1405    {
1406        return Err(anyhow!("cannot call a user who isn't a contact"))?;
1407    }
1408
1409    let incoming_call = {
1410        let (room, incoming_call) = &mut *session
1411            .db()
1412            .await
1413            .call(
1414                room_id,
1415                calling_user_id,
1416                calling_connection_id,
1417                called_user_id,
1418                initial_project_id,
1419            )
1420            .await?;
1421        room_updated(&room, &session.peer);
1422        mem::take(incoming_call)
1423    };
1424    update_user_contacts(called_user_id, &session).await?;
1425
1426    let mut calls = session
1427        .connection_pool()
1428        .await
1429        .user_connection_ids(called_user_id)
1430        .map(|connection_id| session.peer.request(connection_id, incoming_call.clone()))
1431        .collect::<FuturesUnordered<_>>();
1432
1433    while let Some(call_response) = calls.next().await {
1434        match call_response.as_ref() {
1435            Ok(_) => {
1436                response.send(proto::Ack {})?;
1437                return Ok(());
1438            }
1439            Err(_) => {
1440                call_response.trace_err();
1441            }
1442        }
1443    }
1444
1445    {
1446        let room = session
1447            .db()
1448            .await
1449            .call_failed(room_id, called_user_id)
1450            .await?;
1451        room_updated(&room, &session.peer);
1452    }
1453    update_user_contacts(called_user_id, &session).await?;
1454
1455    Err(anyhow!("failed to ring user"))?
1456}
1457
1458/// Cancel an outgoing call.
1459async fn cancel_call(
1460    request: proto::CancelCall,
1461    response: Response<proto::CancelCall>,
1462    session: Session,
1463) -> Result<()> {
1464    let called_user_id = UserId::from_proto(request.called_user_id);
1465    let room_id = RoomId::from_proto(request.room_id);
1466    {
1467        let room = session
1468            .db()
1469            .await
1470            .cancel_call(room_id, session.connection_id, called_user_id)
1471            .await?;
1472        room_updated(&room, &session.peer);
1473    }
1474
1475    for connection_id in session
1476        .connection_pool()
1477        .await
1478        .user_connection_ids(called_user_id)
1479    {
1480        session
1481            .peer
1482            .send(
1483                connection_id,
1484                proto::CallCanceled {
1485                    room_id: room_id.to_proto(),
1486                },
1487            )
1488            .trace_err();
1489    }
1490    response.send(proto::Ack {})?;
1491
1492    update_user_contacts(called_user_id, &session).await?;
1493    Ok(())
1494}
1495
1496/// Decline an incoming call.
1497async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<()> {
1498    let room_id = RoomId::from_proto(message.room_id);
1499    {
1500        let room = session
1501            .db()
1502            .await
1503            .decline_call(Some(room_id), session.user_id)
1504            .await?
1505            .ok_or_else(|| anyhow!("failed to decline call"))?;
1506        room_updated(&room, &session.peer);
1507    }
1508
1509    for connection_id in session
1510        .connection_pool()
1511        .await
1512        .user_connection_ids(session.user_id)
1513    {
1514        session
1515            .peer
1516            .send(
1517                connection_id,
1518                proto::CallCanceled {
1519                    room_id: room_id.to_proto(),
1520                },
1521            )
1522            .trace_err();
1523    }
1524    update_user_contacts(session.user_id, &session).await?;
1525    Ok(())
1526}
1527
1528/// Updates other participants in the room with your current location.
1529async fn update_participant_location(
1530    request: proto::UpdateParticipantLocation,
1531    response: Response<proto::UpdateParticipantLocation>,
1532    session: Session,
1533) -> Result<()> {
1534    let room_id = RoomId::from_proto(request.room_id);
1535    let location = request
1536        .location
1537        .ok_or_else(|| anyhow!("invalid location"))?;
1538
1539    let db = session.db().await;
1540    let room = db
1541        .update_room_participant_location(room_id, session.connection_id, location)
1542        .await?;
1543
1544    room_updated(&room, &session.peer);
1545    response.send(proto::Ack {})?;
1546    Ok(())
1547}
1548
1549/// Share a project into the room.
1550async fn share_project(
1551    request: proto::ShareProject,
1552    response: Response<proto::ShareProject>,
1553    session: Session,
1554) -> Result<()> {
1555    let (project_id, room) = &*session
1556        .db()
1557        .await
1558        .share_project(
1559            RoomId::from_proto(request.room_id),
1560            session.connection_id,
1561            &request.worktrees,
1562        )
1563        .await?;
1564    response.send(proto::ShareProjectResponse {
1565        project_id: project_id.to_proto(),
1566    })?;
1567    room_updated(&room, &session.peer);
1568
1569    Ok(())
1570}
1571
1572/// Unshare a project from the room.
1573async fn unshare_project(message: proto::UnshareProject, session: Session) -> Result<()> {
1574    let project_id = ProjectId::from_proto(message.project_id);
1575
1576    let (room, guest_connection_ids) = &*session
1577        .db()
1578        .await
1579        .unshare_project(project_id, session.connection_id)
1580        .await?;
1581
1582    broadcast(
1583        Some(session.connection_id),
1584        guest_connection_ids.iter().copied(),
1585        |conn_id| session.peer.send(conn_id, message.clone()),
1586    );
1587    room_updated(&room, &session.peer);
1588
1589    Ok(())
1590}
1591
1592/// Join someone elses shared project.
1593async fn join_project(
1594    request: proto::JoinProject,
1595    response: Response<proto::JoinProject>,
1596    session: Session,
1597) -> Result<()> {
1598    let project_id = ProjectId::from_proto(request.project_id);
1599
1600    tracing::info!(%project_id, "join project");
1601
1602    let (project, replica_id) = &mut *session
1603        .db()
1604        .await
1605        .join_project_in_room(project_id, session.connection_id)
1606        .await?;
1607
1608    join_project_internal(response, session, project, replica_id)
1609}
1610
1611trait JoinProjectInternalResponse {
1612    fn send(self, result: proto::JoinProjectResponse) -> Result<()>;
1613}
1614impl JoinProjectInternalResponse for Response<proto::JoinProject> {
1615    fn send(self, result: proto::JoinProjectResponse) -> Result<()> {
1616        Response::<proto::JoinProject>::send(self, result)
1617    }
1618}
1619impl JoinProjectInternalResponse for Response<proto::JoinHostedProject> {
1620    fn send(self, result: proto::JoinProjectResponse) -> Result<()> {
1621        Response::<proto::JoinHostedProject>::send(self, result)
1622    }
1623}
1624
1625fn join_project_internal(
1626    response: impl JoinProjectInternalResponse,
1627    session: Session,
1628    project: &mut Project,
1629    replica_id: &ReplicaId,
1630) -> Result<()> {
1631    let collaborators = project
1632        .collaborators
1633        .iter()
1634        .filter(|collaborator| collaborator.connection_id != session.connection_id)
1635        .map(|collaborator| collaborator.to_proto())
1636        .collect::<Vec<_>>();
1637    let project_id = project.id;
1638    let guest_user_id = session.user_id;
1639
1640    let worktrees = project
1641        .worktrees
1642        .iter()
1643        .map(|(id, worktree)| proto::WorktreeMetadata {
1644            id: *id,
1645            root_name: worktree.root_name.clone(),
1646            visible: worktree.visible,
1647            abs_path: worktree.abs_path.clone(),
1648        })
1649        .collect::<Vec<_>>();
1650
1651    for collaborator in &collaborators {
1652        session
1653            .peer
1654            .send(
1655                collaborator.peer_id.unwrap().into(),
1656                proto::AddProjectCollaborator {
1657                    project_id: project_id.to_proto(),
1658                    collaborator: Some(proto::Collaborator {
1659                        peer_id: Some(session.connection_id.into()),
1660                        replica_id: replica_id.0 as u32,
1661                        user_id: guest_user_id.to_proto(),
1662                    }),
1663                },
1664            )
1665            .trace_err();
1666    }
1667
1668    // First, we send the metadata associated with each worktree.
1669    response.send(proto::JoinProjectResponse {
1670        project_id: project.id.0 as u64,
1671        worktrees: worktrees.clone(),
1672        replica_id: replica_id.0 as u32,
1673        collaborators: collaborators.clone(),
1674        language_servers: project.language_servers.clone(),
1675        role: project.role.into(), // todo
1676    })?;
1677
1678    for (worktree_id, worktree) in mem::take(&mut project.worktrees) {
1679        #[cfg(any(test, feature = "test-support"))]
1680        const MAX_CHUNK_SIZE: usize = 2;
1681        #[cfg(not(any(test, feature = "test-support")))]
1682        const MAX_CHUNK_SIZE: usize = 256;
1683
1684        // Stream this worktree's entries.
1685        let message = proto::UpdateWorktree {
1686            project_id: project_id.to_proto(),
1687            worktree_id,
1688            abs_path: worktree.abs_path.clone(),
1689            root_name: worktree.root_name,
1690            updated_entries: worktree.entries,
1691            removed_entries: Default::default(),
1692            scan_id: worktree.scan_id,
1693            is_last_update: worktree.scan_id == worktree.completed_scan_id,
1694            updated_repositories: worktree.repository_entries.into_values().collect(),
1695            removed_repositories: Default::default(),
1696        };
1697        for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1698            session.peer.send(session.connection_id, update.clone())?;
1699        }
1700
1701        // Stream this worktree's diagnostics.
1702        for summary in worktree.diagnostic_summaries {
1703            session.peer.send(
1704                session.connection_id,
1705                proto::UpdateDiagnosticSummary {
1706                    project_id: project_id.to_proto(),
1707                    worktree_id: worktree.id,
1708                    summary: Some(summary),
1709                },
1710            )?;
1711        }
1712
1713        for settings_file in worktree.settings_files {
1714            session.peer.send(
1715                session.connection_id,
1716                proto::UpdateWorktreeSettings {
1717                    project_id: project_id.to_proto(),
1718                    worktree_id: worktree.id,
1719                    path: settings_file.path,
1720                    content: Some(settings_file.content),
1721                },
1722            )?;
1723        }
1724    }
1725
1726    for language_server in &project.language_servers {
1727        session.peer.send(
1728            session.connection_id,
1729            proto::UpdateLanguageServer {
1730                project_id: project_id.to_proto(),
1731                language_server_id: language_server.id,
1732                variant: Some(
1733                    proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1734                        proto::LspDiskBasedDiagnosticsUpdated {},
1735                    ),
1736                ),
1737            },
1738        )?;
1739    }
1740
1741    Ok(())
1742}
1743
1744/// Leave someone elses shared project.
1745async fn leave_project(request: proto::LeaveProject, session: Session) -> Result<()> {
1746    let sender_id = session.connection_id;
1747    let project_id = ProjectId::from_proto(request.project_id);
1748    let db = session.db().await;
1749    if db.is_hosted_project(project_id).await? {
1750        let project = db.leave_hosted_project(project_id, sender_id).await?;
1751        project_left(&project, &session);
1752        return Ok(());
1753    }
1754
1755    let (room, project) = &*db.leave_project(project_id, sender_id).await?;
1756    tracing::info!(
1757        %project_id,
1758        host_user_id = ?project.host_user_id,
1759        host_connection_id = ?project.host_connection_id,
1760        "leave project"
1761    );
1762
1763    project_left(&project, &session);
1764    room_updated(&room, &session.peer);
1765
1766    Ok(())
1767}
1768
1769async fn join_hosted_project(
1770    request: proto::JoinHostedProject,
1771    response: Response<proto::JoinHostedProject>,
1772    session: Session,
1773) -> Result<()> {
1774    let (mut project, replica_id) = session
1775        .db()
1776        .await
1777        .join_hosted_project(
1778            ProjectId(request.project_id as i32),
1779            session.user_id,
1780            session.connection_id,
1781        )
1782        .await?;
1783
1784    join_project_internal(response, session, &mut project, &replica_id)
1785}
1786
1787/// Updates other participants with changes to the project
1788async fn update_project(
1789    request: proto::UpdateProject,
1790    response: Response<proto::UpdateProject>,
1791    session: Session,
1792) -> Result<()> {
1793    let project_id = ProjectId::from_proto(request.project_id);
1794    let (room, guest_connection_ids) = &*session
1795        .db()
1796        .await
1797        .update_project(project_id, session.connection_id, &request.worktrees)
1798        .await?;
1799    broadcast(
1800        Some(session.connection_id),
1801        guest_connection_ids.iter().copied(),
1802        |connection_id| {
1803            session
1804                .peer
1805                .forward_send(session.connection_id, connection_id, request.clone())
1806        },
1807    );
1808    room_updated(&room, &session.peer);
1809    response.send(proto::Ack {})?;
1810
1811    Ok(())
1812}
1813
1814/// Updates other participants with changes to the worktree
1815async fn update_worktree(
1816    request: proto::UpdateWorktree,
1817    response: Response<proto::UpdateWorktree>,
1818    session: Session,
1819) -> Result<()> {
1820    let guest_connection_ids = session
1821        .db()
1822        .await
1823        .update_worktree(&request, session.connection_id)
1824        .await?;
1825
1826    broadcast(
1827        Some(session.connection_id),
1828        guest_connection_ids.iter().copied(),
1829        |connection_id| {
1830            session
1831                .peer
1832                .forward_send(session.connection_id, connection_id, request.clone())
1833        },
1834    );
1835    response.send(proto::Ack {})?;
1836    Ok(())
1837}
1838
1839/// Updates other participants with changes to the diagnostics
1840async fn update_diagnostic_summary(
1841    message: proto::UpdateDiagnosticSummary,
1842    session: Session,
1843) -> Result<()> {
1844    let guest_connection_ids = session
1845        .db()
1846        .await
1847        .update_diagnostic_summary(&message, session.connection_id)
1848        .await?;
1849
1850    broadcast(
1851        Some(session.connection_id),
1852        guest_connection_ids.iter().copied(),
1853        |connection_id| {
1854            session
1855                .peer
1856                .forward_send(session.connection_id, connection_id, message.clone())
1857        },
1858    );
1859
1860    Ok(())
1861}
1862
1863/// Updates other participants with changes to the worktree settings
1864async fn update_worktree_settings(
1865    message: proto::UpdateWorktreeSettings,
1866    session: Session,
1867) -> Result<()> {
1868    let guest_connection_ids = session
1869        .db()
1870        .await
1871        .update_worktree_settings(&message, session.connection_id)
1872        .await?;
1873
1874    broadcast(
1875        Some(session.connection_id),
1876        guest_connection_ids.iter().copied(),
1877        |connection_id| {
1878            session
1879                .peer
1880                .forward_send(session.connection_id, connection_id, message.clone())
1881        },
1882    );
1883
1884    Ok(())
1885}
1886
1887/// Notify other participants that a  language server has started.
1888async fn start_language_server(
1889    request: proto::StartLanguageServer,
1890    session: Session,
1891) -> Result<()> {
1892    let guest_connection_ids = session
1893        .db()
1894        .await
1895        .start_language_server(&request, session.connection_id)
1896        .await?;
1897
1898    broadcast(
1899        Some(session.connection_id),
1900        guest_connection_ids.iter().copied(),
1901        |connection_id| {
1902            session
1903                .peer
1904                .forward_send(session.connection_id, connection_id, request.clone())
1905        },
1906    );
1907    Ok(())
1908}
1909
1910/// Notify other participants that a language server has changed.
1911async fn update_language_server(
1912    request: proto::UpdateLanguageServer,
1913    session: Session,
1914) -> Result<()> {
1915    let project_id = ProjectId::from_proto(request.project_id);
1916    let project_connection_ids = session
1917        .db()
1918        .await
1919        .project_connection_ids(project_id, session.connection_id)
1920        .await?;
1921    broadcast(
1922        Some(session.connection_id),
1923        project_connection_ids.iter().copied(),
1924        |connection_id| {
1925            session
1926                .peer
1927                .forward_send(session.connection_id, connection_id, request.clone())
1928        },
1929    );
1930    Ok(())
1931}
1932
1933/// forward a project request to the host. These requests should be read only
1934/// as guests are allowed to send them.
1935async fn forward_read_only_project_request<T>(
1936    request: T,
1937    response: Response<T>,
1938    session: Session,
1939) -> Result<()>
1940where
1941    T: EntityMessage + RequestMessage,
1942{
1943    let project_id = ProjectId::from_proto(request.remote_entity_id());
1944    let host_connection_id = session
1945        .db()
1946        .await
1947        .host_for_read_only_project_request(project_id, session.connection_id)
1948        .await?;
1949    let payload = session
1950        .peer
1951        .forward_request(session.connection_id, host_connection_id, request)
1952        .await?;
1953    response.send(payload)?;
1954    Ok(())
1955}
1956
1957/// forward a project request to the host. These requests are disallowed
1958/// for guests.
1959async fn forward_mutating_project_request<T>(
1960    request: T,
1961    response: Response<T>,
1962    session: Session,
1963) -> Result<()>
1964where
1965    T: EntityMessage + RequestMessage,
1966{
1967    let project_id = ProjectId::from_proto(request.remote_entity_id());
1968    let host_connection_id = session
1969        .db()
1970        .await
1971        .host_for_mutating_project_request(project_id, session.connection_id)
1972        .await?;
1973    let payload = session
1974        .peer
1975        .forward_request(session.connection_id, host_connection_id, request)
1976        .await?;
1977    response.send(payload)?;
1978    Ok(())
1979}
1980
1981/// Notify other participants that a new buffer has been created
1982async fn create_buffer_for_peer(
1983    request: proto::CreateBufferForPeer,
1984    session: Session,
1985) -> Result<()> {
1986    session
1987        .db()
1988        .await
1989        .check_user_is_project_host(
1990            ProjectId::from_proto(request.project_id),
1991            session.connection_id,
1992        )
1993        .await?;
1994    let peer_id = request.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?;
1995    session
1996        .peer
1997        .forward_send(session.connection_id, peer_id.into(), request)?;
1998    Ok(())
1999}
2000
2001/// Notify other participants that a buffer has been updated. This is
2002/// allowed for guests as long as the update is limited to selections.
2003async fn update_buffer(
2004    request: proto::UpdateBuffer,
2005    response: Response<proto::UpdateBuffer>,
2006    session: Session,
2007) -> Result<()> {
2008    let project_id = ProjectId::from_proto(request.project_id);
2009    let mut guest_connection_ids;
2010    let mut host_connection_id = None;
2011
2012    let mut requires_write_permission = false;
2013
2014    for op in request.operations.iter() {
2015        match op.variant {
2016            None | Some(proto::operation::Variant::UpdateSelections(_)) => {}
2017            Some(_) => requires_write_permission = true,
2018        }
2019    }
2020
2021    {
2022        let collaborators = session
2023            .db()
2024            .await
2025            .project_collaborators_for_buffer_update(
2026                project_id,
2027                session.connection_id,
2028                requires_write_permission,
2029            )
2030            .await?;
2031        guest_connection_ids = Vec::with_capacity(collaborators.len() - 1);
2032        for collaborator in collaborators.iter() {
2033            if collaborator.is_host {
2034                host_connection_id = Some(collaborator.connection_id);
2035            } else {
2036                guest_connection_ids.push(collaborator.connection_id);
2037            }
2038        }
2039    }
2040    let host_connection_id = host_connection_id.ok_or_else(|| anyhow!("host not found"))?;
2041
2042    broadcast(
2043        Some(session.connection_id),
2044        guest_connection_ids,
2045        |connection_id| {
2046            session
2047                .peer
2048                .forward_send(session.connection_id, connection_id, request.clone())
2049        },
2050    );
2051    if host_connection_id != session.connection_id {
2052        session
2053            .peer
2054            .forward_request(session.connection_id, host_connection_id, request.clone())
2055            .await?;
2056    }
2057
2058    response.send(proto::Ack {})?;
2059    Ok(())
2060}
2061
2062/// Notify other participants that a project has been updated.
2063async fn broadcast_project_message_from_host<T: EntityMessage<Entity = ShareProject>>(
2064    request: T,
2065    session: Session,
2066) -> Result<()> {
2067    let project_id = ProjectId::from_proto(request.remote_entity_id());
2068    let project_connection_ids = session
2069        .db()
2070        .await
2071        .project_connection_ids(project_id, session.connection_id)
2072        .await?;
2073
2074    broadcast(
2075        Some(session.connection_id),
2076        project_connection_ids.iter().copied(),
2077        |connection_id| {
2078            session
2079                .peer
2080                .forward_send(session.connection_id, connection_id, request.clone())
2081        },
2082    );
2083    Ok(())
2084}
2085
2086/// Start following another user in a call.
2087async fn follow(
2088    request: proto::Follow,
2089    response: Response<proto::Follow>,
2090    session: Session,
2091) -> Result<()> {
2092    let room_id = RoomId::from_proto(request.room_id);
2093    let project_id = request.project_id.map(ProjectId::from_proto);
2094    let leader_id = request
2095        .leader_id
2096        .ok_or_else(|| anyhow!("invalid leader id"))?
2097        .into();
2098    let follower_id = session.connection_id;
2099
2100    session
2101        .db()
2102        .await
2103        .check_room_participants(room_id, leader_id, session.connection_id)
2104        .await?;
2105
2106    let response_payload = session
2107        .peer
2108        .forward_request(session.connection_id, leader_id, request)
2109        .await?;
2110    response.send(response_payload)?;
2111
2112    if let Some(project_id) = project_id {
2113        let room = session
2114            .db()
2115            .await
2116            .follow(room_id, project_id, leader_id, follower_id)
2117            .await?;
2118        room_updated(&room, &session.peer);
2119    }
2120
2121    Ok(())
2122}
2123
2124/// Stop following another user in a call.
2125async fn unfollow(request: proto::Unfollow, session: Session) -> Result<()> {
2126    let room_id = RoomId::from_proto(request.room_id);
2127    let project_id = request.project_id.map(ProjectId::from_proto);
2128    let leader_id = request
2129        .leader_id
2130        .ok_or_else(|| anyhow!("invalid leader id"))?
2131        .into();
2132    let follower_id = session.connection_id;
2133
2134    session
2135        .db()
2136        .await
2137        .check_room_participants(room_id, leader_id, session.connection_id)
2138        .await?;
2139
2140    session
2141        .peer
2142        .forward_send(session.connection_id, leader_id, request)?;
2143
2144    if let Some(project_id) = project_id {
2145        let room = session
2146            .db()
2147            .await
2148            .unfollow(room_id, project_id, leader_id, follower_id)
2149            .await?;
2150        room_updated(&room, &session.peer);
2151    }
2152
2153    Ok(())
2154}
2155
2156/// Notify everyone following you of your current location.
2157async fn update_followers(request: proto::UpdateFollowers, session: Session) -> Result<()> {
2158    let room_id = RoomId::from_proto(request.room_id);
2159    let database = session.db.lock().await;
2160
2161    let connection_ids = if let Some(project_id) = request.project_id {
2162        let project_id = ProjectId::from_proto(project_id);
2163        database
2164            .project_connection_ids(project_id, session.connection_id)
2165            .await?
2166    } else {
2167        database
2168            .room_connection_ids(room_id, session.connection_id)
2169            .await?
2170    };
2171
2172    // For now, don't send view update messages back to that view's current leader.
2173    let peer_id_to_omit = request.variant.as_ref().and_then(|variant| match variant {
2174        proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
2175        _ => None,
2176    });
2177
2178    for connection_id in connection_ids.iter().cloned() {
2179        if Some(connection_id.into()) != peer_id_to_omit && connection_id != session.connection_id {
2180            session
2181                .peer
2182                .forward_send(session.connection_id, connection_id, request.clone())?;
2183        }
2184    }
2185    Ok(())
2186}
2187
2188/// Get public data about users.
2189async fn get_users(
2190    request: proto::GetUsers,
2191    response: Response<proto::GetUsers>,
2192    session: Session,
2193) -> Result<()> {
2194    let user_ids = request
2195        .user_ids
2196        .into_iter()
2197        .map(UserId::from_proto)
2198        .collect();
2199    let users = session
2200        .db()
2201        .await
2202        .get_users_by_ids(user_ids)
2203        .await?
2204        .into_iter()
2205        .map(|user| proto::User {
2206            id: user.id.to_proto(),
2207            avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
2208            github_login: user.github_login,
2209        })
2210        .collect();
2211    response.send(proto::UsersResponse { users })?;
2212    Ok(())
2213}
2214
2215/// Search for users (to invite) buy Github login
2216async fn fuzzy_search_users(
2217    request: proto::FuzzySearchUsers,
2218    response: Response<proto::FuzzySearchUsers>,
2219    session: Session,
2220) -> Result<()> {
2221    let query = request.query;
2222    let users = match query.len() {
2223        0 => vec![],
2224        1 | 2 => session
2225            .db()
2226            .await
2227            .get_user_by_github_login(&query)
2228            .await?
2229            .into_iter()
2230            .collect(),
2231        _ => session.db().await.fuzzy_search_users(&query, 10).await?,
2232    };
2233    let users = users
2234        .into_iter()
2235        .filter(|user| user.id != session.user_id)
2236        .map(|user| proto::User {
2237            id: user.id.to_proto(),
2238            avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
2239            github_login: user.github_login,
2240        })
2241        .collect();
2242    response.send(proto::UsersResponse { users })?;
2243    Ok(())
2244}
2245
2246/// Send a contact request to another user.
2247async fn request_contact(
2248    request: proto::RequestContact,
2249    response: Response<proto::RequestContact>,
2250    session: Session,
2251) -> Result<()> {
2252    let requester_id = session.user_id;
2253    let responder_id = UserId::from_proto(request.responder_id);
2254    if requester_id == responder_id {
2255        return Err(anyhow!("cannot add yourself as a contact"))?;
2256    }
2257
2258    let notifications = session
2259        .db()
2260        .await
2261        .send_contact_request(requester_id, responder_id)
2262        .await?;
2263
2264    // Update outgoing contact requests of requester
2265    let mut update = proto::UpdateContacts::default();
2266    update.outgoing_requests.push(responder_id.to_proto());
2267    for connection_id in session
2268        .connection_pool()
2269        .await
2270        .user_connection_ids(requester_id)
2271    {
2272        session.peer.send(connection_id, update.clone())?;
2273    }
2274
2275    // Update incoming contact requests of responder
2276    let mut update = proto::UpdateContacts::default();
2277    update
2278        .incoming_requests
2279        .push(proto::IncomingContactRequest {
2280            requester_id: requester_id.to_proto(),
2281        });
2282    let connection_pool = session.connection_pool().await;
2283    for connection_id in connection_pool.user_connection_ids(responder_id) {
2284        session.peer.send(connection_id, update.clone())?;
2285    }
2286
2287    send_notifications(&connection_pool, &session.peer, notifications);
2288
2289    response.send(proto::Ack {})?;
2290    Ok(())
2291}
2292
2293/// Accept or decline a contact request
2294async fn respond_to_contact_request(
2295    request: proto::RespondToContactRequest,
2296    response: Response<proto::RespondToContactRequest>,
2297    session: Session,
2298) -> Result<()> {
2299    let responder_id = session.user_id;
2300    let requester_id = UserId::from_proto(request.requester_id);
2301    let db = session.db().await;
2302    if request.response == proto::ContactRequestResponse::Dismiss as i32 {
2303        db.dismiss_contact_notification(responder_id, requester_id)
2304            .await?;
2305    } else {
2306        let accept = request.response == proto::ContactRequestResponse::Accept as i32;
2307
2308        let notifications = db
2309            .respond_to_contact_request(responder_id, requester_id, accept)
2310            .await?;
2311        let requester_busy = db.is_user_busy(requester_id).await?;
2312        let responder_busy = db.is_user_busy(responder_id).await?;
2313
2314        let pool = session.connection_pool().await;
2315        // Update responder with new contact
2316        let mut update = proto::UpdateContacts::default();
2317        if accept {
2318            update
2319                .contacts
2320                .push(contact_for_user(requester_id, requester_busy, &pool));
2321        }
2322        update
2323            .remove_incoming_requests
2324            .push(requester_id.to_proto());
2325        for connection_id in pool.user_connection_ids(responder_id) {
2326            session.peer.send(connection_id, update.clone())?;
2327        }
2328
2329        // Update requester with new contact
2330        let mut update = proto::UpdateContacts::default();
2331        if accept {
2332            update
2333                .contacts
2334                .push(contact_for_user(responder_id, responder_busy, &pool));
2335        }
2336        update
2337            .remove_outgoing_requests
2338            .push(responder_id.to_proto());
2339
2340        for connection_id in pool.user_connection_ids(requester_id) {
2341            session.peer.send(connection_id, update.clone())?;
2342        }
2343
2344        send_notifications(&pool, &session.peer, notifications);
2345    }
2346
2347    response.send(proto::Ack {})?;
2348    Ok(())
2349}
2350
2351/// Remove a contact.
2352async fn remove_contact(
2353    request: proto::RemoveContact,
2354    response: Response<proto::RemoveContact>,
2355    session: Session,
2356) -> Result<()> {
2357    let requester_id = session.user_id;
2358    let responder_id = UserId::from_proto(request.user_id);
2359    let db = session.db().await;
2360    let (contact_accepted, deleted_notification_id) =
2361        db.remove_contact(requester_id, responder_id).await?;
2362
2363    let pool = session.connection_pool().await;
2364    // Update outgoing contact requests of requester
2365    let mut update = proto::UpdateContacts::default();
2366    if contact_accepted {
2367        update.remove_contacts.push(responder_id.to_proto());
2368    } else {
2369        update
2370            .remove_outgoing_requests
2371            .push(responder_id.to_proto());
2372    }
2373    for connection_id in pool.user_connection_ids(requester_id) {
2374        session.peer.send(connection_id, update.clone())?;
2375    }
2376
2377    // Update incoming contact requests of responder
2378    let mut update = proto::UpdateContacts::default();
2379    if contact_accepted {
2380        update.remove_contacts.push(requester_id.to_proto());
2381    } else {
2382        update
2383            .remove_incoming_requests
2384            .push(requester_id.to_proto());
2385    }
2386    for connection_id in pool.user_connection_ids(responder_id) {
2387        session.peer.send(connection_id, update.clone())?;
2388        if let Some(notification_id) = deleted_notification_id {
2389            session.peer.send(
2390                connection_id,
2391                proto::DeleteNotification {
2392                    notification_id: notification_id.to_proto(),
2393                },
2394            )?;
2395        }
2396    }
2397
2398    response.send(proto::Ack {})?;
2399    Ok(())
2400}
2401
2402/// Creates a new channel.
2403async fn create_channel(
2404    request: proto::CreateChannel,
2405    response: Response<proto::CreateChannel>,
2406    session: Session,
2407) -> Result<()> {
2408    let db = session.db().await;
2409
2410    let parent_id = request.parent_id.map(|id| ChannelId::from_proto(id));
2411    let (channel, membership) = db
2412        .create_channel(&request.name, parent_id, session.user_id)
2413        .await?;
2414
2415    let root_id = channel.root_id();
2416    let channel = Channel::from_model(channel);
2417
2418    response.send(proto::CreateChannelResponse {
2419        channel: Some(channel.to_proto()),
2420        parent_id: request.parent_id,
2421    })?;
2422
2423    let mut connection_pool = session.connection_pool().await;
2424    if let Some(membership) = membership {
2425        connection_pool.subscribe_to_channel(
2426            membership.user_id,
2427            membership.channel_id,
2428            membership.role,
2429        );
2430        let update = proto::UpdateUserChannels {
2431            channel_memberships: vec![proto::ChannelMembership {
2432                channel_id: membership.channel_id.to_proto(),
2433                role: membership.role.into(),
2434            }],
2435            ..Default::default()
2436        };
2437        for connection_id in connection_pool.user_connection_ids(membership.user_id) {
2438            session.peer.send(connection_id, update.clone())?;
2439        }
2440    }
2441
2442    for (connection_id, role) in connection_pool.channel_connection_ids(root_id) {
2443        if !role.can_see_channel(channel.visibility) {
2444            continue;
2445        }
2446
2447        let update = proto::UpdateChannels {
2448            channels: vec![channel.to_proto()],
2449            ..Default::default()
2450        };
2451        session.peer.send(connection_id, update.clone())?;
2452    }
2453
2454    Ok(())
2455}
2456
2457/// Delete a channel
2458async fn delete_channel(
2459    request: proto::DeleteChannel,
2460    response: Response<proto::DeleteChannel>,
2461    session: Session,
2462) -> Result<()> {
2463    let db = session.db().await;
2464
2465    let channel_id = request.channel_id;
2466    let (root_channel, removed_channels) = db
2467        .delete_channel(ChannelId::from_proto(channel_id), session.user_id)
2468        .await?;
2469    response.send(proto::Ack {})?;
2470
2471    // Notify members of removed channels
2472    let mut update = proto::UpdateChannels::default();
2473    update
2474        .delete_channels
2475        .extend(removed_channels.into_iter().map(|id| id.to_proto()));
2476
2477    let connection_pool = session.connection_pool().await;
2478    for (connection_id, _) in connection_pool.channel_connection_ids(root_channel) {
2479        session.peer.send(connection_id, update.clone())?;
2480    }
2481
2482    Ok(())
2483}
2484
2485/// Invite someone to join a channel.
2486async fn invite_channel_member(
2487    request: proto::InviteChannelMember,
2488    response: Response<proto::InviteChannelMember>,
2489    session: Session,
2490) -> Result<()> {
2491    let db = session.db().await;
2492    let channel_id = ChannelId::from_proto(request.channel_id);
2493    let invitee_id = UserId::from_proto(request.user_id);
2494    let InviteMemberResult {
2495        channel,
2496        notifications,
2497    } = db
2498        .invite_channel_member(
2499            channel_id,
2500            invitee_id,
2501            session.user_id,
2502            request.role().into(),
2503        )
2504        .await?;
2505
2506    let update = proto::UpdateChannels {
2507        channel_invitations: vec![channel.to_proto()],
2508        ..Default::default()
2509    };
2510
2511    let connection_pool = session.connection_pool().await;
2512    for connection_id in connection_pool.user_connection_ids(invitee_id) {
2513        session.peer.send(connection_id, update.clone())?;
2514    }
2515
2516    send_notifications(&connection_pool, &session.peer, notifications);
2517
2518    response.send(proto::Ack {})?;
2519    Ok(())
2520}
2521
2522/// remove someone from a channel
2523async fn remove_channel_member(
2524    request: proto::RemoveChannelMember,
2525    response: Response<proto::RemoveChannelMember>,
2526    session: Session,
2527) -> Result<()> {
2528    let db = session.db().await;
2529    let channel_id = ChannelId::from_proto(request.channel_id);
2530    let member_id = UserId::from_proto(request.user_id);
2531
2532    let RemoveChannelMemberResult {
2533        membership_update,
2534        notification_id,
2535    } = db
2536        .remove_channel_member(channel_id, member_id, session.user_id)
2537        .await?;
2538
2539    let mut connection_pool = session.connection_pool().await;
2540    notify_membership_updated(
2541        &mut connection_pool,
2542        membership_update,
2543        member_id,
2544        &session.peer,
2545    );
2546    for connection_id in connection_pool.user_connection_ids(member_id) {
2547        if let Some(notification_id) = notification_id {
2548            session
2549                .peer
2550                .send(
2551                    connection_id,
2552                    proto::DeleteNotification {
2553                        notification_id: notification_id.to_proto(),
2554                    },
2555                )
2556                .trace_err();
2557        }
2558    }
2559
2560    response.send(proto::Ack {})?;
2561    Ok(())
2562}
2563
2564/// Toggle the channel between public and private.
2565/// Care is taken to maintain the invariant that public channels only descend from public channels,
2566/// (though members-only channels can appear at any point in the hierarchy).
2567async fn set_channel_visibility(
2568    request: proto::SetChannelVisibility,
2569    response: Response<proto::SetChannelVisibility>,
2570    session: Session,
2571) -> Result<()> {
2572    let db = session.db().await;
2573    let channel_id = ChannelId::from_proto(request.channel_id);
2574    let visibility = request.visibility().into();
2575
2576    let channel_model = db
2577        .set_channel_visibility(channel_id, visibility, session.user_id)
2578        .await?;
2579    let root_id = channel_model.root_id();
2580    let channel = Channel::from_model(channel_model);
2581
2582    let mut connection_pool = session.connection_pool().await;
2583    for (user_id, role) in connection_pool
2584        .channel_user_ids(root_id)
2585        .collect::<Vec<_>>()
2586        .into_iter()
2587    {
2588        let update = if role.can_see_channel(channel.visibility) {
2589            connection_pool.subscribe_to_channel(user_id, channel_id, role);
2590            proto::UpdateChannels {
2591                channels: vec![channel.to_proto()],
2592                ..Default::default()
2593            }
2594        } else {
2595            connection_pool.unsubscribe_from_channel(&user_id, &channel_id);
2596            proto::UpdateChannels {
2597                delete_channels: vec![channel.id.to_proto()],
2598                ..Default::default()
2599            }
2600        };
2601
2602        for connection_id in connection_pool.user_connection_ids(user_id) {
2603            session.peer.send(connection_id, update.clone())?;
2604        }
2605    }
2606
2607    response.send(proto::Ack {})?;
2608    Ok(())
2609}
2610
2611/// Alter the role for a user in the channel.
2612async fn set_channel_member_role(
2613    request: proto::SetChannelMemberRole,
2614    response: Response<proto::SetChannelMemberRole>,
2615    session: Session,
2616) -> Result<()> {
2617    let db = session.db().await;
2618    let channel_id = ChannelId::from_proto(request.channel_id);
2619    let member_id = UserId::from_proto(request.user_id);
2620    let result = db
2621        .set_channel_member_role(
2622            channel_id,
2623            session.user_id,
2624            member_id,
2625            request.role().into(),
2626        )
2627        .await?;
2628
2629    match result {
2630        db::SetMemberRoleResult::MembershipUpdated(membership_update) => {
2631            let mut connection_pool = session.connection_pool().await;
2632            notify_membership_updated(
2633                &mut connection_pool,
2634                membership_update,
2635                member_id,
2636                &session.peer,
2637            )
2638        }
2639        db::SetMemberRoleResult::InviteUpdated(channel) => {
2640            let update = proto::UpdateChannels {
2641                channel_invitations: vec![channel.to_proto()],
2642                ..Default::default()
2643            };
2644
2645            for connection_id in session
2646                .connection_pool()
2647                .await
2648                .user_connection_ids(member_id)
2649            {
2650                session.peer.send(connection_id, update.clone())?;
2651            }
2652        }
2653    }
2654
2655    response.send(proto::Ack {})?;
2656    Ok(())
2657}
2658
2659/// Change the name of a channel
2660async fn rename_channel(
2661    request: proto::RenameChannel,
2662    response: Response<proto::RenameChannel>,
2663    session: Session,
2664) -> Result<()> {
2665    let db = session.db().await;
2666    let channel_id = ChannelId::from_proto(request.channel_id);
2667    let channel_model = db
2668        .rename_channel(channel_id, session.user_id, &request.name)
2669        .await?;
2670    let root_id = channel_model.root_id();
2671    let channel = Channel::from_model(channel_model);
2672
2673    response.send(proto::RenameChannelResponse {
2674        channel: Some(channel.to_proto()),
2675    })?;
2676
2677    let connection_pool = session.connection_pool().await;
2678    let update = proto::UpdateChannels {
2679        channels: vec![channel.to_proto()],
2680        ..Default::default()
2681    };
2682    for (connection_id, role) in connection_pool.channel_connection_ids(root_id) {
2683        if role.can_see_channel(channel.visibility) {
2684            session.peer.send(connection_id, update.clone())?;
2685        }
2686    }
2687
2688    Ok(())
2689}
2690
2691/// Move a channel to a new parent.
2692async fn move_channel(
2693    request: proto::MoveChannel,
2694    response: Response<proto::MoveChannel>,
2695    session: Session,
2696) -> Result<()> {
2697    let channel_id = ChannelId::from_proto(request.channel_id);
2698    let to = ChannelId::from_proto(request.to);
2699
2700    let (root_id, channels) = session
2701        .db()
2702        .await
2703        .move_channel(channel_id, to, session.user_id)
2704        .await?;
2705
2706    let connection_pool = session.connection_pool().await;
2707    for (connection_id, role) in connection_pool.channel_connection_ids(root_id) {
2708        let channels = channels
2709            .iter()
2710            .filter_map(|channel| {
2711                if role.can_see_channel(channel.visibility) {
2712                    Some(channel.to_proto())
2713                } else {
2714                    None
2715                }
2716            })
2717            .collect::<Vec<_>>();
2718        if channels.is_empty() {
2719            continue;
2720        }
2721
2722        let update = proto::UpdateChannels {
2723            channels,
2724            ..Default::default()
2725        };
2726
2727        session.peer.send(connection_id, update.clone())?;
2728    }
2729
2730    response.send(Ack {})?;
2731    Ok(())
2732}
2733
2734/// Get the list of channel members
2735async fn get_channel_members(
2736    request: proto::GetChannelMembers,
2737    response: Response<proto::GetChannelMembers>,
2738    session: Session,
2739) -> Result<()> {
2740    let db = session.db().await;
2741    let channel_id = ChannelId::from_proto(request.channel_id);
2742    let members = db
2743        .get_channel_participant_details(channel_id, session.user_id)
2744        .await?;
2745    response.send(proto::GetChannelMembersResponse { members })?;
2746    Ok(())
2747}
2748
2749/// Accept or decline a channel invitation.
2750async fn respond_to_channel_invite(
2751    request: proto::RespondToChannelInvite,
2752    response: Response<proto::RespondToChannelInvite>,
2753    session: Session,
2754) -> Result<()> {
2755    let db = session.db().await;
2756    let channel_id = ChannelId::from_proto(request.channel_id);
2757    let RespondToChannelInvite {
2758        membership_update,
2759        notifications,
2760    } = db
2761        .respond_to_channel_invite(channel_id, session.user_id, request.accept)
2762        .await?;
2763
2764    let mut connection_pool = session.connection_pool().await;
2765    if let Some(membership_update) = membership_update {
2766        notify_membership_updated(
2767            &mut connection_pool,
2768            membership_update,
2769            session.user_id,
2770            &session.peer,
2771        );
2772    } else {
2773        let update = proto::UpdateChannels {
2774            remove_channel_invitations: vec![channel_id.to_proto()],
2775            ..Default::default()
2776        };
2777
2778        for connection_id in connection_pool.user_connection_ids(session.user_id) {
2779            session.peer.send(connection_id, update.clone())?;
2780        }
2781    };
2782
2783    send_notifications(&connection_pool, &session.peer, notifications);
2784
2785    response.send(proto::Ack {})?;
2786
2787    Ok(())
2788}
2789
2790/// Join the channels' room
2791async fn join_channel(
2792    request: proto::JoinChannel,
2793    response: Response<proto::JoinChannel>,
2794    session: Session,
2795) -> Result<()> {
2796    let channel_id = ChannelId::from_proto(request.channel_id);
2797    join_channel_internal(channel_id, Box::new(response), session).await
2798}
2799
/// Response handle accepted by `join_channel_internal`.
///
/// Implemented for the response types of both `proto::JoinChannel` and `proto::JoinRoom`,
/// which are each answered with a `proto::JoinRoomResponse`.
trait JoinChannelInternalResponse {
    /// Consumes the handle and sends `result` back to the requester.
    fn send(self, result: proto::JoinRoomResponse) -> Result<()>;
}
/// Lets a `proto::JoinChannel` response be used by `join_channel_internal`.
impl JoinChannelInternalResponse for Response<proto::JoinChannel> {
    fn send(self, result: proto::JoinRoomResponse) -> Result<()> {
        // Fully qualified call so this delegates to `Response::<proto::JoinChannel>::send`.
        Response::<proto::JoinChannel>::send(self, result)
    }
}
/// Lets a `proto::JoinRoom` response be used by `join_channel_internal`.
impl JoinChannelInternalResponse for Response<proto::JoinRoom> {
    fn send(self, result: proto::JoinRoomResponse) -> Result<()> {
        // Fully qualified call so this delegates to `Response::<proto::JoinRoom>::send`.
        Response::<proto::JoinRoom>::send(self, result)
    }
}
2813
/// Shared implementation for joining a channel's room.
///
/// Steps, in order:
/// 1. Calls `leave_room_for_session` for this session.
/// 2. Joins the channel in the database.
/// 3. Builds LiveKit connection info when a LiveKit client is configured.
/// 4. Sends the `proto::JoinRoomResponse` to the requester.
/// 5. Applies any membership update and announces the room and channel changes.
/// 6. Calls `update_user_contacts` for the joining user.
///
/// Returns an error if the database join result carries no channel.
async fn join_channel_internal(
    channel_id: ChannelId,
    response: Box<impl JoinChannelInternalResponse>,
    session: Session,
) -> Result<()> {
    // The inner block scopes `db` and `connection_pool`, so both are released before the
    // connection pool is acquired again for `channel_updated` below.
    let joined_room = {
        leave_room_for_session(&session).await?;
        let db = session.db().await;

        let (joined_room, membership_updated, role) = db
            .join_channel(channel_id, session.user_id, session.connection_id)
            .await?;

        // Guests get a guest token and may not publish; every other role gets a room token
        // and may publish. If token creation fails, `trace_err()?` makes the closure return
        // `None`, so the response simply carries no LiveKit info.
        let live_kit_connection_info = session.live_kit_client.as_ref().and_then(|live_kit| {
            let (can_publish, token) = if role == ChannelRole::Guest {
                (
                    false,
                    live_kit
                        .guest_token(
                            &joined_room.room.live_kit_room,
                            &session.user_id.to_string(),
                        )
                        .trace_err()?,
                )
            } else {
                (
                    true,
                    live_kit
                        .room_token(
                            &joined_room.room.live_kit_room,
                            &session.user_id.to_string(),
                        )
                        .trace_err()?,
                )
            };

            Some(LiveKitConnectionInfo {
                server_url: live_kit.url().into(),
                token,
                can_publish,
            })
        });

        // Answer the requester before notifying anyone else.
        response.send(proto::JoinRoomResponse {
            room: Some(joined_room.room.clone()),
            channel_id: joined_room
                .channel
                .as_ref()
                .map(|channel| channel.id.to_proto()),
            live_kit_connection_info,
        })?;

        let mut connection_pool = session.connection_pool().await;
        if let Some(membership_updated) = membership_updated {
            notify_membership_updated(
                &mut connection_pool,
                membership_updated,
                session.user_id,
                &session.peer,
            );
        }

        room_updated(&joined_room.room, &session.peer);

        joined_room
    };

    // A join result without a channel is reported as an error.
    channel_updated(
        &joined_room
            .channel
            .ok_or_else(|| anyhow!("channel not returned"))?,
        &joined_room.room,
        &session.peer,
        &*session.connection_pool().await,
    );

    update_user_contacts(session.user_id, &session).await?;
    Ok(())
}
2893
2894/// Start editing the channel notes
2895async fn join_channel_buffer(
2896    request: proto::JoinChannelBuffer,
2897    response: Response<proto::JoinChannelBuffer>,
2898    session: Session,
2899) -> Result<()> {
2900    let db = session.db().await;
2901    let channel_id = ChannelId::from_proto(request.channel_id);
2902
2903    let open_response = db
2904        .join_channel_buffer(channel_id, session.user_id, session.connection_id)
2905        .await?;
2906
2907    let collaborators = open_response.collaborators.clone();
2908    response.send(open_response)?;
2909
2910    let update = UpdateChannelBufferCollaborators {
2911        channel_id: channel_id.to_proto(),
2912        collaborators: collaborators.clone(),
2913    };
2914    channel_buffer_updated(
2915        session.connection_id,
2916        collaborators
2917            .iter()
2918            .filter_map(|collaborator| Some(collaborator.peer_id?.into())),
2919        &update,
2920        &session.peer,
2921    );
2922
2923    Ok(())
2924}
2925
2926/// Edit the channel notes
2927async fn update_channel_buffer(
2928    request: proto::UpdateChannelBuffer,
2929    session: Session,
2930) -> Result<()> {
2931    let db = session.db().await;
2932    let channel_id = ChannelId::from_proto(request.channel_id);
2933
2934    let (collaborators, non_collaborators, epoch, version) = db
2935        .update_channel_buffer(channel_id, session.user_id, &request.operations)
2936        .await?;
2937
2938    channel_buffer_updated(
2939        session.connection_id,
2940        collaborators,
2941        &proto::UpdateChannelBuffer {
2942            channel_id: channel_id.to_proto(),
2943            operations: request.operations,
2944        },
2945        &session.peer,
2946    );
2947
2948    let pool = &*session.connection_pool().await;
2949
2950    broadcast(
2951        None,
2952        non_collaborators
2953            .iter()
2954            .flat_map(|user_id| pool.user_connection_ids(*user_id)),
2955        |peer_id| {
2956            session.peer.send(
2957                peer_id,
2958                proto::UpdateChannels {
2959                    latest_channel_buffer_versions: vec![proto::ChannelBufferVersion {
2960                        channel_id: channel_id.to_proto(),
2961                        epoch: epoch as u64,
2962                        version: version.clone(),
2963                    }],
2964                    ..Default::default()
2965                },
2966            )
2967        },
2968    );
2969
2970    Ok(())
2971}
2972
2973/// Rejoin the channel notes after a connection blip
2974async fn rejoin_channel_buffers(
2975    request: proto::RejoinChannelBuffers,
2976    response: Response<proto::RejoinChannelBuffers>,
2977    session: Session,
2978) -> Result<()> {
2979    let db = session.db().await;
2980    let buffers = db
2981        .rejoin_channel_buffers(&request.buffers, session.user_id, session.connection_id)
2982        .await?;
2983
2984    for rejoined_buffer in &buffers {
2985        let collaborators_to_notify = rejoined_buffer
2986            .buffer
2987            .collaborators
2988            .iter()
2989            .filter_map(|c| Some(c.peer_id?.into()));
2990        channel_buffer_updated(
2991            session.connection_id,
2992            collaborators_to_notify,
2993            &proto::UpdateChannelBufferCollaborators {
2994                channel_id: rejoined_buffer.buffer.channel_id,
2995                collaborators: rejoined_buffer.buffer.collaborators.clone(),
2996            },
2997            &session.peer,
2998        );
2999    }
3000
3001    response.send(proto::RejoinChannelBuffersResponse {
3002        buffers: buffers.into_iter().map(|b| b.buffer).collect(),
3003    })?;
3004
3005    Ok(())
3006}
3007
3008/// Stop editing the channel notes
3009async fn leave_channel_buffer(
3010    request: proto::LeaveChannelBuffer,
3011    response: Response<proto::LeaveChannelBuffer>,
3012    session: Session,
3013) -> Result<()> {
3014    let db = session.db().await;
3015    let channel_id = ChannelId::from_proto(request.channel_id);
3016
3017    let left_buffer = db
3018        .leave_channel_buffer(channel_id, session.connection_id)
3019        .await?;
3020
3021    response.send(Ack {})?;
3022
3023    channel_buffer_updated(
3024        session.connection_id,
3025        left_buffer.connections,
3026        &proto::UpdateChannelBufferCollaborators {
3027            channel_id: channel_id.to_proto(),
3028            collaborators: left_buffer.collaborators,
3029        },
3030        &session.peer,
3031    );
3032
3033    Ok(())
3034}
3035
3036fn channel_buffer_updated<T: EnvelopedMessage>(
3037    sender_id: ConnectionId,
3038    collaborators: impl IntoIterator<Item = ConnectionId>,
3039    message: &T,
3040    peer: &Peer,
3041) {
3042    broadcast(Some(sender_id), collaborators, |peer_id| {
3043        peer.send(peer_id, message.clone())
3044    });
3045}
3046
3047fn send_notifications(
3048    connection_pool: &ConnectionPool,
3049    peer: &Peer,
3050    notifications: db::NotificationBatch,
3051) {
3052    for (user_id, notification) in notifications {
3053        for connection_id in connection_pool.user_connection_ids(user_id) {
3054            if let Err(error) = peer.send(
3055                connection_id,
3056                proto::AddNotification {
3057                    notification: Some(notification.clone()),
3058                },
3059            ) {
3060                tracing::error!(
3061                    "failed to send notification to {:?} {}",
3062                    connection_id,
3063                    error
3064                );
3065            }
3066        }
3067    }
3068}
3069
3070/// Send a message to the channel
3071async fn send_channel_message(
3072    request: proto::SendChannelMessage,
3073    response: Response<proto::SendChannelMessage>,
3074    session: Session,
3075) -> Result<()> {
3076    // Validate the message body.
3077    let body = request.body.trim().to_string();
3078    if body.len() > MAX_MESSAGE_LEN {
3079        return Err(anyhow!("message is too long"))?;
3080    }
3081    if body.is_empty() {
3082        return Err(anyhow!("message can't be blank"))?;
3083    }
3084
3085    // TODO: adjust mentions if body is trimmed
3086
3087    let timestamp = OffsetDateTime::now_utc();
3088    let nonce = request
3089        .nonce
3090        .ok_or_else(|| anyhow!("nonce can't be blank"))?;
3091
3092    let channel_id = ChannelId::from_proto(request.channel_id);
3093    let CreatedChannelMessage {
3094        message_id,
3095        participant_connection_ids,
3096        channel_members,
3097        notifications,
3098    } = session
3099        .db()
3100        .await
3101        .create_channel_message(
3102            channel_id,
3103            session.user_id,
3104            &body,
3105            &request.mentions,
3106            timestamp,
3107            nonce.clone().into(),
3108            match request.reply_to_message_id {
3109                Some(reply_to_message_id) => Some(MessageId::from_proto(reply_to_message_id)),
3110                None => None,
3111            },
3112        )
3113        .await?;
3114    let message = proto::ChannelMessage {
3115        sender_id: session.user_id.to_proto(),
3116        id: message_id.to_proto(),
3117        body,
3118        mentions: request.mentions,
3119        timestamp: timestamp.unix_timestamp() as u64,
3120        nonce: Some(nonce),
3121        reply_to_message_id: request.reply_to_message_id,
3122    };
3123    broadcast(
3124        Some(session.connection_id),
3125        participant_connection_ids,
3126        |connection| {
3127            session.peer.send(
3128                connection,
3129                proto::ChannelMessageSent {
3130                    channel_id: channel_id.to_proto(),
3131                    message: Some(message.clone()),
3132                },
3133            )
3134        },
3135    );
3136    response.send(proto::SendChannelMessageResponse {
3137        message: Some(message),
3138    })?;
3139
3140    let pool = &*session.connection_pool().await;
3141    broadcast(
3142        None,
3143        channel_members
3144            .iter()
3145            .flat_map(|user_id| pool.user_connection_ids(*user_id)),
3146        |peer_id| {
3147            session.peer.send(
3148                peer_id,
3149                proto::UpdateChannels {
3150                    latest_channel_message_ids: vec![proto::ChannelMessageId {
3151                        channel_id: channel_id.to_proto(),
3152                        message_id: message_id.to_proto(),
3153                    }],
3154                    ..Default::default()
3155                },
3156            )
3157        },
3158    );
3159    send_notifications(pool, &session.peer, notifications);
3160
3161    Ok(())
3162}
3163
3164/// Delete a channel message
3165async fn remove_channel_message(
3166    request: proto::RemoveChannelMessage,
3167    response: Response<proto::RemoveChannelMessage>,
3168    session: Session,
3169) -> Result<()> {
3170    let channel_id = ChannelId::from_proto(request.channel_id);
3171    let message_id = MessageId::from_proto(request.message_id);
3172    let connection_ids = session
3173        .db()
3174        .await
3175        .remove_channel_message(channel_id, message_id, session.user_id)
3176        .await?;
3177    broadcast(Some(session.connection_id), connection_ids, |connection| {
3178        session.peer.send(connection, request.clone())
3179    });
3180    response.send(proto::Ack {})?;
3181    Ok(())
3182}
3183
3184/// Mark a channel message as read
3185async fn acknowledge_channel_message(
3186    request: proto::AckChannelMessage,
3187    session: Session,
3188) -> Result<()> {
3189    let channel_id = ChannelId::from_proto(request.channel_id);
3190    let message_id = MessageId::from_proto(request.message_id);
3191    let notifications = session
3192        .db()
3193        .await
3194        .observe_channel_message(channel_id, session.user_id, message_id)
3195        .await?;
3196    send_notifications(
3197        &*session.connection_pool().await,
3198        &session.peer,
3199        notifications,
3200    );
3201    Ok(())
3202}
3203
3204/// Mark a buffer version as synced
3205async fn acknowledge_buffer_version(
3206    request: proto::AckBufferOperation,
3207    session: Session,
3208) -> Result<()> {
3209    let buffer_id = BufferId::from_proto(request.buffer_id);
3210    session
3211        .db()
3212        .await
3213        .observe_buffer_version(
3214            buffer_id,
3215            session.user_id,
3216            request.epoch as i32,
3217            &request.version,
3218        )
3219        .await?;
3220    Ok(())
3221}
3222
3223/// Start receiving chat updates for a channel
3224async fn join_channel_chat(
3225    request: proto::JoinChannelChat,
3226    response: Response<proto::JoinChannelChat>,
3227    session: Session,
3228) -> Result<()> {
3229    let channel_id = ChannelId::from_proto(request.channel_id);
3230
3231    let db = session.db().await;
3232    db.join_channel_chat(channel_id, session.connection_id, session.user_id)
3233        .await?;
3234    let messages = db
3235        .get_channel_messages(channel_id, session.user_id, MESSAGE_COUNT_PER_PAGE, None)
3236        .await?;
3237    response.send(proto::JoinChannelChatResponse {
3238        done: messages.len() < MESSAGE_COUNT_PER_PAGE,
3239        messages,
3240    })?;
3241    Ok(())
3242}
3243
3244/// Stop receiving chat updates for a channel
3245async fn leave_channel_chat(request: proto::LeaveChannelChat, session: Session) -> Result<()> {
3246    let channel_id = ChannelId::from_proto(request.channel_id);
3247    session
3248        .db()
3249        .await
3250        .leave_channel_chat(channel_id, session.connection_id, session.user_id)
3251        .await?;
3252    Ok(())
3253}
3254
3255/// Retrieve the chat history for a channel
3256async fn get_channel_messages(
3257    request: proto::GetChannelMessages,
3258    response: Response<proto::GetChannelMessages>,
3259    session: Session,
3260) -> Result<()> {
3261    let channel_id = ChannelId::from_proto(request.channel_id);
3262    let messages = session
3263        .db()
3264        .await
3265        .get_channel_messages(
3266            channel_id,
3267            session.user_id,
3268            MESSAGE_COUNT_PER_PAGE,
3269            Some(MessageId::from_proto(request.before_message_id)),
3270        )
3271        .await?;
3272    response.send(proto::GetChannelMessagesResponse {
3273        done: messages.len() < MESSAGE_COUNT_PER_PAGE,
3274        messages,
3275    })?;
3276    Ok(())
3277}
3278
3279/// Retrieve specific chat messages
3280async fn get_channel_messages_by_id(
3281    request: proto::GetChannelMessagesById,
3282    response: Response<proto::GetChannelMessagesById>,
3283    session: Session,
3284) -> Result<()> {
3285    let message_ids = request
3286        .message_ids
3287        .iter()
3288        .map(|id| MessageId::from_proto(*id))
3289        .collect::<Vec<_>>();
3290    let messages = session
3291        .db()
3292        .await
3293        .get_channel_messages_by_id(session.user_id, &message_ids)
3294        .await?;
3295    response.send(proto::GetChannelMessagesResponse {
3296        done: messages.len() < MESSAGE_COUNT_PER_PAGE,
3297        messages,
3298    })?;
3299    Ok(())
3300}
3301
3302/// Retrieve the current users notifications
3303async fn get_notifications(
3304    request: proto::GetNotifications,
3305    response: Response<proto::GetNotifications>,
3306    session: Session,
3307) -> Result<()> {
3308    let notifications = session
3309        .db()
3310        .await
3311        .get_notifications(
3312            session.user_id,
3313            NOTIFICATION_COUNT_PER_PAGE,
3314            request
3315                .before_id
3316                .map(|id| db::NotificationId::from_proto(id)),
3317        )
3318        .await?;
3319    response.send(proto::GetNotificationsResponse {
3320        done: notifications.len() < NOTIFICATION_COUNT_PER_PAGE,
3321        notifications,
3322    })?;
3323    Ok(())
3324}
3325
3326/// Mark notifications as read
3327async fn mark_notification_as_read(
3328    request: proto::MarkNotificationRead,
3329    response: Response<proto::MarkNotificationRead>,
3330    session: Session,
3331) -> Result<()> {
3332    let database = &session.db().await;
3333    let notifications = database
3334        .mark_notification_as_read_by_id(
3335            session.user_id,
3336            NotificationId::from_proto(request.notification_id),
3337        )
3338        .await?;
3339    send_notifications(
3340        &*session.connection_pool().await,
3341        &session.peer,
3342        notifications,
3343    );
3344    response.send(proto::Ack {})?;
3345    Ok(())
3346}
3347
3348/// Get the current users information
3349async fn get_private_user_info(
3350    _request: proto::GetPrivateUserInfo,
3351    response: Response<proto::GetPrivateUserInfo>,
3352    session: Session,
3353) -> Result<()> {
3354    let db = session.db().await;
3355
3356    let metrics_id = db.get_user_metrics_id(session.user_id).await?;
3357    let user = db
3358        .get_user_by_id(session.user_id)
3359        .await?
3360        .ok_or_else(|| anyhow!("user not found"))?;
3361    let flags = db.get_user_flags(session.user_id).await?;
3362
3363    response.send(proto::GetPrivateUserInfoResponse {
3364        metrics_id,
3365        staff: user.admin,
3366        flags,
3367    })?;
3368    Ok(())
3369}
3370
3371fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
3372    match message {
3373        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
3374        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
3375        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
3376        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
3377        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
3378            code: frame.code.into(),
3379            reason: frame.reason,
3380        })),
3381    }
3382}
3383
3384fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
3385    match message {
3386        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
3387        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
3388        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
3389        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
3390        AxumMessage::Close(frame) => {
3391            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
3392                code: frame.code.into(),
3393                reason: frame.reason,
3394            }))
3395        }
3396    }
3397}
3398
3399fn notify_membership_updated(
3400    connection_pool: &mut ConnectionPool,
3401    result: MembershipUpdated,
3402    user_id: UserId,
3403    peer: &Peer,
3404) {
3405    for membership in &result.new_channels.channel_memberships {
3406        connection_pool.subscribe_to_channel(user_id, membership.channel_id, membership.role)
3407    }
3408    for channel_id in &result.removed_channels {
3409        connection_pool.unsubscribe_from_channel(&user_id, channel_id)
3410    }
3411
3412    let user_channels_update = proto::UpdateUserChannels {
3413        channel_memberships: result
3414            .new_channels
3415            .channel_memberships
3416            .iter()
3417            .map(|cm| proto::ChannelMembership {
3418                channel_id: cm.channel_id.to_proto(),
3419                role: cm.role.into(),
3420            })
3421            .collect(),
3422        ..Default::default()
3423    };
3424
3425    let mut update = build_channels_update(result.new_channels, vec![]);
3426    update.delete_channels = result
3427        .removed_channels
3428        .into_iter()
3429        .map(|id| id.to_proto())
3430        .collect();
3431    update.remove_channel_invitations = vec![result.channel_id.to_proto()];
3432
3433    for connection_id in connection_pool.user_connection_ids(user_id) {
3434        peer.send(connection_id, user_channels_update.clone())
3435            .trace_err();
3436        peer.send(connection_id, update.clone()).trace_err();
3437    }
3438}
3439
3440fn build_update_user_channels(channels: &ChannelsForUser) -> proto::UpdateUserChannels {
3441    proto::UpdateUserChannels {
3442        channel_memberships: channels
3443            .channel_memberships
3444            .iter()
3445            .map(|m| proto::ChannelMembership {
3446                channel_id: m.channel_id.to_proto(),
3447                role: m.role.into(),
3448            })
3449            .collect(),
3450        observed_channel_buffer_version: channels.observed_buffer_versions.clone(),
3451        observed_channel_message_id: channels.observed_channel_messages.clone(),
3452    }
3453}
3454
3455fn build_channels_update(
3456    channels: ChannelsForUser,
3457    channel_invites: Vec<db::Channel>,
3458) -> proto::UpdateChannels {
3459    let mut update = proto::UpdateChannels::default();
3460
3461    for channel in channels.channels {
3462        update.channels.push(channel.to_proto());
3463    }
3464
3465    update.latest_channel_buffer_versions = channels.latest_buffer_versions;
3466    update.latest_channel_message_ids = channels.latest_channel_messages;
3467
3468    for (channel_id, participants) in channels.channel_participants {
3469        update
3470            .channel_participants
3471            .push(proto::ChannelParticipants {
3472                channel_id: channel_id.to_proto(),
3473                participant_user_ids: participants.into_iter().map(|id| id.to_proto()).collect(),
3474            });
3475    }
3476
3477    for channel in channel_invites {
3478        update.channel_invitations.push(channel.to_proto());
3479    }
3480    for project in channels.hosted_projects {
3481        update.hosted_projects.push(project);
3482    }
3483
3484    update
3485}
3486
3487fn build_initial_contacts_update(
3488    contacts: Vec<db::Contact>,
3489    pool: &ConnectionPool,
3490) -> proto::UpdateContacts {
3491    let mut update = proto::UpdateContacts::default();
3492
3493    for contact in contacts {
3494        match contact {
3495            db::Contact::Accepted { user_id, busy } => {
3496                update.contacts.push(contact_for_user(user_id, busy, &pool));
3497            }
3498            db::Contact::Outgoing { user_id } => update.outgoing_requests.push(user_id.to_proto()),
3499            db::Contact::Incoming { user_id } => {
3500                update
3501                    .incoming_requests
3502                    .push(proto::IncomingContactRequest {
3503                        requester_id: user_id.to_proto(),
3504                    })
3505            }
3506        }
3507    }
3508
3509    update
3510}
3511
3512fn contact_for_user(user_id: UserId, busy: bool, pool: &ConnectionPool) -> proto::Contact {
3513    proto::Contact {
3514        user_id: user_id.to_proto(),
3515        online: pool.is_user_online(user_id),
3516        busy,
3517    }
3518}
3519
3520fn room_updated(room: &proto::Room, peer: &Peer) {
3521    broadcast(
3522        None,
3523        room.participants
3524            .iter()
3525            .filter_map(|participant| Some(participant.peer_id?.into())),
3526        |peer_id| {
3527            peer.send(
3528                peer_id,
3529                proto::RoomUpdated {
3530                    room: Some(room.clone()),
3531                },
3532            )
3533        },
3534    );
3535}
3536
3537fn channel_updated(
3538    channel: &db::channel::Model,
3539    room: &proto::Room,
3540    peer: &Peer,
3541    pool: &ConnectionPool,
3542) {
3543    let participants = room
3544        .participants
3545        .iter()
3546        .map(|p| p.user_id)
3547        .collect::<Vec<_>>();
3548
3549    broadcast(
3550        None,
3551        pool.channel_connection_ids(channel.root_id())
3552            .filter_map(|(channel_id, role)| {
3553                role.can_see_channel(channel.visibility).then(|| channel_id)
3554            }),
3555        |peer_id| {
3556            peer.send(
3557                peer_id,
3558                proto::UpdateChannels {
3559                    channel_participants: vec![proto::ChannelParticipants {
3560                        channel_id: channel.id.to_proto(),
3561                        participant_user_ids: participants.clone(),
3562                    }],
3563                    ..Default::default()
3564                },
3565            )
3566        },
3567    );
3568}
3569
3570async fn update_user_contacts(user_id: UserId, session: &Session) -> Result<()> {
3571    let db = session.db().await;
3572
3573    let contacts = db.get_contacts(user_id).await?;
3574    let busy = db.is_user_busy(user_id).await?;
3575
3576    let pool = session.connection_pool().await;
3577    let updated_contact = contact_for_user(user_id, busy, &pool);
3578    for contact in contacts {
3579        if let db::Contact::Accepted {
3580            user_id: contact_user_id,
3581            ..
3582        } = contact
3583        {
3584            for contact_conn_id in pool.user_connection_ids(contact_user_id) {
3585                session
3586                    .peer
3587                    .send(
3588                        contact_conn_id,
3589                        proto::UpdateContacts {
3590                            contacts: vec![updated_contact.clone()],
3591                            remove_contacts: Default::default(),
3592                            incoming_requests: Default::default(),
3593                            remove_incoming_requests: Default::default(),
3594                            outgoing_requests: Default::default(),
3595                            remove_outgoing_requests: Default::default(),
3596                        },
3597                    )
3598                    .trace_err();
3599            }
3600        }
3601    }
3602    Ok(())
3603}
3604
3605async fn leave_room_for_session(session: &Session) -> Result<()> {
3606    let mut contacts_to_update = HashSet::default();
3607
3608    let room_id;
3609    let canceled_calls_to_user_ids;
3610    let live_kit_room;
3611    let delete_live_kit_room;
3612    let room;
3613    let channel;
3614
3615    if let Some(mut left_room) = session.db().await.leave_room(session.connection_id).await? {
3616        contacts_to_update.insert(session.user_id);
3617
3618        for project in left_room.left_projects.values() {
3619            project_left(project, session);
3620        }
3621
3622        room_id = RoomId::from_proto(left_room.room.id);
3623        canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids);
3624        live_kit_room = mem::take(&mut left_room.room.live_kit_room);
3625        delete_live_kit_room = left_room.deleted;
3626        room = mem::take(&mut left_room.room);
3627        channel = mem::take(&mut left_room.channel);
3628
3629        room_updated(&room, &session.peer);
3630    } else {
3631        return Ok(());
3632    }
3633
3634    if let Some(channel) = channel {
3635        channel_updated(
3636            &channel,
3637            &room,
3638            &session.peer,
3639            &*session.connection_pool().await,
3640        );
3641    }
3642
3643    {
3644        let pool = session.connection_pool().await;
3645        for canceled_user_id in canceled_calls_to_user_ids {
3646            for connection_id in pool.user_connection_ids(canceled_user_id) {
3647                session
3648                    .peer
3649                    .send(
3650                        connection_id,
3651                        proto::CallCanceled {
3652                            room_id: room_id.to_proto(),
3653                        },
3654                    )
3655                    .trace_err();
3656            }
3657            contacts_to_update.insert(canceled_user_id);
3658        }
3659    }
3660
3661    for contact_user_id in contacts_to_update {
3662        update_user_contacts(contact_user_id, &session).await?;
3663    }
3664
3665    if let Some(live_kit) = session.live_kit_client.as_ref() {
3666        live_kit
3667            .remove_participant(live_kit_room.clone(), session.user_id.to_string())
3668            .await
3669            .trace_err();
3670
3671        if delete_live_kit_room {
3672            live_kit.delete_room(live_kit_room).await.trace_err();
3673        }
3674    }
3675
3676    Ok(())
3677}
3678
3679async fn leave_channel_buffers_for_session(session: &Session) -> Result<()> {
3680    let left_channel_buffers = session
3681        .db()
3682        .await
3683        .leave_channel_buffers(session.connection_id)
3684        .await?;
3685
3686    for left_buffer in left_channel_buffers {
3687        channel_buffer_updated(
3688            session.connection_id,
3689            left_buffer.connections,
3690            &proto::UpdateChannelBufferCollaborators {
3691                channel_id: left_buffer.channel_id.to_proto(),
3692                collaborators: left_buffer.collaborators,
3693            },
3694            &session.peer,
3695        );
3696    }
3697
3698    Ok(())
3699}
3700
3701fn project_left(project: &db::LeftProject, session: &Session) {
3702    for connection_id in &project.connection_ids {
3703        if project.host_user_id == Some(session.user_id) {
3704            session
3705                .peer
3706                .send(
3707                    *connection_id,
3708                    proto::UnshareProject {
3709                        project_id: project.id.to_proto(),
3710                    },
3711                )
3712                .trace_err();
3713        } else {
3714            session
3715                .peer
3716                .send(
3717                    *connection_id,
3718                    proto::RemoveProjectCollaborator {
3719                        project_id: project.id.to_proto(),
3720                        peer_id: Some(session.connection_id.into()),
3721                    },
3722                )
3723                .trace_err();
3724        }
3725    }
3726}
3727
3728pub trait ResultExt {
3729    type Ok;
3730
3731    fn trace_err(self) -> Option<Self::Ok>;
3732}
3733
3734impl<T, E> ResultExt for Result<T, E>
3735where
3736    E: std::fmt::Debug,
3737{
3738    type Ok = T;
3739
3740    fn trace_err(self) -> Option<T> {
3741        match self {
3742            Ok(value) => Some(value),
3743            Err(error) => {
3744                tracing::error!("{:?}", error);
3745                None
3746            }
3747        }
3748    }
3749}