1mod connection_pool;
2
3use crate::{
4 auth,
5 db::{
6 self, ChannelId, ChannelsForUser, Database, MessageId, ProjectId, RoomId, ServerId, User,
7 UserId,
8 },
9 executor::Executor,
10 AppState, Result,
11};
12use anyhow::anyhow;
13use async_tungstenite::tungstenite::{
14 protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
15};
16use axum::{
17 body::Body,
18 extract::{
19 ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
20 ConnectInfo, WebSocketUpgrade,
21 },
22 headers::{Header, HeaderName},
23 http::StatusCode,
24 middleware,
25 response::IntoResponse,
26 routing::get,
27 Extension, Router, TypedHeader,
28};
29use collections::{HashMap, HashSet};
30pub use connection_pool::ConnectionPool;
31use futures::{
32 channel::oneshot,
33 future::{self, BoxFuture},
34 stream::FuturesUnordered,
35 FutureExt, SinkExt, StreamExt, TryStreamExt,
36};
37use lazy_static::lazy_static;
38use prometheus::{register_int_gauge, IntGauge};
39use rpc::{
40 proto::{
41 self, Ack, AnyTypedEnvelope, ChannelEdge, EntityMessage, EnvelopedMessage,
42 LiveKitConnectionInfo, RequestMessage, UpdateChannelBufferCollaborators,
43 },
44 Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
45};
46use serde::{Serialize, Serializer};
47use std::{
48 any::TypeId,
49 fmt,
50 future::Future,
51 marker::PhantomData,
52 mem,
53 net::SocketAddr,
54 ops::{Deref, DerefMut},
55 rc::Rc,
56 sync::{
57 atomic::{AtomicBool, Ordering::SeqCst},
58 Arc,
59 },
60 time::{Duration, Instant},
61};
62use time::OffsetDateTime;
63use tokio::sync::{watch, Semaphore};
64use tower::ServiceBuilder;
65use tracing::{info_span, instrument, Instrument};
66
67pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
68pub const CLEANUP_TIMEOUT: Duration = Duration::from_secs(10);
69
70const MESSAGE_COUNT_PER_PAGE: usize = 100;
71const MAX_MESSAGE_LEN: usize = 1024;
72
73lazy_static! {
74 static ref METRIC_CONNECTIONS: IntGauge =
75 register_int_gauge!("connections", "number of connections").unwrap();
76 static ref METRIC_SHARED_PROJECTS: IntGauge = register_int_gauge!(
77 "shared_projects",
78 "number of open projects with one or more guests"
79 )
80 .unwrap();
81}
82
83type MessageHandler =
84 Box<dyn Send + Sync + Fn(Box<dyn AnyTypedEnvelope>, Session) -> BoxFuture<'static, ()>>;
85
86struct Response<R> {
87 peer: Arc<Peer>,
88 receipt: Receipt<R>,
89 responded: Arc<AtomicBool>,
90}
91
92impl<R: RequestMessage> Response<R> {
93 fn send(self, payload: R::Response) -> Result<()> {
94 self.responded.store(true, SeqCst);
95 self.peer.respond(self.receipt, payload)?;
96 Ok(())
97 }
98}
99
100#[derive(Clone)]
101struct Session {
102 user_id: UserId,
103 connection_id: ConnectionId,
104 db: Arc<tokio::sync::Mutex<DbHandle>>,
105 peer: Arc<Peer>,
106 connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
107 live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
108 executor: Executor,
109}
110
111impl Session {
112 async fn db(&self) -> tokio::sync::MutexGuard<DbHandle> {
113 #[cfg(test)]
114 tokio::task::yield_now().await;
115 let guard = self.db.lock().await;
116 #[cfg(test)]
117 tokio::task::yield_now().await;
118 guard
119 }
120
121 async fn connection_pool(&self) -> ConnectionPoolGuard<'_> {
122 #[cfg(test)]
123 tokio::task::yield_now().await;
124 let guard = self.connection_pool.lock();
125 ConnectionPoolGuard {
126 guard,
127 _not_send: PhantomData,
128 }
129 }
130}
131
132impl fmt::Debug for Session {
133 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134 f.debug_struct("Session")
135 .field("user_id", &self.user_id)
136 .field("connection_id", &self.connection_id)
137 .finish()
138 }
139}
140
141struct DbHandle(Arc<Database>);
142
143impl Deref for DbHandle {
144 type Target = Database;
145
146 fn deref(&self) -> &Self::Target {
147 self.0.as_ref()
148 }
149}
150
151pub struct Server {
152 id: parking_lot::Mutex<ServerId>,
153 peer: Arc<Peer>,
154 pub(crate) connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
155 app_state: Arc<AppState>,
156 executor: Executor,
157 handlers: HashMap<TypeId, MessageHandler>,
158 teardown: watch::Sender<()>,
159}
160
161pub(crate) struct ConnectionPoolGuard<'a> {
162 guard: parking_lot::MutexGuard<'a, ConnectionPool>,
163 _not_send: PhantomData<Rc<()>>,
164}
165
166#[derive(Serialize)]
167pub struct ServerSnapshot<'a> {
168 peer: &'a Peer,
169 #[serde(serialize_with = "serialize_deref")]
170 connection_pool: ConnectionPoolGuard<'a>,
171}
172
173pub fn serialize_deref<S, T, U>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
174where
175 S: Serializer,
176 T: Deref<Target = U>,
177 U: Serialize,
178{
179 Serialize::serialize(value.deref(), serializer)
180}
181
182impl Server {
183 pub fn new(id: ServerId, app_state: Arc<AppState>, executor: Executor) -> Arc<Self> {
184 let mut server = Self {
185 id: parking_lot::Mutex::new(id),
186 peer: Peer::new(id.0 as u32),
187 app_state,
188 executor,
189 connection_pool: Default::default(),
190 handlers: Default::default(),
191 teardown: watch::channel(()).0,
192 };
193
194 server
195 .add_request_handler(ping)
196 .add_request_handler(create_room)
197 .add_request_handler(join_room)
198 .add_request_handler(rejoin_room)
199 .add_request_handler(leave_room)
200 .add_request_handler(call)
201 .add_request_handler(cancel_call)
202 .add_message_handler(decline_call)
203 .add_request_handler(update_participant_location)
204 .add_request_handler(share_project)
205 .add_message_handler(unshare_project)
206 .add_request_handler(join_project)
207 .add_message_handler(leave_project)
208 .add_request_handler(update_project)
209 .add_request_handler(update_worktree)
210 .add_message_handler(start_language_server)
211 .add_message_handler(update_language_server)
212 .add_message_handler(update_diagnostic_summary)
213 .add_message_handler(update_worktree_settings)
214 .add_message_handler(refresh_inlay_hints)
215 .add_request_handler(forward_project_request::<proto::GetHover>)
216 .add_request_handler(forward_project_request::<proto::GetDefinition>)
217 .add_request_handler(forward_project_request::<proto::GetTypeDefinition>)
218 .add_request_handler(forward_project_request::<proto::GetReferences>)
219 .add_request_handler(forward_project_request::<proto::SearchProject>)
220 .add_request_handler(forward_project_request::<proto::GetDocumentHighlights>)
221 .add_request_handler(forward_project_request::<proto::GetProjectSymbols>)
222 .add_request_handler(forward_project_request::<proto::OpenBufferForSymbol>)
223 .add_request_handler(forward_project_request::<proto::OpenBufferById>)
224 .add_request_handler(forward_project_request::<proto::OpenBufferByPath>)
225 .add_request_handler(forward_project_request::<proto::GetCompletions>)
226 .add_request_handler(forward_project_request::<proto::ApplyCompletionAdditionalEdits>)
227 .add_request_handler(forward_project_request::<proto::GetCodeActions>)
228 .add_request_handler(forward_project_request::<proto::ApplyCodeAction>)
229 .add_request_handler(forward_project_request::<proto::PrepareRename>)
230 .add_request_handler(forward_project_request::<proto::PerformRename>)
231 .add_request_handler(forward_project_request::<proto::ReloadBuffers>)
232 .add_request_handler(forward_project_request::<proto::SynchronizeBuffers>)
233 .add_request_handler(forward_project_request::<proto::FormatBuffers>)
234 .add_request_handler(forward_project_request::<proto::CreateProjectEntry>)
235 .add_request_handler(forward_project_request::<proto::RenameProjectEntry>)
236 .add_request_handler(forward_project_request::<proto::CopyProjectEntry>)
237 .add_request_handler(forward_project_request::<proto::DeleteProjectEntry>)
238 .add_request_handler(forward_project_request::<proto::ExpandProjectEntry>)
239 .add_request_handler(forward_project_request::<proto::OnTypeFormatting>)
240 .add_request_handler(forward_project_request::<proto::InlayHints>)
241 .add_message_handler(create_buffer_for_peer)
242 .add_request_handler(update_buffer)
243 .add_message_handler(update_buffer_file)
244 .add_message_handler(buffer_reloaded)
245 .add_message_handler(buffer_saved)
246 .add_request_handler(forward_project_request::<proto::SaveBuffer>)
247 .add_request_handler(get_users)
248 .add_request_handler(fuzzy_search_users)
249 .add_request_handler(request_contact)
250 .add_request_handler(remove_contact)
251 .add_request_handler(respond_to_contact_request)
252 .add_request_handler(create_channel)
253 .add_request_handler(delete_channel)
254 .add_request_handler(invite_channel_member)
255 .add_request_handler(remove_channel_member)
256 .add_request_handler(set_channel_member_admin)
257 .add_request_handler(rename_channel)
258 .add_request_handler(join_channel_buffer)
259 .add_request_handler(leave_channel_buffer)
260 .add_message_handler(update_channel_buffer)
261 .add_request_handler(rejoin_channel_buffers)
262 .add_request_handler(get_channel_members)
263 .add_request_handler(respond_to_channel_invite)
264 .add_request_handler(join_channel)
265 .add_request_handler(join_channel_chat)
266 .add_message_handler(leave_channel_chat)
267 .add_request_handler(send_channel_message)
268 .add_request_handler(remove_channel_message)
269 .add_request_handler(get_channel_messages)
270 .add_request_handler(link_channel)
271 .add_request_handler(unlink_channel)
272 .add_request_handler(move_channel)
273 .add_request_handler(follow)
274 .add_message_handler(unfollow)
275 .add_message_handler(update_followers)
276 .add_message_handler(update_diff_base)
277 .add_request_handler(get_private_user_info);
278
279 Arc::new(server)
280 }
281
282 pub async fn start(&self) -> Result<()> {
283 let server_id = *self.id.lock();
284 let app_state = self.app_state.clone();
285 let peer = self.peer.clone();
286 let timeout = self.executor.sleep(CLEANUP_TIMEOUT);
287 let pool = self.connection_pool.clone();
288 let live_kit_client = self.app_state.live_kit_client.clone();
289
290 let span = info_span!("start server");
291 self.executor.spawn_detached(
292 async move {
293 tracing::info!("waiting for cleanup timeout");
294 timeout.await;
295 tracing::info!("cleanup timeout expired, retrieving stale rooms");
296 if let Some((room_ids, channel_ids)) = app_state
297 .db
298 .stale_server_resource_ids(&app_state.config.zed_environment, server_id)
299 .await
300 .trace_err()
301 {
302 tracing::info!(stale_room_count = room_ids.len(), "retrieved stale rooms");
303 tracing::info!(
304 stale_channel_buffer_count = channel_ids.len(),
305 "retrieved stale channel buffers"
306 );
307
308 for channel_id in channel_ids {
309 if let Some(refreshed_channel_buffer) = app_state
310 .db
311 .clear_stale_channel_buffer_collaborators(channel_id, server_id)
312 .await
313 .trace_err()
314 {
315 for connection_id in refreshed_channel_buffer.connection_ids {
316 peer.send(
317 connection_id,
318 proto::UpdateChannelBufferCollaborators {
319 channel_id: channel_id.to_proto(),
320 collaborators: refreshed_channel_buffer
321 .collaborators
322 .clone(),
323 },
324 )
325 .trace_err();
326 }
327 }
328 }
329
330 for room_id in room_ids {
331 let mut contacts_to_update = HashSet::default();
332 let mut canceled_calls_to_user_ids = Vec::new();
333 let mut live_kit_room = String::new();
334 let mut delete_live_kit_room = false;
335
336 if let Some(mut refreshed_room) = app_state
337 .db
338 .clear_stale_room_participants(room_id, server_id)
339 .await
340 .trace_err()
341 {
342 tracing::info!(
343 room_id = room_id.0,
344 new_participant_count = refreshed_room.room.participants.len(),
345 "refreshed room"
346 );
347 room_updated(&refreshed_room.room, &peer);
348 if let Some(channel_id) = refreshed_room.channel_id {
349 channel_updated(
350 channel_id,
351 &refreshed_room.room,
352 &refreshed_room.channel_members,
353 &peer,
354 &*pool.lock(),
355 );
356 }
357 contacts_to_update
358 .extend(refreshed_room.stale_participant_user_ids.iter().copied());
359 contacts_to_update
360 .extend(refreshed_room.canceled_calls_to_user_ids.iter().copied());
361 canceled_calls_to_user_ids =
362 mem::take(&mut refreshed_room.canceled_calls_to_user_ids);
363 live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room);
364 delete_live_kit_room = refreshed_room.room.participants.is_empty();
365 }
366
367 {
368 let pool = pool.lock();
369 for canceled_user_id in canceled_calls_to_user_ids {
370 for connection_id in pool.user_connection_ids(canceled_user_id) {
371 peer.send(
372 connection_id,
373 proto::CallCanceled {
374 room_id: room_id.to_proto(),
375 },
376 )
377 .trace_err();
378 }
379 }
380 }
381
382 for user_id in contacts_to_update {
383 let busy = app_state.db.is_user_busy(user_id).await.trace_err();
384 let contacts = app_state.db.get_contacts(user_id).await.trace_err();
385 if let Some((busy, contacts)) = busy.zip(contacts) {
386 let pool = pool.lock();
387 let updated_contact = contact_for_user(user_id, false, busy, &pool);
388 for contact in contacts {
389 if let db::Contact::Accepted {
390 user_id: contact_user_id,
391 ..
392 } = contact
393 {
394 for contact_conn_id in
395 pool.user_connection_ids(contact_user_id)
396 {
397 peer.send(
398 contact_conn_id,
399 proto::UpdateContacts {
400 contacts: vec![updated_contact.clone()],
401 remove_contacts: Default::default(),
402 incoming_requests: Default::default(),
403 remove_incoming_requests: Default::default(),
404 outgoing_requests: Default::default(),
405 remove_outgoing_requests: Default::default(),
406 },
407 )
408 .trace_err();
409 }
410 }
411 }
412 }
413 }
414
415 if let Some(live_kit) = live_kit_client.as_ref() {
416 if delete_live_kit_room {
417 live_kit.delete_room(live_kit_room).await.trace_err();
418 }
419 }
420 }
421 }
422
423 app_state
424 .db
425 .delete_stale_servers(&app_state.config.zed_environment, server_id)
426 .await
427 .trace_err();
428 }
429 .instrument(span),
430 );
431 Ok(())
432 }
433
434 pub fn teardown(&self) {
435 self.peer.teardown();
436 self.connection_pool.lock().reset();
437 let _ = self.teardown.send(());
438 }
439
440 #[cfg(test)]
441 pub fn reset(&self, id: ServerId) {
442 self.teardown();
443 *self.id.lock() = id;
444 self.peer.reset(id.0 as u32);
445 }
446
447 #[cfg(test)]
448 pub fn id(&self) -> ServerId {
449 *self.id.lock()
450 }
451
452 fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
453 where
454 F: 'static + Send + Sync + Fn(TypedEnvelope<M>, Session) -> Fut,
455 Fut: 'static + Send + Future<Output = Result<()>>,
456 M: EnvelopedMessage,
457 {
458 let prev_handler = self.handlers.insert(
459 TypeId::of::<M>(),
460 Box::new(move |envelope, session| {
461 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
462 let span = info_span!(
463 "handle message",
464 payload_type = envelope.payload_type_name()
465 );
466 span.in_scope(|| {
467 tracing::info!(
468 payload_type = envelope.payload_type_name(),
469 "message received"
470 );
471 });
472 let start_time = Instant::now();
473 let future = (handler)(*envelope, session);
474 async move {
475 let result = future.await;
476 let duration_ms = start_time.elapsed().as_micros() as f64 / 1000.0;
477 match result {
478 Err(error) => {
479 tracing::error!(%error, ?duration_ms, "error handling message")
480 }
481 Ok(()) => tracing::info!(?duration_ms, "finished handling message"),
482 }
483 }
484 .instrument(span)
485 .boxed()
486 }),
487 );
488 if prev_handler.is_some() {
489 panic!("registered a handler for the same message twice");
490 }
491 self
492 }
493
494 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
495 where
496 F: 'static + Send + Sync + Fn(M, Session) -> Fut,
497 Fut: 'static + Send + Future<Output = Result<()>>,
498 M: EnvelopedMessage,
499 {
500 self.add_handler(move |envelope, session| handler(envelope.payload, session));
501 self
502 }
503
504 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
505 where
506 F: 'static + Send + Sync + Fn(M, Response<M>, Session) -> Fut,
507 Fut: Send + Future<Output = Result<()>>,
508 M: RequestMessage,
509 {
510 let handler = Arc::new(handler);
511 self.add_handler(move |envelope, session| {
512 let receipt = envelope.receipt();
513 let handler = handler.clone();
514 async move {
515 let peer = session.peer.clone();
516 let responded = Arc::new(AtomicBool::default());
517 let response = Response {
518 peer: peer.clone(),
519 responded: responded.clone(),
520 receipt,
521 };
522 match (handler)(envelope.payload, response, session).await {
523 Ok(()) => {
524 if responded.load(std::sync::atomic::Ordering::SeqCst) {
525 Ok(())
526 } else {
527 Err(anyhow!("handler did not send a response"))?
528 }
529 }
530 Err(error) => {
531 peer.respond_with_error(
532 receipt,
533 proto::Error {
534 message: error.to_string(),
535 },
536 )?;
537 Err(error)
538 }
539 }
540 }
541 })
542 }
543
544 pub fn handle_connection(
545 self: &Arc<Self>,
546 connection: Connection,
547 address: String,
548 user: User,
549 mut send_connection_id: Option<oneshot::Sender<ConnectionId>>,
550 executor: Executor,
551 ) -> impl Future<Output = Result<()>> {
552 let this = self.clone();
553 let user_id = user.id;
554 let login = user.github_login;
555 let span = info_span!("handle connection", %user_id, %login, %address);
556 let mut teardown = self.teardown.subscribe();
557 async move {
558 let (connection_id, handle_io, mut incoming_rx) = this
559 .peer
560 .add_connection(connection, {
561 let executor = executor.clone();
562 move |duration| executor.sleep(duration)
563 });
564
565 tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
566 this.peer.send(connection_id, proto::Hello { peer_id: Some(connection_id.into()) })?;
567 tracing::info!(%user_id, %login, %connection_id, %address, "sent hello message");
568
569 if let Some(send_connection_id) = send_connection_id.take() {
570 let _ = send_connection_id.send(connection_id);
571 }
572
573 if !user.connected_once {
574 this.peer.send(connection_id, proto::ShowContacts {})?;
575 this.app_state.db.set_user_connected_once(user_id, true).await?;
576 }
577
578 let (contacts, channels_for_user, channel_invites) = future::try_join3(
579 this.app_state.db.get_contacts(user_id),
580 this.app_state.db.get_channels_for_user(user_id),
581 this.app_state.db.get_channel_invites_for_user(user_id)
582 ).await?;
583
584 {
585 let mut pool = this.connection_pool.lock();
586 pool.add_connection(connection_id, user_id, user.admin);
587 this.peer.send(connection_id, build_initial_contacts_update(contacts, &pool))?;
588 this.peer.send(connection_id, build_initial_channels_update(
589 channels_for_user,
590 channel_invites
591 ))?;
592 }
593
594 if let Some(incoming_call) = this.app_state.db.incoming_call_for_user(user_id).await? {
595 this.peer.send(connection_id, incoming_call)?;
596 }
597
598 let session = Session {
599 user_id,
600 connection_id,
601 db: Arc::new(tokio::sync::Mutex::new(DbHandle(this.app_state.db.clone()))),
602 peer: this.peer.clone(),
603 connection_pool: this.connection_pool.clone(),
604 live_kit_client: this.app_state.live_kit_client.clone(),
605 executor: executor.clone(),
606 };
607 update_user_contacts(user_id, &session).await?;
608
609 let handle_io = handle_io.fuse();
610 futures::pin_mut!(handle_io);
611
612 // Handlers for foreground messages are pushed into the following `FuturesUnordered`.
613 // This prevents deadlocks when e.g., client A performs a request to client B and
614 // client B performs a request to client A. If both clients stop processing further
615 // messages until their respective request completes, they won't have a chance to
616 // respond to the other client's request and cause a deadlock.
617 //
618 // This arrangement ensures we will attempt to process earlier messages first, but fall
619 // back to processing messages arrived later in the spirit of making progress.
620 let mut foreground_message_handlers = FuturesUnordered::new();
621 let concurrent_handlers = Arc::new(Semaphore::new(256));
622 loop {
623 let next_message = async {
624 let permit = concurrent_handlers.clone().acquire_owned().await.unwrap();
625 let message = incoming_rx.next().await;
626 (permit, message)
627 }.fuse();
628 futures::pin_mut!(next_message);
629 futures::select_biased! {
630 _ = teardown.changed().fuse() => return Ok(()),
631 result = handle_io => {
632 if let Err(error) = result {
633 tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
634 }
635 break;
636 }
637 _ = foreground_message_handlers.next() => {}
638 next_message = next_message => {
639 let (permit, message) = next_message;
640 if let Some(message) = message {
641 let type_name = message.payload_type_name();
642 let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
643 let span_enter = span.enter();
644 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
645 let is_background = message.is_background();
646 let handle_message = (handler)(message, session.clone());
647 drop(span_enter);
648
649 let handle_message = async move {
650 handle_message.await;
651 drop(permit);
652 }.instrument(span);
653 if is_background {
654 executor.spawn_detached(handle_message);
655 } else {
656 foreground_message_handlers.push(handle_message);
657 }
658 } else {
659 tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
660 }
661 } else {
662 tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
663 break;
664 }
665 }
666 }
667 }
668
669 drop(foreground_message_handlers);
670 tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
671 if let Err(error) = connection_lost(session, teardown, executor).await {
672 tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
673 }
674
675 Ok(())
676 }.instrument(span)
677 }
678
679 pub async fn invite_code_redeemed(
680 self: &Arc<Self>,
681 inviter_id: UserId,
682 invitee_id: UserId,
683 ) -> Result<()> {
684 if let Some(user) = self.app_state.db.get_user_by_id(inviter_id).await? {
685 if let Some(code) = &user.invite_code {
686 let pool = self.connection_pool.lock();
687 let invitee_contact = contact_for_user(invitee_id, true, false, &pool);
688 for connection_id in pool.user_connection_ids(inviter_id) {
689 self.peer.send(
690 connection_id,
691 proto::UpdateContacts {
692 contacts: vec![invitee_contact.clone()],
693 ..Default::default()
694 },
695 )?;
696 self.peer.send(
697 connection_id,
698 proto::UpdateInviteInfo {
699 url: format!("{}{}", self.app_state.config.invite_link_prefix, &code),
700 count: user.invite_count as u32,
701 },
702 )?;
703 }
704 }
705 }
706 Ok(())
707 }
708
709 pub async fn invite_count_updated(self: &Arc<Self>, user_id: UserId) -> Result<()> {
710 if let Some(user) = self.app_state.db.get_user_by_id(user_id).await? {
711 if let Some(invite_code) = &user.invite_code {
712 let pool = self.connection_pool.lock();
713 for connection_id in pool.user_connection_ids(user_id) {
714 self.peer.send(
715 connection_id,
716 proto::UpdateInviteInfo {
717 url: format!(
718 "{}{}",
719 self.app_state.config.invite_link_prefix, invite_code
720 ),
721 count: user.invite_count as u32,
722 },
723 )?;
724 }
725 }
726 }
727 Ok(())
728 }
729
730 pub async fn snapshot<'a>(self: &'a Arc<Self>) -> ServerSnapshot<'a> {
731 ServerSnapshot {
732 connection_pool: ConnectionPoolGuard {
733 guard: self.connection_pool.lock(),
734 _not_send: PhantomData,
735 },
736 peer: &self.peer,
737 }
738 }
739}
740
741impl<'a> Deref for ConnectionPoolGuard<'a> {
742 type Target = ConnectionPool;
743
744 fn deref(&self) -> &Self::Target {
745 &*self.guard
746 }
747}
748
749impl<'a> DerefMut for ConnectionPoolGuard<'a> {
750 fn deref_mut(&mut self) -> &mut Self::Target {
751 &mut *self.guard
752 }
753}
754
755impl<'a> Drop for ConnectionPoolGuard<'a> {
756 fn drop(&mut self) {
757 #[cfg(test)]
758 self.check_invariants();
759 }
760}
761
762fn broadcast<F>(
763 sender_id: Option<ConnectionId>,
764 receiver_ids: impl IntoIterator<Item = ConnectionId>,
765 mut f: F,
766) where
767 F: FnMut(ConnectionId) -> anyhow::Result<()>,
768{
769 for receiver_id in receiver_ids {
770 if Some(receiver_id) != sender_id {
771 if let Err(error) = f(receiver_id) {
772 tracing::error!("failed to send to {:?} {}", receiver_id, error);
773 }
774 }
775 }
776}
777
778lazy_static! {
779 static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
780}
781
782pub struct ProtocolVersion(u32);
783
784impl Header for ProtocolVersion {
785 fn name() -> &'static HeaderName {
786 &ZED_PROTOCOL_VERSION
787 }
788
789 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
790 where
791 Self: Sized,
792 I: Iterator<Item = &'i axum::http::HeaderValue>,
793 {
794 let version = values
795 .next()
796 .ok_or_else(axum::headers::Error::invalid)?
797 .to_str()
798 .map_err(|_| axum::headers::Error::invalid())?
799 .parse()
800 .map_err(|_| axum::headers::Error::invalid())?;
801 Ok(Self(version))
802 }
803
804 fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
805 values.extend([self.0.to_string().parse().unwrap()]);
806 }
807}
808
809pub fn routes(server: Arc<Server>) -> Router<Body> {
810 Router::new()
811 .route("/rpc", get(handle_websocket_request))
812 .layer(
813 ServiceBuilder::new()
814 .layer(Extension(server.app_state.clone()))
815 .layer(middleware::from_fn(auth::validate_header)),
816 )
817 .route("/metrics", get(handle_metrics))
818 .layer(Extension(server))
819}
820
821pub async fn handle_websocket_request(
822 TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
823 ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
824 Extension(server): Extension<Arc<Server>>,
825 Extension(user): Extension<User>,
826 ws: WebSocketUpgrade,
827) -> axum::response::Response {
828 if protocol_version != rpc::PROTOCOL_VERSION {
829 return (
830 StatusCode::UPGRADE_REQUIRED,
831 "client must be upgraded".to_string(),
832 )
833 .into_response();
834 }
835 let socket_address = socket_address.to_string();
836 ws.on_upgrade(move |socket| {
837 use util::ResultExt;
838 let socket = socket
839 .map_ok(to_tungstenite_message)
840 .err_into()
841 .with(|message| async move { Ok(to_axum_message(message)) });
842 let connection = Connection::new(Box::pin(socket));
843 async move {
844 server
845 .handle_connection(connection, socket_address, user, None, Executor::Production)
846 .await
847 .log_err();
848 }
849 })
850}
851
852pub async fn handle_metrics(Extension(server): Extension<Arc<Server>>) -> Result<String> {
853 let connections = server
854 .connection_pool
855 .lock()
856 .connections()
857 .filter(|connection| !connection.admin)
858 .count();
859
860 METRIC_CONNECTIONS.set(connections as _);
861
862 let shared_projects = server.app_state.db.project_count_excluding_admins().await?;
863 METRIC_SHARED_PROJECTS.set(shared_projects as _);
864
865 let encoder = prometheus::TextEncoder::new();
866 let metric_families = prometheus::gather();
867 let encoded_metrics = encoder
868 .encode_to_string(&metric_families)
869 .map_err(|err| anyhow!("{}", err))?;
870 Ok(encoded_metrics)
871}
872
873#[instrument(err, skip(executor))]
874async fn connection_lost(
875 session: Session,
876 mut teardown: watch::Receiver<()>,
877 executor: Executor,
878) -> Result<()> {
879 session.peer.disconnect(session.connection_id);
880 session
881 .connection_pool()
882 .await
883 .remove_connection(session.connection_id)?;
884
885 session
886 .db()
887 .await
888 .connection_lost(session.connection_id)
889 .await
890 .trace_err();
891
892 futures::select_biased! {
893 _ = executor.sleep(RECONNECT_TIMEOUT).fuse() => {
894 log::info!("connection lost, removing all resources for user:{}, connection:{:?}", session.user_id, session.connection_id);
895 leave_room_for_session(&session).await.trace_err();
896 leave_channel_buffers_for_session(&session)
897 .await
898 .trace_err();
899
900 if !session
901 .connection_pool()
902 .await
903 .is_user_online(session.user_id)
904 {
905 let db = session.db().await;
906 if let Some(room) = db.decline_call(None, session.user_id).await.trace_err().flatten() {
907 room_updated(&room, &session.peer);
908 }
909 }
910
911 update_user_contacts(session.user_id, &session).await?;
912 }
913 _ = teardown.changed().fuse() => {}
914 }
915
916 Ok(())
917}
918
919async fn ping(_: proto::Ping, response: Response<proto::Ping>, _session: Session) -> Result<()> {
920 response.send(proto::Ack {})?;
921 Ok(())
922}
923
924async fn create_room(
925 _request: proto::CreateRoom,
926 response: Response<proto::CreateRoom>,
927 session: Session,
928) -> Result<()> {
929 let live_kit_room = nanoid::nanoid!(30);
930
931 let live_kit_connection_info = {
932 let live_kit_room = live_kit_room.clone();
933 let live_kit = session.live_kit_client.as_ref();
934
935 util::async_iife!({
936 let live_kit = live_kit?;
937
938 live_kit
939 .create_room(live_kit_room.clone())
940 .await
941 .trace_err()?;
942
943 let token = live_kit
944 .room_token(&live_kit_room, &session.user_id.to_string())
945 .trace_err()?;
946
947 Some(proto::LiveKitConnectionInfo {
948 server_url: live_kit.url().into(),
949 token,
950 })
951 })
952 }
953 .await;
954
955 let room = session
956 .db()
957 .await
958 .create_room(session.user_id, session.connection_id, &live_kit_room)
959 .await?;
960
961 response.send(proto::CreateRoomResponse {
962 room: Some(room.clone()),
963 live_kit_connection_info,
964 })?;
965
966 update_user_contacts(session.user_id, &session).await?;
967 Ok(())
968}
969
970async fn join_room(
971 request: proto::JoinRoom,
972 response: Response<proto::JoinRoom>,
973 session: Session,
974) -> Result<()> {
975 let room_id = RoomId::from_proto(request.id);
976 let joined_room = {
977 let room = session
978 .db()
979 .await
980 .join_room(room_id, session.user_id, session.connection_id)
981 .await?;
982 room_updated(&room.room, &session.peer);
983 room.into_inner()
984 };
985
986 if let Some(channel_id) = joined_room.channel_id {
987 channel_updated(
988 channel_id,
989 &joined_room.room,
990 &joined_room.channel_members,
991 &session.peer,
992 &*session.connection_pool().await,
993 )
994 }
995
996 for connection_id in session
997 .connection_pool()
998 .await
999 .user_connection_ids(session.user_id)
1000 {
1001 session
1002 .peer
1003 .send(
1004 connection_id,
1005 proto::CallCanceled {
1006 room_id: room_id.to_proto(),
1007 },
1008 )
1009 .trace_err();
1010 }
1011
1012 let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
1013 if let Some(token) = live_kit
1014 .room_token(
1015 &joined_room.room.live_kit_room,
1016 &session.user_id.to_string(),
1017 )
1018 .trace_err()
1019 {
1020 Some(proto::LiveKitConnectionInfo {
1021 server_url: live_kit.url().into(),
1022 token,
1023 })
1024 } else {
1025 None
1026 }
1027 } else {
1028 None
1029 };
1030
1031 response.send(proto::JoinRoomResponse {
1032 room: Some(joined_room.room),
1033 channel_id: joined_room.channel_id.map(|id| id.to_proto()),
1034 live_kit_connection_info,
1035 })?;
1036
1037 update_user_contacts(session.user_id, &session).await?;
1038 Ok(())
1039}
1040
1041async fn rejoin_room(
1042 request: proto::RejoinRoom,
1043 response: Response<proto::RejoinRoom>,
1044 session: Session,
1045) -> Result<()> {
1046 let room;
1047 let channel_id;
1048 let channel_members;
1049 {
1050 let mut rejoined_room = session
1051 .db()
1052 .await
1053 .rejoin_room(request, session.user_id, session.connection_id)
1054 .await?;
1055
1056 response.send(proto::RejoinRoomResponse {
1057 room: Some(rejoined_room.room.clone()),
1058 reshared_projects: rejoined_room
1059 .reshared_projects
1060 .iter()
1061 .map(|project| proto::ResharedProject {
1062 id: project.id.to_proto(),
1063 collaborators: project
1064 .collaborators
1065 .iter()
1066 .map(|collaborator| collaborator.to_proto())
1067 .collect(),
1068 })
1069 .collect(),
1070 rejoined_projects: rejoined_room
1071 .rejoined_projects
1072 .iter()
1073 .map(|rejoined_project| proto::RejoinedProject {
1074 id: rejoined_project.id.to_proto(),
1075 worktrees: rejoined_project
1076 .worktrees
1077 .iter()
1078 .map(|worktree| proto::WorktreeMetadata {
1079 id: worktree.id,
1080 root_name: worktree.root_name.clone(),
1081 visible: worktree.visible,
1082 abs_path: worktree.abs_path.clone(),
1083 })
1084 .collect(),
1085 collaborators: rejoined_project
1086 .collaborators
1087 .iter()
1088 .map(|collaborator| collaborator.to_proto())
1089 .collect(),
1090 language_servers: rejoined_project.language_servers.clone(),
1091 })
1092 .collect(),
1093 })?;
1094 room_updated(&rejoined_room.room, &session.peer);
1095
1096 for project in &rejoined_room.reshared_projects {
1097 for collaborator in &project.collaborators {
1098 session
1099 .peer
1100 .send(
1101 collaborator.connection_id,
1102 proto::UpdateProjectCollaborator {
1103 project_id: project.id.to_proto(),
1104 old_peer_id: Some(project.old_connection_id.into()),
1105 new_peer_id: Some(session.connection_id.into()),
1106 },
1107 )
1108 .trace_err();
1109 }
1110
1111 broadcast(
1112 Some(session.connection_id),
1113 project
1114 .collaborators
1115 .iter()
1116 .map(|collaborator| collaborator.connection_id),
1117 |connection_id| {
1118 session.peer.forward_send(
1119 session.connection_id,
1120 connection_id,
1121 proto::UpdateProject {
1122 project_id: project.id.to_proto(),
1123 worktrees: project.worktrees.clone(),
1124 },
1125 )
1126 },
1127 );
1128 }
1129
1130 for project in &rejoined_room.rejoined_projects {
1131 for collaborator in &project.collaborators {
1132 session
1133 .peer
1134 .send(
1135 collaborator.connection_id,
1136 proto::UpdateProjectCollaborator {
1137 project_id: project.id.to_proto(),
1138 old_peer_id: Some(project.old_connection_id.into()),
1139 new_peer_id: Some(session.connection_id.into()),
1140 },
1141 )
1142 .trace_err();
1143 }
1144 }
1145
1146 for project in &mut rejoined_room.rejoined_projects {
1147 for worktree in mem::take(&mut project.worktrees) {
1148 #[cfg(any(test, feature = "test-support"))]
1149 const MAX_CHUNK_SIZE: usize = 2;
1150 #[cfg(not(any(test, feature = "test-support")))]
1151 const MAX_CHUNK_SIZE: usize = 256;
1152
1153 // Stream this worktree's entries.
1154 let message = proto::UpdateWorktree {
1155 project_id: project.id.to_proto(),
1156 worktree_id: worktree.id,
1157 abs_path: worktree.abs_path.clone(),
1158 root_name: worktree.root_name,
1159 updated_entries: worktree.updated_entries,
1160 removed_entries: worktree.removed_entries,
1161 scan_id: worktree.scan_id,
1162 is_last_update: worktree.completed_scan_id == worktree.scan_id,
1163 updated_repositories: worktree.updated_repositories,
1164 removed_repositories: worktree.removed_repositories,
1165 };
1166 for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1167 session.peer.send(session.connection_id, update.clone())?;
1168 }
1169
1170 // Stream this worktree's diagnostics.
1171 for summary in worktree.diagnostic_summaries {
1172 session.peer.send(
1173 session.connection_id,
1174 proto::UpdateDiagnosticSummary {
1175 project_id: project.id.to_proto(),
1176 worktree_id: worktree.id,
1177 summary: Some(summary),
1178 },
1179 )?;
1180 }
1181
1182 for settings_file in worktree.settings_files {
1183 session.peer.send(
1184 session.connection_id,
1185 proto::UpdateWorktreeSettings {
1186 project_id: project.id.to_proto(),
1187 worktree_id: worktree.id,
1188 path: settings_file.path,
1189 content: Some(settings_file.content),
1190 },
1191 )?;
1192 }
1193 }
1194
1195 for language_server in &project.language_servers {
1196 session.peer.send(
1197 session.connection_id,
1198 proto::UpdateLanguageServer {
1199 project_id: project.id.to_proto(),
1200 language_server_id: language_server.id,
1201 variant: Some(
1202 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1203 proto::LspDiskBasedDiagnosticsUpdated {},
1204 ),
1205 ),
1206 },
1207 )?;
1208 }
1209 }
1210
1211 let rejoined_room = rejoined_room.into_inner();
1212
1213 room = rejoined_room.room;
1214 channel_id = rejoined_room.channel_id;
1215 channel_members = rejoined_room.channel_members;
1216 }
1217
1218 if let Some(channel_id) = channel_id {
1219 channel_updated(
1220 channel_id,
1221 &room,
1222 &channel_members,
1223 &session.peer,
1224 &*session.connection_pool().await,
1225 );
1226 }
1227
1228 update_user_contacts(session.user_id, &session).await?;
1229 Ok(())
1230}
1231
1232async fn leave_room(
1233 _: proto::LeaveRoom,
1234 response: Response<proto::LeaveRoom>,
1235 session: Session,
1236) -> Result<()> {
1237 leave_room_for_session(&session).await?;
1238 response.send(proto::Ack {})?;
1239 Ok(())
1240}
1241
1242async fn call(
1243 request: proto::Call,
1244 response: Response<proto::Call>,
1245 session: Session,
1246) -> Result<()> {
1247 let room_id = RoomId::from_proto(request.room_id);
1248 let calling_user_id = session.user_id;
1249 let calling_connection_id = session.connection_id;
1250 let called_user_id = UserId::from_proto(request.called_user_id);
1251 let initial_project_id = request.initial_project_id.map(ProjectId::from_proto);
1252 if !session
1253 .db()
1254 .await
1255 .has_contact(calling_user_id, called_user_id)
1256 .await?
1257 {
1258 return Err(anyhow!("cannot call a user who isn't a contact"))?;
1259 }
1260
1261 let incoming_call = {
1262 let (room, incoming_call) = &mut *session
1263 .db()
1264 .await
1265 .call(
1266 room_id,
1267 calling_user_id,
1268 calling_connection_id,
1269 called_user_id,
1270 initial_project_id,
1271 )
1272 .await?;
1273 room_updated(&room, &session.peer);
1274 mem::take(incoming_call)
1275 };
1276 update_user_contacts(called_user_id, &session).await?;
1277
1278 let mut calls = session
1279 .connection_pool()
1280 .await
1281 .user_connection_ids(called_user_id)
1282 .map(|connection_id| session.peer.request(connection_id, incoming_call.clone()))
1283 .collect::<FuturesUnordered<_>>();
1284
1285 while let Some(call_response) = calls.next().await {
1286 match call_response.as_ref() {
1287 Ok(_) => {
1288 response.send(proto::Ack {})?;
1289 return Ok(());
1290 }
1291 Err(_) => {
1292 call_response.trace_err();
1293 }
1294 }
1295 }
1296
1297 {
1298 let room = session
1299 .db()
1300 .await
1301 .call_failed(room_id, called_user_id)
1302 .await?;
1303 room_updated(&room, &session.peer);
1304 }
1305 update_user_contacts(called_user_id, &session).await?;
1306
1307 Err(anyhow!("failed to ring user"))?
1308}
1309
1310async fn cancel_call(
1311 request: proto::CancelCall,
1312 response: Response<proto::CancelCall>,
1313 session: Session,
1314) -> Result<()> {
1315 let called_user_id = UserId::from_proto(request.called_user_id);
1316 let room_id = RoomId::from_proto(request.room_id);
1317 {
1318 let room = session
1319 .db()
1320 .await
1321 .cancel_call(room_id, session.connection_id, called_user_id)
1322 .await?;
1323 room_updated(&room, &session.peer);
1324 }
1325
1326 for connection_id in session
1327 .connection_pool()
1328 .await
1329 .user_connection_ids(called_user_id)
1330 {
1331 session
1332 .peer
1333 .send(
1334 connection_id,
1335 proto::CallCanceled {
1336 room_id: room_id.to_proto(),
1337 },
1338 )
1339 .trace_err();
1340 }
1341 response.send(proto::Ack {})?;
1342
1343 update_user_contacts(called_user_id, &session).await?;
1344 Ok(())
1345}
1346
1347async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<()> {
1348 let room_id = RoomId::from_proto(message.room_id);
1349 {
1350 let room = session
1351 .db()
1352 .await
1353 .decline_call(Some(room_id), session.user_id)
1354 .await?
1355 .ok_or_else(|| anyhow!("failed to decline call"))?;
1356 room_updated(&room, &session.peer);
1357 }
1358
1359 for connection_id in session
1360 .connection_pool()
1361 .await
1362 .user_connection_ids(session.user_id)
1363 {
1364 session
1365 .peer
1366 .send(
1367 connection_id,
1368 proto::CallCanceled {
1369 room_id: room_id.to_proto(),
1370 },
1371 )
1372 .trace_err();
1373 }
1374 update_user_contacts(session.user_id, &session).await?;
1375 Ok(())
1376}
1377
1378async fn update_participant_location(
1379 request: proto::UpdateParticipantLocation,
1380 response: Response<proto::UpdateParticipantLocation>,
1381 session: Session,
1382) -> Result<()> {
1383 let room_id = RoomId::from_proto(request.room_id);
1384 let location = request
1385 .location
1386 .ok_or_else(|| anyhow!("invalid location"))?;
1387
1388 let db = session.db().await;
1389 let room = db
1390 .update_room_participant_location(room_id, session.connection_id, location)
1391 .await?;
1392
1393 room_updated(&room, &session.peer);
1394 response.send(proto::Ack {})?;
1395 Ok(())
1396}
1397
1398async fn share_project(
1399 request: proto::ShareProject,
1400 response: Response<proto::ShareProject>,
1401 session: Session,
1402) -> Result<()> {
1403 let (project_id, room) = &*session
1404 .db()
1405 .await
1406 .share_project(
1407 RoomId::from_proto(request.room_id),
1408 session.connection_id,
1409 &request.worktrees,
1410 )
1411 .await?;
1412 response.send(proto::ShareProjectResponse {
1413 project_id: project_id.to_proto(),
1414 })?;
1415 room_updated(&room, &session.peer);
1416
1417 Ok(())
1418}
1419
1420async fn unshare_project(message: proto::UnshareProject, session: Session) -> Result<()> {
1421 let project_id = ProjectId::from_proto(message.project_id);
1422
1423 let (room, guest_connection_ids) = &*session
1424 .db()
1425 .await
1426 .unshare_project(project_id, session.connection_id)
1427 .await?;
1428
1429 broadcast(
1430 Some(session.connection_id),
1431 guest_connection_ids.iter().copied(),
1432 |conn_id| session.peer.send(conn_id, message.clone()),
1433 );
1434 room_updated(&room, &session.peer);
1435
1436 Ok(())
1437}
1438
1439async fn join_project(
1440 request: proto::JoinProject,
1441 response: Response<proto::JoinProject>,
1442 session: Session,
1443) -> Result<()> {
1444 let project_id = ProjectId::from_proto(request.project_id);
1445 let guest_user_id = session.user_id;
1446
1447 tracing::info!(%project_id, "join project");
1448
1449 let (project, replica_id) = &mut *session
1450 .db()
1451 .await
1452 .join_project(project_id, session.connection_id)
1453 .await?;
1454
1455 let collaborators = project
1456 .collaborators
1457 .iter()
1458 .filter(|collaborator| collaborator.connection_id != session.connection_id)
1459 .map(|collaborator| collaborator.to_proto())
1460 .collect::<Vec<_>>();
1461
1462 let worktrees = project
1463 .worktrees
1464 .iter()
1465 .map(|(id, worktree)| proto::WorktreeMetadata {
1466 id: *id,
1467 root_name: worktree.root_name.clone(),
1468 visible: worktree.visible,
1469 abs_path: worktree.abs_path.clone(),
1470 })
1471 .collect::<Vec<_>>();
1472
1473 for collaborator in &collaborators {
1474 session
1475 .peer
1476 .send(
1477 collaborator.peer_id.unwrap().into(),
1478 proto::AddProjectCollaborator {
1479 project_id: project_id.to_proto(),
1480 collaborator: Some(proto::Collaborator {
1481 peer_id: Some(session.connection_id.into()),
1482 replica_id: replica_id.0 as u32,
1483 user_id: guest_user_id.to_proto(),
1484 }),
1485 },
1486 )
1487 .trace_err();
1488 }
1489
1490 // First, we send the metadata associated with each worktree.
1491 response.send(proto::JoinProjectResponse {
1492 worktrees: worktrees.clone(),
1493 replica_id: replica_id.0 as u32,
1494 collaborators: collaborators.clone(),
1495 language_servers: project.language_servers.clone(),
1496 })?;
1497
1498 for (worktree_id, worktree) in mem::take(&mut project.worktrees) {
1499 #[cfg(any(test, feature = "test-support"))]
1500 const MAX_CHUNK_SIZE: usize = 2;
1501 #[cfg(not(any(test, feature = "test-support")))]
1502 const MAX_CHUNK_SIZE: usize = 256;
1503
1504 // Stream this worktree's entries.
1505 let message = proto::UpdateWorktree {
1506 project_id: project_id.to_proto(),
1507 worktree_id,
1508 abs_path: worktree.abs_path.clone(),
1509 root_name: worktree.root_name,
1510 updated_entries: worktree.entries,
1511 removed_entries: Default::default(),
1512 scan_id: worktree.scan_id,
1513 is_last_update: worktree.scan_id == worktree.completed_scan_id,
1514 updated_repositories: worktree.repository_entries.into_values().collect(),
1515 removed_repositories: Default::default(),
1516 };
1517 for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1518 session.peer.send(session.connection_id, update.clone())?;
1519 }
1520
1521 // Stream this worktree's diagnostics.
1522 for summary in worktree.diagnostic_summaries {
1523 session.peer.send(
1524 session.connection_id,
1525 proto::UpdateDiagnosticSummary {
1526 project_id: project_id.to_proto(),
1527 worktree_id: worktree.id,
1528 summary: Some(summary),
1529 },
1530 )?;
1531 }
1532
1533 for settings_file in worktree.settings_files {
1534 session.peer.send(
1535 session.connection_id,
1536 proto::UpdateWorktreeSettings {
1537 project_id: project_id.to_proto(),
1538 worktree_id: worktree.id,
1539 path: settings_file.path,
1540 content: Some(settings_file.content),
1541 },
1542 )?;
1543 }
1544 }
1545
1546 for language_server in &project.language_servers {
1547 session.peer.send(
1548 session.connection_id,
1549 proto::UpdateLanguageServer {
1550 project_id: project_id.to_proto(),
1551 language_server_id: language_server.id,
1552 variant: Some(
1553 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1554 proto::LspDiskBasedDiagnosticsUpdated {},
1555 ),
1556 ),
1557 },
1558 )?;
1559 }
1560
1561 Ok(())
1562}
1563
1564async fn leave_project(request: proto::LeaveProject, session: Session) -> Result<()> {
1565 let sender_id = session.connection_id;
1566 let project_id = ProjectId::from_proto(request.project_id);
1567
1568 let (room, project) = &*session
1569 .db()
1570 .await
1571 .leave_project(project_id, sender_id)
1572 .await?;
1573 tracing::info!(
1574 %project_id,
1575 host_user_id = %project.host_user_id,
1576 host_connection_id = %project.host_connection_id,
1577 "leave project"
1578 );
1579
1580 project_left(&project, &session);
1581 room_updated(&room, &session.peer);
1582
1583 Ok(())
1584}
1585
1586async fn update_project(
1587 request: proto::UpdateProject,
1588 response: Response<proto::UpdateProject>,
1589 session: Session,
1590) -> Result<()> {
1591 let project_id = ProjectId::from_proto(request.project_id);
1592 let (room, guest_connection_ids) = &*session
1593 .db()
1594 .await
1595 .update_project(project_id, session.connection_id, &request.worktrees)
1596 .await?;
1597 broadcast(
1598 Some(session.connection_id),
1599 guest_connection_ids.iter().copied(),
1600 |connection_id| {
1601 session
1602 .peer
1603 .forward_send(session.connection_id, connection_id, request.clone())
1604 },
1605 );
1606 room_updated(&room, &session.peer);
1607 response.send(proto::Ack {})?;
1608
1609 Ok(())
1610}
1611
1612async fn update_worktree(
1613 request: proto::UpdateWorktree,
1614 response: Response<proto::UpdateWorktree>,
1615 session: Session,
1616) -> Result<()> {
1617 let guest_connection_ids = session
1618 .db()
1619 .await
1620 .update_worktree(&request, session.connection_id)
1621 .await?;
1622
1623 broadcast(
1624 Some(session.connection_id),
1625 guest_connection_ids.iter().copied(),
1626 |connection_id| {
1627 session
1628 .peer
1629 .forward_send(session.connection_id, connection_id, request.clone())
1630 },
1631 );
1632 response.send(proto::Ack {})?;
1633 Ok(())
1634}
1635
1636async fn update_diagnostic_summary(
1637 message: proto::UpdateDiagnosticSummary,
1638 session: Session,
1639) -> Result<()> {
1640 let guest_connection_ids = session
1641 .db()
1642 .await
1643 .update_diagnostic_summary(&message, session.connection_id)
1644 .await?;
1645
1646 broadcast(
1647 Some(session.connection_id),
1648 guest_connection_ids.iter().copied(),
1649 |connection_id| {
1650 session
1651 .peer
1652 .forward_send(session.connection_id, connection_id, message.clone())
1653 },
1654 );
1655
1656 Ok(())
1657}
1658
1659async fn update_worktree_settings(
1660 message: proto::UpdateWorktreeSettings,
1661 session: Session,
1662) -> Result<()> {
1663 let guest_connection_ids = session
1664 .db()
1665 .await
1666 .update_worktree_settings(&message, session.connection_id)
1667 .await?;
1668
1669 broadcast(
1670 Some(session.connection_id),
1671 guest_connection_ids.iter().copied(),
1672 |connection_id| {
1673 session
1674 .peer
1675 .forward_send(session.connection_id, connection_id, message.clone())
1676 },
1677 );
1678
1679 Ok(())
1680}
1681
1682async fn refresh_inlay_hints(request: proto::RefreshInlayHints, session: Session) -> Result<()> {
1683 broadcast_project_message(request.project_id, request, session).await
1684}
1685
1686async fn start_language_server(
1687 request: proto::StartLanguageServer,
1688 session: Session,
1689) -> Result<()> {
1690 let guest_connection_ids = session
1691 .db()
1692 .await
1693 .start_language_server(&request, session.connection_id)
1694 .await?;
1695
1696 broadcast(
1697 Some(session.connection_id),
1698 guest_connection_ids.iter().copied(),
1699 |connection_id| {
1700 session
1701 .peer
1702 .forward_send(session.connection_id, connection_id, request.clone())
1703 },
1704 );
1705 Ok(())
1706}
1707
1708async fn update_language_server(
1709 request: proto::UpdateLanguageServer,
1710 session: Session,
1711) -> Result<()> {
1712 session.executor.record_backtrace();
1713 let project_id = ProjectId::from_proto(request.project_id);
1714 let project_connection_ids = session
1715 .db()
1716 .await
1717 .project_connection_ids(project_id, session.connection_id)
1718 .await?;
1719 broadcast(
1720 Some(session.connection_id),
1721 project_connection_ids.iter().copied(),
1722 |connection_id| {
1723 session
1724 .peer
1725 .forward_send(session.connection_id, connection_id, request.clone())
1726 },
1727 );
1728 Ok(())
1729}
1730
1731async fn forward_project_request<T>(
1732 request: T,
1733 response: Response<T>,
1734 session: Session,
1735) -> Result<()>
1736where
1737 T: EntityMessage + RequestMessage,
1738{
1739 session.executor.record_backtrace();
1740 let project_id = ProjectId::from_proto(request.remote_entity_id());
1741 let host_connection_id = {
1742 let collaborators = session
1743 .db()
1744 .await
1745 .project_collaborators(project_id, session.connection_id)
1746 .await?;
1747 collaborators
1748 .iter()
1749 .find(|collaborator| collaborator.is_host)
1750 .ok_or_else(|| anyhow!("host not found"))?
1751 .connection_id
1752 };
1753
1754 let payload = session
1755 .peer
1756 .forward_request(session.connection_id, host_connection_id, request)
1757 .await?;
1758
1759 response.send(payload)?;
1760 Ok(())
1761}
1762
1763async fn create_buffer_for_peer(
1764 request: proto::CreateBufferForPeer,
1765 session: Session,
1766) -> Result<()> {
1767 session.executor.record_backtrace();
1768 let peer_id = request.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?;
1769 session
1770 .peer
1771 .forward_send(session.connection_id, peer_id.into(), request)?;
1772 Ok(())
1773}
1774
1775async fn update_buffer(
1776 request: proto::UpdateBuffer,
1777 response: Response<proto::UpdateBuffer>,
1778 session: Session,
1779) -> Result<()> {
1780 session.executor.record_backtrace();
1781 let project_id = ProjectId::from_proto(request.project_id);
1782 let mut guest_connection_ids;
1783 let mut host_connection_id = None;
1784 {
1785 let collaborators = session
1786 .db()
1787 .await
1788 .project_collaborators(project_id, session.connection_id)
1789 .await?;
1790 guest_connection_ids = Vec::with_capacity(collaborators.len() - 1);
1791 for collaborator in collaborators.iter() {
1792 if collaborator.is_host {
1793 host_connection_id = Some(collaborator.connection_id);
1794 } else {
1795 guest_connection_ids.push(collaborator.connection_id);
1796 }
1797 }
1798 }
1799 let host_connection_id = host_connection_id.ok_or_else(|| anyhow!("host not found"))?;
1800
1801 session.executor.record_backtrace();
1802 broadcast(
1803 Some(session.connection_id),
1804 guest_connection_ids,
1805 |connection_id| {
1806 session
1807 .peer
1808 .forward_send(session.connection_id, connection_id, request.clone())
1809 },
1810 );
1811 if host_connection_id != session.connection_id {
1812 session
1813 .peer
1814 .forward_request(session.connection_id, host_connection_id, request.clone())
1815 .await?;
1816 }
1817
1818 response.send(proto::Ack {})?;
1819 Ok(())
1820}
1821
1822async fn update_buffer_file(request: proto::UpdateBufferFile, session: Session) -> Result<()> {
1823 let project_id = ProjectId::from_proto(request.project_id);
1824 let project_connection_ids = session
1825 .db()
1826 .await
1827 .project_connection_ids(project_id, session.connection_id)
1828 .await?;
1829
1830 broadcast(
1831 Some(session.connection_id),
1832 project_connection_ids.iter().copied(),
1833 |connection_id| {
1834 session
1835 .peer
1836 .forward_send(session.connection_id, connection_id, request.clone())
1837 },
1838 );
1839 Ok(())
1840}
1841
1842async fn buffer_reloaded(request: proto::BufferReloaded, session: Session) -> Result<()> {
1843 let project_id = ProjectId::from_proto(request.project_id);
1844 let project_connection_ids = session
1845 .db()
1846 .await
1847 .project_connection_ids(project_id, session.connection_id)
1848 .await?;
1849 broadcast(
1850 Some(session.connection_id),
1851 project_connection_ids.iter().copied(),
1852 |connection_id| {
1853 session
1854 .peer
1855 .forward_send(session.connection_id, connection_id, request.clone())
1856 },
1857 );
1858 Ok(())
1859}
1860
1861async fn buffer_saved(request: proto::BufferSaved, session: Session) -> Result<()> {
1862 broadcast_project_message(request.project_id, request, session).await
1863}
1864
1865async fn broadcast_project_message<T: EnvelopedMessage>(
1866 project_id: u64,
1867 request: T,
1868 session: Session,
1869) -> Result<()> {
1870 let project_id = ProjectId::from_proto(project_id);
1871 let project_connection_ids = session
1872 .db()
1873 .await
1874 .project_connection_ids(project_id, session.connection_id)
1875 .await?;
1876 broadcast(
1877 Some(session.connection_id),
1878 project_connection_ids.iter().copied(),
1879 |connection_id| {
1880 session
1881 .peer
1882 .forward_send(session.connection_id, connection_id, request.clone())
1883 },
1884 );
1885 Ok(())
1886}
1887
1888async fn follow(
1889 request: proto::Follow,
1890 response: Response<proto::Follow>,
1891 session: Session,
1892) -> Result<()> {
1893 let room_id = RoomId::from_proto(request.room_id);
1894 let project_id = request.project_id.map(ProjectId::from_proto);
1895 let leader_id = request
1896 .leader_id
1897 .ok_or_else(|| anyhow!("invalid leader id"))?
1898 .into();
1899 let follower_id = session.connection_id;
1900
1901 session
1902 .db()
1903 .await
1904 .check_room_participants(room_id, leader_id, session.connection_id)
1905 .await?;
1906
1907 let mut response_payload = session
1908 .peer
1909 .forward_request(session.connection_id, leader_id, request)
1910 .await?;
1911 response_payload
1912 .views
1913 .retain(|view| view.leader_id != Some(follower_id.into()));
1914 response.send(response_payload)?;
1915
1916 if let Some(project_id) = project_id {
1917 let room = session
1918 .db()
1919 .await
1920 .follow(room_id, project_id, leader_id, follower_id)
1921 .await?;
1922 room_updated(&room, &session.peer);
1923 }
1924
1925 Ok(())
1926}
1927
1928async fn unfollow(request: proto::Unfollow, session: Session) -> Result<()> {
1929 let room_id = RoomId::from_proto(request.room_id);
1930 let project_id = request.project_id.map(ProjectId::from_proto);
1931 let leader_id = request
1932 .leader_id
1933 .ok_or_else(|| anyhow!("invalid leader id"))?
1934 .into();
1935 let follower_id = session.connection_id;
1936
1937 session
1938 .db()
1939 .await
1940 .check_room_participants(room_id, leader_id, session.connection_id)
1941 .await?;
1942
1943 session
1944 .peer
1945 .forward_send(session.connection_id, leader_id, request)?;
1946
1947 if let Some(project_id) = project_id {
1948 let room = session
1949 .db()
1950 .await
1951 .unfollow(room_id, project_id, leader_id, follower_id)
1952 .await?;
1953 room_updated(&room, &session.peer);
1954 }
1955
1956 Ok(())
1957}
1958
1959async fn update_followers(request: proto::UpdateFollowers, session: Session) -> Result<()> {
1960 let room_id = RoomId::from_proto(request.room_id);
1961 let database = session.db.lock().await;
1962
1963 let connection_ids = if let Some(project_id) = request.project_id {
1964 let project_id = ProjectId::from_proto(project_id);
1965 database
1966 .project_connection_ids(project_id, session.connection_id)
1967 .await?
1968 } else {
1969 database
1970 .room_connection_ids(room_id, session.connection_id)
1971 .await?
1972 };
1973
1974 let leader_id = request.variant.as_ref().and_then(|variant| match variant {
1975 proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
1976 proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
1977 proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
1978 });
1979 for follower_peer_id in request.follower_ids.iter().copied() {
1980 let follower_connection_id = follower_peer_id.into();
1981 if Some(follower_peer_id) != leader_id && connection_ids.contains(&follower_connection_id) {
1982 session.peer.forward_send(
1983 session.connection_id,
1984 follower_connection_id,
1985 request.clone(),
1986 )?;
1987 }
1988 }
1989 Ok(())
1990}
1991
1992async fn get_users(
1993 request: proto::GetUsers,
1994 response: Response<proto::GetUsers>,
1995 session: Session,
1996) -> Result<()> {
1997 let user_ids = request
1998 .user_ids
1999 .into_iter()
2000 .map(UserId::from_proto)
2001 .collect();
2002 let users = session
2003 .db()
2004 .await
2005 .get_users_by_ids(user_ids)
2006 .await?
2007 .into_iter()
2008 .map(|user| proto::User {
2009 id: user.id.to_proto(),
2010 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
2011 github_login: user.github_login,
2012 })
2013 .collect();
2014 response.send(proto::UsersResponse { users })?;
2015 Ok(())
2016}
2017
2018async fn fuzzy_search_users(
2019 request: proto::FuzzySearchUsers,
2020 response: Response<proto::FuzzySearchUsers>,
2021 session: Session,
2022) -> Result<()> {
2023 let query = request.query;
2024 let users = match query.len() {
2025 0 => vec![],
2026 1 | 2 => session
2027 .db()
2028 .await
2029 .get_user_by_github_login(&query)
2030 .await?
2031 .into_iter()
2032 .collect(),
2033 _ => session.db().await.fuzzy_search_users(&query, 10).await?,
2034 };
2035 let users = users
2036 .into_iter()
2037 .filter(|user| user.id != session.user_id)
2038 .map(|user| proto::User {
2039 id: user.id.to_proto(),
2040 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
2041 github_login: user.github_login,
2042 })
2043 .collect();
2044 response.send(proto::UsersResponse { users })?;
2045 Ok(())
2046}
2047
2048async fn request_contact(
2049 request: proto::RequestContact,
2050 response: Response<proto::RequestContact>,
2051 session: Session,
2052) -> Result<()> {
2053 let requester_id = session.user_id;
2054 let responder_id = UserId::from_proto(request.responder_id);
2055 if requester_id == responder_id {
2056 return Err(anyhow!("cannot add yourself as a contact"))?;
2057 }
2058
2059 session
2060 .db()
2061 .await
2062 .send_contact_request(requester_id, responder_id)
2063 .await?;
2064
2065 // Update outgoing contact requests of requester
2066 let mut update = proto::UpdateContacts::default();
2067 update.outgoing_requests.push(responder_id.to_proto());
2068 for connection_id in session
2069 .connection_pool()
2070 .await
2071 .user_connection_ids(requester_id)
2072 {
2073 session.peer.send(connection_id, update.clone())?;
2074 }
2075
2076 // Update incoming contact requests of responder
2077 let mut update = proto::UpdateContacts::default();
2078 update
2079 .incoming_requests
2080 .push(proto::IncomingContactRequest {
2081 requester_id: requester_id.to_proto(),
2082 should_notify: true,
2083 });
2084 for connection_id in session
2085 .connection_pool()
2086 .await
2087 .user_connection_ids(responder_id)
2088 {
2089 session.peer.send(connection_id, update.clone())?;
2090 }
2091
2092 response.send(proto::Ack {})?;
2093 Ok(())
2094}
2095
2096async fn respond_to_contact_request(
2097 request: proto::RespondToContactRequest,
2098 response: Response<proto::RespondToContactRequest>,
2099 session: Session,
2100) -> Result<()> {
2101 let responder_id = session.user_id;
2102 let requester_id = UserId::from_proto(request.requester_id);
2103 let db = session.db().await;
2104 if request.response == proto::ContactRequestResponse::Dismiss as i32 {
2105 db.dismiss_contact_notification(responder_id, requester_id)
2106 .await?;
2107 } else {
2108 let accept = request.response == proto::ContactRequestResponse::Accept as i32;
2109
2110 db.respond_to_contact_request(responder_id, requester_id, accept)
2111 .await?;
2112 let requester_busy = db.is_user_busy(requester_id).await?;
2113 let responder_busy = db.is_user_busy(responder_id).await?;
2114
2115 let pool = session.connection_pool().await;
2116 // Update responder with new contact
2117 let mut update = proto::UpdateContacts::default();
2118 if accept {
2119 update
2120 .contacts
2121 .push(contact_for_user(requester_id, false, requester_busy, &pool));
2122 }
2123 update
2124 .remove_incoming_requests
2125 .push(requester_id.to_proto());
2126 for connection_id in pool.user_connection_ids(responder_id) {
2127 session.peer.send(connection_id, update.clone())?;
2128 }
2129
2130 // Update requester with new contact
2131 let mut update = proto::UpdateContacts::default();
2132 if accept {
2133 update
2134 .contacts
2135 .push(contact_for_user(responder_id, true, responder_busy, &pool));
2136 }
2137 update
2138 .remove_outgoing_requests
2139 .push(responder_id.to_proto());
2140 for connection_id in pool.user_connection_ids(requester_id) {
2141 session.peer.send(connection_id, update.clone())?;
2142 }
2143 }
2144
2145 response.send(proto::Ack {})?;
2146 Ok(())
2147}
2148
2149async fn remove_contact(
2150 request: proto::RemoveContact,
2151 response: Response<proto::RemoveContact>,
2152 session: Session,
2153) -> Result<()> {
2154 let requester_id = session.user_id;
2155 let responder_id = UserId::from_proto(request.user_id);
2156 let db = session.db().await;
2157 let contact_accepted = db.remove_contact(requester_id, responder_id).await?;
2158
2159 let pool = session.connection_pool().await;
2160 // Update outgoing contact requests of requester
2161 let mut update = proto::UpdateContacts::default();
2162 if contact_accepted {
2163 update.remove_contacts.push(responder_id.to_proto());
2164 } else {
2165 update
2166 .remove_outgoing_requests
2167 .push(responder_id.to_proto());
2168 }
2169 for connection_id in pool.user_connection_ids(requester_id) {
2170 session.peer.send(connection_id, update.clone())?;
2171 }
2172
2173 // Update incoming contact requests of responder
2174 let mut update = proto::UpdateContacts::default();
2175 if contact_accepted {
2176 update.remove_contacts.push(requester_id.to_proto());
2177 } else {
2178 update
2179 .remove_incoming_requests
2180 .push(requester_id.to_proto());
2181 }
2182 for connection_id in pool.user_connection_ids(responder_id) {
2183 session.peer.send(connection_id, update.clone())?;
2184 }
2185
2186 response.send(proto::Ack {})?;
2187 Ok(())
2188}
2189
2190async fn create_channel(
2191 request: proto::CreateChannel,
2192 response: Response<proto::CreateChannel>,
2193 session: Session,
2194) -> Result<()> {
2195 let db = session.db().await;
2196 let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
2197
2198 if let Some(live_kit) = session.live_kit_client.as_ref() {
2199 live_kit.create_room(live_kit_room.clone()).await?;
2200 }
2201
2202 let parent_id = request.parent_id.map(|id| ChannelId::from_proto(id));
2203 let id = db
2204 .create_channel(&request.name, parent_id, &live_kit_room, session.user_id)
2205 .await?;
2206
2207 let channel = proto::Channel {
2208 id: id.to_proto(),
2209 name: request.name,
2210 };
2211
2212 response.send(proto::CreateChannelResponse {
2213 channel: Some(channel.clone()),
2214 parent_id: request.parent_id,
2215 })?;
2216
2217 let Some(parent_id) = parent_id else {
2218 return Ok(());
2219 };
2220
2221 let update = proto::UpdateChannels {
2222 channels: vec![channel],
2223 insert_edge: vec![ChannelEdge {
2224 parent_id: parent_id.to_proto(),
2225 channel_id: id.to_proto(),
2226 }],
2227 ..Default::default()
2228 };
2229
2230 let user_ids_to_notify = db.get_channel_members(parent_id).await?;
2231
2232 let connection_pool = session.connection_pool().await;
2233 for user_id in user_ids_to_notify {
2234 for connection_id in connection_pool.user_connection_ids(user_id) {
2235 if user_id == session.user_id {
2236 continue;
2237 }
2238 session.peer.send(connection_id, update.clone())?;
2239 }
2240 }
2241
2242 Ok(())
2243}
2244
2245async fn delete_channel(
2246 request: proto::DeleteChannel,
2247 response: Response<proto::DeleteChannel>,
2248 session: Session,
2249) -> Result<()> {
2250 let db = session.db().await;
2251
2252 let channel_id = request.channel_id;
2253 let (removed_channels, member_ids) = db
2254 .delete_channel(ChannelId::from_proto(channel_id), session.user_id)
2255 .await?;
2256 response.send(proto::Ack {})?;
2257
2258 // Notify members of removed channels
2259 let mut update = proto::UpdateChannels::default();
2260 update
2261 .delete_channels
2262 .extend(removed_channels.into_iter().map(|id| id.to_proto()));
2263
2264 let connection_pool = session.connection_pool().await;
2265 for member_id in member_ids {
2266 for connection_id in connection_pool.user_connection_ids(member_id) {
2267 session.peer.send(connection_id, update.clone())?;
2268 }
2269 }
2270
2271 Ok(())
2272}
2273
2274async fn invite_channel_member(
2275 request: proto::InviteChannelMember,
2276 response: Response<proto::InviteChannelMember>,
2277 session: Session,
2278) -> Result<()> {
2279 let db = session.db().await;
2280 let channel_id = ChannelId::from_proto(request.channel_id);
2281 let invitee_id = UserId::from_proto(request.user_id);
2282 db.invite_channel_member(channel_id, invitee_id, session.user_id, request.admin)
2283 .await?;
2284
2285 let (channel, _) = db
2286 .get_channel(channel_id, session.user_id)
2287 .await?
2288 .ok_or_else(|| anyhow!("channel not found"))?;
2289
2290 let mut update = proto::UpdateChannels::default();
2291 update.channel_invitations.push(proto::Channel {
2292 id: channel.id.to_proto(),
2293 name: channel.name,
2294 });
2295 for connection_id in session
2296 .connection_pool()
2297 .await
2298 .user_connection_ids(invitee_id)
2299 {
2300 session.peer.send(connection_id, update.clone())?;
2301 }
2302
2303 response.send(proto::Ack {})?;
2304 Ok(())
2305}
2306
2307async fn remove_channel_member(
2308 request: proto::RemoveChannelMember,
2309 response: Response<proto::RemoveChannelMember>,
2310 session: Session,
2311) -> Result<()> {
2312 let db = session.db().await;
2313 let channel_id = ChannelId::from_proto(request.channel_id);
2314 let member_id = UserId::from_proto(request.user_id);
2315
2316 db.remove_channel_member(channel_id, member_id, session.user_id)
2317 .await?;
2318
2319 let mut update = proto::UpdateChannels::default();
2320 update.delete_channels.push(channel_id.to_proto());
2321
2322 for connection_id in session
2323 .connection_pool()
2324 .await
2325 .user_connection_ids(member_id)
2326 {
2327 session.peer.send(connection_id, update.clone())?;
2328 }
2329
2330 response.send(proto::Ack {})?;
2331 Ok(())
2332}
2333
2334async fn set_channel_member_admin(
2335 request: proto::SetChannelMemberAdmin,
2336 response: Response<proto::SetChannelMemberAdmin>,
2337 session: Session,
2338) -> Result<()> {
2339 let db = session.db().await;
2340 let channel_id = ChannelId::from_proto(request.channel_id);
2341 let member_id = UserId::from_proto(request.user_id);
2342 db.set_channel_member_admin(channel_id, session.user_id, member_id, request.admin)
2343 .await?;
2344
2345 let (channel, has_accepted) = db
2346 .get_channel(channel_id, member_id)
2347 .await?
2348 .ok_or_else(|| anyhow!("channel not found"))?;
2349
2350 let mut update = proto::UpdateChannels::default();
2351 if has_accepted {
2352 update.channel_permissions.push(proto::ChannelPermission {
2353 channel_id: channel.id.to_proto(),
2354 is_admin: request.admin,
2355 });
2356 }
2357
2358 for connection_id in session
2359 .connection_pool()
2360 .await
2361 .user_connection_ids(member_id)
2362 {
2363 session.peer.send(connection_id, update.clone())?;
2364 }
2365
2366 response.send(proto::Ack {})?;
2367 Ok(())
2368}
2369
2370async fn rename_channel(
2371 request: proto::RenameChannel,
2372 response: Response<proto::RenameChannel>,
2373 session: Session,
2374) -> Result<()> {
2375 let db = session.db().await;
2376 let channel_id = ChannelId::from_proto(request.channel_id);
2377 let new_name = db
2378 .rename_channel(channel_id, session.user_id, &request.name)
2379 .await?;
2380
2381 let channel = proto::Channel {
2382 id: request.channel_id,
2383 name: new_name,
2384 };
2385 response.send(proto::RenameChannelResponse {
2386 channel: Some(channel.clone()),
2387 })?;
2388 let mut update = proto::UpdateChannels::default();
2389 update.channels.push(channel);
2390
2391 let member_ids = db.get_channel_members(channel_id).await?;
2392
2393 let connection_pool = session.connection_pool().await;
2394 for member_id in member_ids {
2395 for connection_id in connection_pool.user_connection_ids(member_id) {
2396 session.peer.send(connection_id, update.clone())?;
2397 }
2398 }
2399
2400 Ok(())
2401}
2402
2403async fn link_channel(
2404 request: proto::LinkChannel,
2405 response: Response<proto::LinkChannel>,
2406 session: Session,
2407) -> Result<()> {
2408 let db = session.db().await;
2409 let channel_id = ChannelId::from_proto(request.channel_id);
2410 let to = ChannelId::from_proto(request.to);
2411 let channels_to_send = db.link_channel(session.user_id, channel_id, to).await?;
2412
2413 let members = db.get_channel_members(to).await?;
2414 let connection_pool = session.connection_pool().await;
2415 let update = proto::UpdateChannels {
2416 channels: channels_to_send
2417 .channels
2418 .into_iter()
2419 .map(|channel| proto::Channel {
2420 id: channel.id.to_proto(),
2421 name: channel.name,
2422 })
2423 .collect(),
2424 insert_edge: channels_to_send.edges,
2425 ..Default::default()
2426 };
2427 for member_id in members {
2428 for connection_id in connection_pool.user_connection_ids(member_id) {
2429 session.peer.send(connection_id, update.clone())?;
2430 }
2431 }
2432
2433 response.send(Ack {})?;
2434
2435 Ok(())
2436}
2437
2438async fn unlink_channel(
2439 request: proto::UnlinkChannel,
2440 response: Response<proto::UnlinkChannel>,
2441 session: Session,
2442) -> Result<()> {
2443 let db = session.db().await;
2444 let channel_id = ChannelId::from_proto(request.channel_id);
2445 let from = ChannelId::from_proto(request.from);
2446
2447 db.unlink_channel(session.user_id, channel_id, from).await?;
2448
2449 let members = db.get_channel_members(from).await?;
2450
2451 let update = proto::UpdateChannels {
2452 delete_edge: vec![proto::ChannelEdge {
2453 channel_id: channel_id.to_proto(),
2454 parent_id: from.to_proto(),
2455 }],
2456 ..Default::default()
2457 };
2458 let connection_pool = session.connection_pool().await;
2459 for member_id in members {
2460 for connection_id in connection_pool.user_connection_ids(member_id) {
2461 session.peer.send(connection_id, update.clone())?;
2462 }
2463 }
2464
2465 response.send(Ack {})?;
2466
2467 Ok(())
2468}
2469
2470async fn move_channel(
2471 request: proto::MoveChannel,
2472 response: Response<proto::MoveChannel>,
2473 session: Session,
2474) -> Result<()> {
2475 let db = session.db().await;
2476 let channel_id = ChannelId::from_proto(request.channel_id);
2477 let from_parent = ChannelId::from_proto(request.from);
2478 let to = ChannelId::from_proto(request.to);
2479
2480 let channels_to_send = db
2481 .move_channel(session.user_id, channel_id, from_parent, to)
2482 .await?;
2483
2484 if channels_to_send.is_empty() {
2485 response.send(Ack {})?;
2486 return Ok(());
2487 }
2488
2489 let members_from = db.get_channel_members(from_parent).await?;
2490 let members_to = db.get_channel_members(to).await?;
2491
2492 let update = proto::UpdateChannels {
2493 delete_edge: vec![proto::ChannelEdge {
2494 channel_id: channel_id.to_proto(),
2495 parent_id: from_parent.to_proto(),
2496 }],
2497 ..Default::default()
2498 };
2499 let connection_pool = session.connection_pool().await;
2500 for member_id in members_from {
2501 for connection_id in connection_pool.user_connection_ids(member_id) {
2502 session.peer.send(connection_id, update.clone())?;
2503 }
2504 }
2505
2506 let update = proto::UpdateChannels {
2507 channels: channels_to_send
2508 .channels
2509 .into_iter()
2510 .map(|channel| proto::Channel {
2511 id: channel.id.to_proto(),
2512 name: channel.name,
2513 })
2514 .collect(),
2515 insert_edge: channels_to_send.edges,
2516 ..Default::default()
2517 };
2518 for member_id in members_to {
2519 for connection_id in connection_pool.user_connection_ids(member_id) {
2520 session.peer.send(connection_id, update.clone())?;
2521 }
2522 }
2523
2524 response.send(Ack {})?;
2525
2526 Ok(())
2527}
2528
2529async fn get_channel_members(
2530 request: proto::GetChannelMembers,
2531 response: Response<proto::GetChannelMembers>,
2532 session: Session,
2533) -> Result<()> {
2534 let db = session.db().await;
2535 let channel_id = ChannelId::from_proto(request.channel_id);
2536 let members = db
2537 .get_channel_member_details(channel_id, session.user_id)
2538 .await?;
2539 response.send(proto::GetChannelMembersResponse { members })?;
2540 Ok(())
2541}
2542
2543async fn respond_to_channel_invite(
2544 request: proto::RespondToChannelInvite,
2545 response: Response<proto::RespondToChannelInvite>,
2546 session: Session,
2547) -> Result<()> {
2548 let db = session.db().await;
2549 let channel_id = ChannelId::from_proto(request.channel_id);
2550 db.respond_to_channel_invite(channel_id, session.user_id, request.accept)
2551 .await?;
2552
2553 let mut update = proto::UpdateChannels::default();
2554 update
2555 .remove_channel_invitations
2556 .push(channel_id.to_proto());
2557 if request.accept {
2558 let result = db.get_channel_for_user(channel_id, session.user_id).await?;
2559 update
2560 .channels
2561 .extend(
2562 result
2563 .channels
2564 .channels
2565 .into_iter()
2566 .map(|channel| proto::Channel {
2567 id: channel.id.to_proto(),
2568 name: channel.name,
2569 }),
2570 );
2571 update.insert_edge = result.channels.edges;
2572 update
2573 .channel_participants
2574 .extend(
2575 result
2576 .channel_participants
2577 .into_iter()
2578 .map(|(channel_id, user_ids)| proto::ChannelParticipants {
2579 channel_id: channel_id.to_proto(),
2580 participant_user_ids: user_ids.into_iter().map(UserId::to_proto).collect(),
2581 }),
2582 );
2583 update
2584 .channel_permissions
2585 .extend(
2586 result
2587 .channels_with_admin_privileges
2588 .into_iter()
2589 .map(|channel_id| proto::ChannelPermission {
2590 channel_id: channel_id.to_proto(),
2591 is_admin: true,
2592 }),
2593 );
2594 }
2595 session.peer.send(session.connection_id, update)?;
2596 response.send(proto::Ack {})?;
2597
2598 Ok(())
2599}
2600
2601async fn join_channel(
2602 request: proto::JoinChannel,
2603 response: Response<proto::JoinChannel>,
2604 session: Session,
2605) -> Result<()> {
2606 let channel_id = ChannelId::from_proto(request.channel_id);
2607
2608 let joined_room = {
2609 leave_room_for_session(&session).await?;
2610 let db = session.db().await;
2611
2612 let room_id = db.room_id_for_channel(channel_id).await?;
2613
2614 let joined_room = db
2615 .join_room(room_id, session.user_id, session.connection_id)
2616 .await?;
2617
2618 let live_kit_connection_info = session.live_kit_client.as_ref().and_then(|live_kit| {
2619 let token = live_kit
2620 .room_token(
2621 &joined_room.room.live_kit_room,
2622 &session.user_id.to_string(),
2623 )
2624 .trace_err()?;
2625
2626 Some(LiveKitConnectionInfo {
2627 server_url: live_kit.url().into(),
2628 token,
2629 })
2630 });
2631
2632 response.send(proto::JoinRoomResponse {
2633 room: Some(joined_room.room.clone()),
2634 channel_id: joined_room.channel_id.map(|id| id.to_proto()),
2635 live_kit_connection_info,
2636 })?;
2637
2638 room_updated(&joined_room.room, &session.peer);
2639
2640 joined_room.into_inner()
2641 };
2642
2643 channel_updated(
2644 channel_id,
2645 &joined_room.room,
2646 &joined_room.channel_members,
2647 &session.peer,
2648 &*session.connection_pool().await,
2649 );
2650
2651 update_user_contacts(session.user_id, &session).await?;
2652
2653 Ok(())
2654}
2655
2656async fn join_channel_buffer(
2657 request: proto::JoinChannelBuffer,
2658 response: Response<proto::JoinChannelBuffer>,
2659 session: Session,
2660) -> Result<()> {
2661 let db = session.db().await;
2662 let channel_id = ChannelId::from_proto(request.channel_id);
2663
2664 let open_response = db
2665 .join_channel_buffer(channel_id, session.user_id, session.connection_id)
2666 .await?;
2667
2668 let collaborators = open_response.collaborators.clone();
2669 response.send(open_response)?;
2670
2671 let update = UpdateChannelBufferCollaborators {
2672 channel_id: channel_id.to_proto(),
2673 collaborators: collaborators.clone(),
2674 };
2675 channel_buffer_updated(
2676 session.connection_id,
2677 collaborators
2678 .iter()
2679 .filter_map(|collaborator| Some(collaborator.peer_id?.into())),
2680 &update,
2681 &session.peer,
2682 );
2683
2684 Ok(())
2685}
2686
2687async fn update_channel_buffer(
2688 request: proto::UpdateChannelBuffer,
2689 session: Session,
2690) -> Result<()> {
2691 let db = session.db().await;
2692 let channel_id = ChannelId::from_proto(request.channel_id);
2693
2694 let collaborators = db
2695 .update_channel_buffer(channel_id, session.user_id, &request.operations)
2696 .await?;
2697
2698 channel_buffer_updated(
2699 session.connection_id,
2700 collaborators,
2701 &proto::UpdateChannelBuffer {
2702 channel_id: channel_id.to_proto(),
2703 operations: request.operations,
2704 },
2705 &session.peer,
2706 );
2707 Ok(())
2708}
2709
2710async fn rejoin_channel_buffers(
2711 request: proto::RejoinChannelBuffers,
2712 response: Response<proto::RejoinChannelBuffers>,
2713 session: Session,
2714) -> Result<()> {
2715 let db = session.db().await;
2716 let buffers = db
2717 .rejoin_channel_buffers(&request.buffers, session.user_id, session.connection_id)
2718 .await?;
2719
2720 for rejoined_buffer in &buffers {
2721 let collaborators_to_notify = rejoined_buffer
2722 .buffer
2723 .collaborators
2724 .iter()
2725 .filter_map(|c| Some(c.peer_id?.into()));
2726 channel_buffer_updated(
2727 session.connection_id,
2728 collaborators_to_notify,
2729 &proto::UpdateChannelBufferCollaborators {
2730 channel_id: rejoined_buffer.buffer.channel_id,
2731 collaborators: rejoined_buffer.buffer.collaborators.clone(),
2732 },
2733 &session.peer,
2734 );
2735 }
2736
2737 response.send(proto::RejoinChannelBuffersResponse {
2738 buffers: buffers.into_iter().map(|b| b.buffer).collect(),
2739 })?;
2740
2741 Ok(())
2742}
2743
2744async fn leave_channel_buffer(
2745 request: proto::LeaveChannelBuffer,
2746 response: Response<proto::LeaveChannelBuffer>,
2747 session: Session,
2748) -> Result<()> {
2749 let db = session.db().await;
2750 let channel_id = ChannelId::from_proto(request.channel_id);
2751
2752 let left_buffer = db
2753 .leave_channel_buffer(channel_id, session.connection_id)
2754 .await?;
2755
2756 response.send(Ack {})?;
2757
2758 channel_buffer_updated(
2759 session.connection_id,
2760 left_buffer.connections,
2761 &proto::UpdateChannelBufferCollaborators {
2762 channel_id: channel_id.to_proto(),
2763 collaborators: left_buffer.collaborators,
2764 },
2765 &session.peer,
2766 );
2767
2768 Ok(())
2769}
2770
2771fn channel_buffer_updated<T: EnvelopedMessage>(
2772 sender_id: ConnectionId,
2773 collaborators: impl IntoIterator<Item = ConnectionId>,
2774 message: &T,
2775 peer: &Peer,
2776) {
2777 broadcast(Some(sender_id), collaborators.into_iter(), |peer_id| {
2778 peer.send(peer_id.into(), message.clone())
2779 });
2780}
2781
2782async fn send_channel_message(
2783 request: proto::SendChannelMessage,
2784 response: Response<proto::SendChannelMessage>,
2785 session: Session,
2786) -> Result<()> {
2787 // Validate the message body.
2788 let body = request.body.trim().to_string();
2789 if body.len() > MAX_MESSAGE_LEN {
2790 return Err(anyhow!("message is too long"))?;
2791 }
2792 if body.is_empty() {
2793 return Err(anyhow!("message can't be blank"))?;
2794 }
2795
2796 let timestamp = OffsetDateTime::now_utc();
2797 let nonce = request
2798 .nonce
2799 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
2800
2801 let channel_id = ChannelId::from_proto(request.channel_id);
2802 let (message_id, connection_ids) = session
2803 .db()
2804 .await
2805 .create_channel_message(
2806 channel_id,
2807 session.user_id,
2808 &body,
2809 timestamp,
2810 nonce.clone().into(),
2811 )
2812 .await?;
2813 let message = proto::ChannelMessage {
2814 sender_id: session.user_id.to_proto(),
2815 id: message_id.to_proto(),
2816 body,
2817 timestamp: timestamp.unix_timestamp() as u64,
2818 nonce: Some(nonce),
2819 };
2820 broadcast(Some(session.connection_id), connection_ids, |connection| {
2821 session.peer.send(
2822 connection,
2823 proto::ChannelMessageSent {
2824 channel_id: channel_id.to_proto(),
2825 message: Some(message.clone()),
2826 },
2827 )
2828 });
2829 response.send(proto::SendChannelMessageResponse {
2830 message: Some(message),
2831 })?;
2832 Ok(())
2833}
2834
2835async fn remove_channel_message(
2836 request: proto::RemoveChannelMessage,
2837 response: Response<proto::RemoveChannelMessage>,
2838 session: Session,
2839) -> Result<()> {
2840 let channel_id = ChannelId::from_proto(request.channel_id);
2841 let message_id = MessageId::from_proto(request.message_id);
2842 let connection_ids = session
2843 .db()
2844 .await
2845 .remove_channel_message(channel_id, message_id, session.user_id)
2846 .await?;
2847 broadcast(Some(session.connection_id), connection_ids, |connection| {
2848 session.peer.send(connection, request.clone())
2849 });
2850 response.send(proto::Ack {})?;
2851 Ok(())
2852}
2853
2854async fn join_channel_chat(
2855 request: proto::JoinChannelChat,
2856 response: Response<proto::JoinChannelChat>,
2857 session: Session,
2858) -> Result<()> {
2859 let channel_id = ChannelId::from_proto(request.channel_id);
2860
2861 let db = session.db().await;
2862 db.join_channel_chat(channel_id, session.connection_id, session.user_id)
2863 .await?;
2864 let messages = db
2865 .get_channel_messages(channel_id, session.user_id, MESSAGE_COUNT_PER_PAGE, None)
2866 .await?;
2867 response.send(proto::JoinChannelChatResponse {
2868 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
2869 messages,
2870 })?;
2871 Ok(())
2872}
2873
2874async fn leave_channel_chat(request: proto::LeaveChannelChat, session: Session) -> Result<()> {
2875 let channel_id = ChannelId::from_proto(request.channel_id);
2876 session
2877 .db()
2878 .await
2879 .leave_channel_chat(channel_id, session.connection_id, session.user_id)
2880 .await?;
2881 Ok(())
2882}
2883
2884async fn get_channel_messages(
2885 request: proto::GetChannelMessages,
2886 response: Response<proto::GetChannelMessages>,
2887 session: Session,
2888) -> Result<()> {
2889 let channel_id = ChannelId::from_proto(request.channel_id);
2890 let messages = session
2891 .db()
2892 .await
2893 .get_channel_messages(
2894 channel_id,
2895 session.user_id,
2896 MESSAGE_COUNT_PER_PAGE,
2897 Some(MessageId::from_proto(request.before_message_id)),
2898 )
2899 .await?;
2900 response.send(proto::GetChannelMessagesResponse {
2901 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
2902 messages,
2903 })?;
2904 Ok(())
2905}
2906
2907async fn update_diff_base(request: proto::UpdateDiffBase, session: Session) -> Result<()> {
2908 let project_id = ProjectId::from_proto(request.project_id);
2909 let project_connection_ids = session
2910 .db()
2911 .await
2912 .project_connection_ids(project_id, session.connection_id)
2913 .await?;
2914 broadcast(
2915 Some(session.connection_id),
2916 project_connection_ids.iter().copied(),
2917 |connection_id| {
2918 session
2919 .peer
2920 .forward_send(session.connection_id, connection_id, request.clone())
2921 },
2922 );
2923 Ok(())
2924}
2925
2926async fn get_private_user_info(
2927 _request: proto::GetPrivateUserInfo,
2928 response: Response<proto::GetPrivateUserInfo>,
2929 session: Session,
2930) -> Result<()> {
2931 let db = session.db().await;
2932
2933 let metrics_id = db.get_user_metrics_id(session.user_id).await?;
2934 let user = db
2935 .get_user_by_id(session.user_id)
2936 .await?
2937 .ok_or_else(|| anyhow!("user not found"))?;
2938 let flags = db.get_user_flags(session.user_id).await?;
2939
2940 response.send(proto::GetPrivateUserInfoResponse {
2941 metrics_id,
2942 staff: user.admin,
2943 flags,
2944 })?;
2945 Ok(())
2946}
2947
2948fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
2949 match message {
2950 TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
2951 TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
2952 TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
2953 TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
2954 TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
2955 code: frame.code.into(),
2956 reason: frame.reason,
2957 })),
2958 }
2959}
2960
2961fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
2962 match message {
2963 AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
2964 AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
2965 AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
2966 AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
2967 AxumMessage::Close(frame) => {
2968 TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
2969 code: frame.code.into(),
2970 reason: frame.reason,
2971 }))
2972 }
2973 }
2974}
2975
2976fn build_initial_channels_update(
2977 channels: ChannelsForUser,
2978 channel_invites: Vec<db::Channel>,
2979) -> proto::UpdateChannels {
2980 let mut update = proto::UpdateChannels::default();
2981
2982 for channel in channels.channels.channels {
2983 update.channels.push(proto::Channel {
2984 id: channel.id.to_proto(),
2985 name: channel.name,
2986 });
2987 }
2988
2989 update.insert_edge = channels.channels.edges;
2990
2991 for (channel_id, participants) in channels.channel_participants {
2992 update
2993 .channel_participants
2994 .push(proto::ChannelParticipants {
2995 channel_id: channel_id.to_proto(),
2996 participant_user_ids: participants.into_iter().map(|id| id.to_proto()).collect(),
2997 });
2998 }
2999
3000 update
3001 .channel_permissions
3002 .extend(
3003 channels
3004 .channels_with_admin_privileges
3005 .into_iter()
3006 .map(|id| proto::ChannelPermission {
3007 channel_id: id.to_proto(),
3008 is_admin: true,
3009 }),
3010 );
3011
3012 for channel in channel_invites {
3013 update.channel_invitations.push(proto::Channel {
3014 id: channel.id.to_proto(),
3015 name: channel.name,
3016 });
3017 }
3018
3019 update
3020}
3021
3022fn build_initial_contacts_update(
3023 contacts: Vec<db::Contact>,
3024 pool: &ConnectionPool,
3025) -> proto::UpdateContacts {
3026 let mut update = proto::UpdateContacts::default();
3027
3028 for contact in contacts {
3029 match contact {
3030 db::Contact::Accepted {
3031 user_id,
3032 should_notify,
3033 busy,
3034 } => {
3035 update
3036 .contacts
3037 .push(contact_for_user(user_id, should_notify, busy, &pool));
3038 }
3039 db::Contact::Outgoing { user_id } => update.outgoing_requests.push(user_id.to_proto()),
3040 db::Contact::Incoming {
3041 user_id,
3042 should_notify,
3043 } => update
3044 .incoming_requests
3045 .push(proto::IncomingContactRequest {
3046 requester_id: user_id.to_proto(),
3047 should_notify,
3048 }),
3049 }
3050 }
3051
3052 update
3053}
3054
3055fn contact_for_user(
3056 user_id: UserId,
3057 should_notify: bool,
3058 busy: bool,
3059 pool: &ConnectionPool,
3060) -> proto::Contact {
3061 proto::Contact {
3062 user_id: user_id.to_proto(),
3063 online: pool.is_user_online(user_id),
3064 busy,
3065 should_notify,
3066 }
3067}
3068
3069fn room_updated(room: &proto::Room, peer: &Peer) {
3070 broadcast(
3071 None,
3072 room.participants
3073 .iter()
3074 .filter_map(|participant| Some(participant.peer_id?.into())),
3075 |peer_id| {
3076 peer.send(
3077 peer_id.into(),
3078 proto::RoomUpdated {
3079 room: Some(room.clone()),
3080 },
3081 )
3082 },
3083 );
3084}
3085
3086fn channel_updated(
3087 channel_id: ChannelId,
3088 room: &proto::Room,
3089 channel_members: &[UserId],
3090 peer: &Peer,
3091 pool: &ConnectionPool,
3092) {
3093 let participants = room
3094 .participants
3095 .iter()
3096 .map(|p| p.user_id)
3097 .collect::<Vec<_>>();
3098
3099 broadcast(
3100 None,
3101 channel_members
3102 .iter()
3103 .flat_map(|user_id| pool.user_connection_ids(*user_id)),
3104 |peer_id| {
3105 peer.send(
3106 peer_id.into(),
3107 proto::UpdateChannels {
3108 channel_participants: vec![proto::ChannelParticipants {
3109 channel_id: channel_id.to_proto(),
3110 participant_user_ids: participants.clone(),
3111 }],
3112 ..Default::default()
3113 },
3114 )
3115 },
3116 );
3117}
3118
3119async fn update_user_contacts(user_id: UserId, session: &Session) -> Result<()> {
3120 let db = session.db().await;
3121
3122 let contacts = db.get_contacts(user_id).await?;
3123 let busy = db.is_user_busy(user_id).await?;
3124
3125 let pool = session.connection_pool().await;
3126 let updated_contact = contact_for_user(user_id, false, busy, &pool);
3127 for contact in contacts {
3128 if let db::Contact::Accepted {
3129 user_id: contact_user_id,
3130 ..
3131 } = contact
3132 {
3133 for contact_conn_id in pool.user_connection_ids(contact_user_id) {
3134 session
3135 .peer
3136 .send(
3137 contact_conn_id,
3138 proto::UpdateContacts {
3139 contacts: vec![updated_contact.clone()],
3140 remove_contacts: Default::default(),
3141 incoming_requests: Default::default(),
3142 remove_incoming_requests: Default::default(),
3143 outgoing_requests: Default::default(),
3144 remove_outgoing_requests: Default::default(),
3145 },
3146 )
3147 .trace_err();
3148 }
3149 }
3150 }
3151 Ok(())
3152}
3153
3154async fn leave_room_for_session(session: &Session) -> Result<()> {
3155 let mut contacts_to_update = HashSet::default();
3156
3157 let room_id;
3158 let canceled_calls_to_user_ids;
3159 let live_kit_room;
3160 let delete_live_kit_room;
3161 let room;
3162 let channel_members;
3163 let channel_id;
3164
3165 if let Some(mut left_room) = session.db().await.leave_room(session.connection_id).await? {
3166 contacts_to_update.insert(session.user_id);
3167
3168 for project in left_room.left_projects.values() {
3169 project_left(project, session);
3170 }
3171
3172 room_id = RoomId::from_proto(left_room.room.id);
3173 canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids);
3174 live_kit_room = mem::take(&mut left_room.room.live_kit_room);
3175 delete_live_kit_room = left_room.deleted;
3176 room = mem::take(&mut left_room.room);
3177 channel_members = mem::take(&mut left_room.channel_members);
3178 channel_id = left_room.channel_id;
3179
3180 room_updated(&room, &session.peer);
3181 } else {
3182 return Ok(());
3183 }
3184
3185 if let Some(channel_id) = channel_id {
3186 channel_updated(
3187 channel_id,
3188 &room,
3189 &channel_members,
3190 &session.peer,
3191 &*session.connection_pool().await,
3192 );
3193 }
3194
3195 {
3196 let pool = session.connection_pool().await;
3197 for canceled_user_id in canceled_calls_to_user_ids {
3198 for connection_id in pool.user_connection_ids(canceled_user_id) {
3199 session
3200 .peer
3201 .send(
3202 connection_id,
3203 proto::CallCanceled {
3204 room_id: room_id.to_proto(),
3205 },
3206 )
3207 .trace_err();
3208 }
3209 contacts_to_update.insert(canceled_user_id);
3210 }
3211 }
3212
3213 for contact_user_id in contacts_to_update {
3214 update_user_contacts(contact_user_id, &session).await?;
3215 }
3216
3217 if let Some(live_kit) = session.live_kit_client.as_ref() {
3218 live_kit
3219 .remove_participant(live_kit_room.clone(), session.user_id.to_string())
3220 .await
3221 .trace_err();
3222
3223 if delete_live_kit_room {
3224 live_kit.delete_room(live_kit_room).await.trace_err();
3225 }
3226 }
3227
3228 Ok(())
3229}
3230
3231async fn leave_channel_buffers_for_session(session: &Session) -> Result<()> {
3232 let left_channel_buffers = session
3233 .db()
3234 .await
3235 .leave_channel_buffers(session.connection_id)
3236 .await?;
3237
3238 for left_buffer in left_channel_buffers {
3239 channel_buffer_updated(
3240 session.connection_id,
3241 left_buffer.connections,
3242 &proto::UpdateChannelBufferCollaborators {
3243 channel_id: left_buffer.channel_id.to_proto(),
3244 collaborators: left_buffer.collaborators,
3245 },
3246 &session.peer,
3247 );
3248 }
3249
3250 Ok(())
3251}
3252
3253fn project_left(project: &db::LeftProject, session: &Session) {
3254 for connection_id in &project.connection_ids {
3255 if project.host_user_id == session.user_id {
3256 session
3257 .peer
3258 .send(
3259 *connection_id,
3260 proto::UnshareProject {
3261 project_id: project.id.to_proto(),
3262 },
3263 )
3264 .trace_err();
3265 } else {
3266 session
3267 .peer
3268 .send(
3269 *connection_id,
3270 proto::RemoveProjectCollaborator {
3271 project_id: project.id.to_proto(),
3272 peer_id: Some(session.connection_id.into()),
3273 },
3274 )
3275 .trace_err();
3276 }
3277 }
3278}
3279
3280pub trait ResultExt {
3281 type Ok;
3282
3283 fn trace_err(self) -> Option<Self::Ok>;
3284}
3285
3286impl<T, E> ResultExt for Result<T, E>
3287where
3288 E: std::fmt::Debug,
3289{
3290 type Ok = T;
3291
3292 fn trace_err(self) -> Option<T> {
3293 match self {
3294 Ok(value) => Some(value),
3295 Err(error) => {
3296 tracing::error!("{:?}", error);
3297 None
3298 }
3299 }
3300 }
3301}