1mod connection_pool;
2
3use crate::{
4 auth,
5 db::{
6 self, ChannelId, ChannelsForUser, Database, MessageId, ProjectId, RoomId, ServerId, User,
7 UserId,
8 },
9 executor::Executor,
10 AppState, Result,
11};
12use anyhow::anyhow;
13use async_tungstenite::tungstenite::{
14 protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
15};
16use axum::{
17 body::Body,
18 extract::{
19 ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
20 ConnectInfo, WebSocketUpgrade,
21 },
22 headers::{Header, HeaderName},
23 http::StatusCode,
24 middleware,
25 response::IntoResponse,
26 routing::get,
27 Extension, Router, TypedHeader,
28};
29use collections::{HashMap, HashSet};
30pub use connection_pool::ConnectionPool;
31use futures::{
32 channel::oneshot,
33 future::{self, BoxFuture},
34 stream::FuturesUnordered,
35 FutureExt, SinkExt, StreamExt, TryStreamExt,
36};
37use lazy_static::lazy_static;
38use prometheus::{register_int_gauge, IntGauge};
39use rpc::{
40 proto::{
41 self, Ack, AddChannelBufferCollaborator, AnyTypedEnvelope, ChannelEdge, EntityMessage,
42 EnvelopedMessage, LiveKitConnectionInfo, RequestMessage,
43 },
44 Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
45};
46use serde::{Serialize, Serializer};
47use std::{
48 any::TypeId,
49 fmt,
50 future::Future,
51 marker::PhantomData,
52 mem,
53 net::SocketAddr,
54 ops::{Deref, DerefMut},
55 rc::Rc,
56 sync::{
57 atomic::{AtomicBool, Ordering::SeqCst},
58 Arc,
59 },
60 time::{Duration, Instant},
61};
62use time::OffsetDateTime;
63use tokio::sync::{watch, Semaphore};
64use tower::ServiceBuilder;
65use tracing::{info_span, instrument, Instrument};
66
/// How long `connection_lost` waits after a connection drops before it
/// releases that connection's room and channel-buffer state.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
/// Delay between `Server::start` and the cleanup of stale server resources.
pub const CLEANUP_TIMEOUT: Duration = Duration::from_secs(10);

// NOTE(review): the two limits below are not referenced in the part of the
// file visible here; presumably they bound channel chat paging and message
// size — confirm against the chat handlers.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
const MAX_MESSAGE_LEN: usize = 1024;
72
73lazy_static! {
74 static ref METRIC_CONNECTIONS: IntGauge =
75 register_int_gauge!("connections", "number of connections").unwrap();
76 static ref METRIC_SHARED_PROJECTS: IntGauge = register_int_gauge!(
77 "shared_projects",
78 "number of open projects with one or more guests"
79 )
80 .unwrap();
81}
82
83type MessageHandler =
84 Box<dyn Send + Sync + Fn(Box<dyn AnyTypedEnvelope>, Session) -> BoxFuture<'static, ()>>;
85
86struct Response<R> {
87 peer: Arc<Peer>,
88 receipt: Receipt<R>,
89 responded: Arc<AtomicBool>,
90}
91
92impl<R: RequestMessage> Response<R> {
93 fn send(self, payload: R::Response) -> Result<()> {
94 self.responded.store(true, SeqCst);
95 self.peer.respond(self.receipt, payload)?;
96 Ok(())
97 }
98}
99
100#[derive(Clone)]
101struct Session {
102 user_id: UserId,
103 connection_id: ConnectionId,
104 db: Arc<tokio::sync::Mutex<DbHandle>>,
105 peer: Arc<Peer>,
106 connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
107 live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
108 executor: Executor,
109}
110
111impl Session {
112 async fn db(&self) -> tokio::sync::MutexGuard<DbHandle> {
113 #[cfg(test)]
114 tokio::task::yield_now().await;
115 let guard = self.db.lock().await;
116 #[cfg(test)]
117 tokio::task::yield_now().await;
118 guard
119 }
120
121 async fn connection_pool(&self) -> ConnectionPoolGuard<'_> {
122 #[cfg(test)]
123 tokio::task::yield_now().await;
124 let guard = self.connection_pool.lock();
125 ConnectionPoolGuard {
126 guard,
127 _not_send: PhantomData,
128 }
129 }
130}
131
132impl fmt::Debug for Session {
133 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134 f.debug_struct("Session")
135 .field("user_id", &self.user_id)
136 .field("connection_id", &self.connection_id)
137 .finish()
138 }
139}
140
141struct DbHandle(Arc<Database>);
142
143impl Deref for DbHandle {
144 type Target = Database;
145
146 fn deref(&self) -> &Self::Target {
147 self.0.as_ref()
148 }
149}
150
151pub struct Server {
152 id: parking_lot::Mutex<ServerId>,
153 peer: Arc<Peer>,
154 pub(crate) connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
155 app_state: Arc<AppState>,
156 executor: Executor,
157 handlers: HashMap<TypeId, MessageHandler>,
158 teardown: watch::Sender<()>,
159}
160
161pub(crate) struct ConnectionPoolGuard<'a> {
162 guard: parking_lot::MutexGuard<'a, ConnectionPool>,
163 _not_send: PhantomData<Rc<()>>,
164}
165
166#[derive(Serialize)]
167pub struct ServerSnapshot<'a> {
168 peer: &'a Peer,
169 #[serde(serialize_with = "serialize_deref")]
170 connection_pool: ConnectionPoolGuard<'a>,
171}
172
173pub fn serialize_deref<S, T, U>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
174where
175 S: Serializer,
176 T: Deref<Target = U>,
177 U: Serialize,
178{
179 Serialize::serialize(value.deref(), serializer)
180}
181
182impl Server {
183 pub fn new(id: ServerId, app_state: Arc<AppState>, executor: Executor) -> Arc<Self> {
184 let mut server = Self {
185 id: parking_lot::Mutex::new(id),
186 peer: Peer::new(id.0 as u32),
187 app_state,
188 executor,
189 connection_pool: Default::default(),
190 handlers: Default::default(),
191 teardown: watch::channel(()).0,
192 };
193
194 server
195 .add_request_handler(ping)
196 .add_request_handler(create_room)
197 .add_request_handler(join_room)
198 .add_request_handler(rejoin_room)
199 .add_request_handler(leave_room)
200 .add_request_handler(call)
201 .add_request_handler(cancel_call)
202 .add_message_handler(decline_call)
203 .add_request_handler(update_participant_location)
204 .add_request_handler(share_project)
205 .add_message_handler(unshare_project)
206 .add_request_handler(join_project)
207 .add_message_handler(leave_project)
208 .add_request_handler(update_project)
209 .add_request_handler(update_worktree)
210 .add_message_handler(start_language_server)
211 .add_message_handler(update_language_server)
212 .add_message_handler(update_diagnostic_summary)
213 .add_message_handler(update_worktree_settings)
214 .add_message_handler(refresh_inlay_hints)
215 .add_request_handler(forward_project_request::<proto::GetHover>)
216 .add_request_handler(forward_project_request::<proto::GetDefinition>)
217 .add_request_handler(forward_project_request::<proto::GetTypeDefinition>)
218 .add_request_handler(forward_project_request::<proto::GetReferences>)
219 .add_request_handler(forward_project_request::<proto::SearchProject>)
220 .add_request_handler(forward_project_request::<proto::GetDocumentHighlights>)
221 .add_request_handler(forward_project_request::<proto::GetProjectSymbols>)
222 .add_request_handler(forward_project_request::<proto::OpenBufferForSymbol>)
223 .add_request_handler(forward_project_request::<proto::OpenBufferById>)
224 .add_request_handler(forward_project_request::<proto::OpenBufferByPath>)
225 .add_request_handler(forward_project_request::<proto::GetCompletions>)
226 .add_request_handler(forward_project_request::<proto::ApplyCompletionAdditionalEdits>)
227 .add_request_handler(forward_project_request::<proto::GetCodeActions>)
228 .add_request_handler(forward_project_request::<proto::ApplyCodeAction>)
229 .add_request_handler(forward_project_request::<proto::PrepareRename>)
230 .add_request_handler(forward_project_request::<proto::PerformRename>)
231 .add_request_handler(forward_project_request::<proto::ReloadBuffers>)
232 .add_request_handler(forward_project_request::<proto::SynchronizeBuffers>)
233 .add_request_handler(forward_project_request::<proto::FormatBuffers>)
234 .add_request_handler(forward_project_request::<proto::CreateProjectEntry>)
235 .add_request_handler(forward_project_request::<proto::RenameProjectEntry>)
236 .add_request_handler(forward_project_request::<proto::CopyProjectEntry>)
237 .add_request_handler(forward_project_request::<proto::DeleteProjectEntry>)
238 .add_request_handler(forward_project_request::<proto::ExpandProjectEntry>)
239 .add_request_handler(forward_project_request::<proto::OnTypeFormatting>)
240 .add_request_handler(forward_project_request::<proto::InlayHints>)
241 .add_message_handler(create_buffer_for_peer)
242 .add_request_handler(update_buffer)
243 .add_message_handler(update_buffer_file)
244 .add_message_handler(buffer_reloaded)
245 .add_message_handler(buffer_saved)
246 .add_request_handler(forward_project_request::<proto::SaveBuffer>)
247 .add_request_handler(get_users)
248 .add_request_handler(fuzzy_search_users)
249 .add_request_handler(request_contact)
250 .add_request_handler(remove_contact)
251 .add_request_handler(respond_to_contact_request)
252 .add_request_handler(create_channel)
253 .add_request_handler(delete_channel)
254 .add_request_handler(invite_channel_member)
255 .add_request_handler(remove_channel_member)
256 .add_request_handler(set_channel_member_admin)
257 .add_request_handler(rename_channel)
258 .add_request_handler(join_channel_buffer)
259 .add_request_handler(leave_channel_buffer)
260 .add_message_handler(update_channel_buffer)
261 .add_request_handler(rejoin_channel_buffers)
262 .add_request_handler(get_channel_members)
263 .add_request_handler(respond_to_channel_invite)
264 .add_request_handler(join_channel)
265 .add_request_handler(join_channel_chat)
266 .add_message_handler(leave_channel_chat)
267 .add_request_handler(send_channel_message)
268 .add_request_handler(remove_channel_message)
269 .add_request_handler(get_channel_messages)
270 .add_request_handler(link_channel)
271 .add_request_handler(unlink_channel)
272 .add_request_handler(move_channel)
273 .add_request_handler(follow)
274 .add_message_handler(unfollow)
275 .add_message_handler(update_followers)
276 .add_message_handler(update_diff_base)
277 .add_request_handler(get_private_user_info);
278
279 Arc::new(server)
280 }
281
282 pub async fn start(&self) -> Result<()> {
283 let server_id = *self.id.lock();
284 let app_state = self.app_state.clone();
285 let peer = self.peer.clone();
286 let timeout = self.executor.sleep(CLEANUP_TIMEOUT);
287 let pool = self.connection_pool.clone();
288 let live_kit_client = self.app_state.live_kit_client.clone();
289
290 let span = info_span!("start server");
291 self.executor.spawn_detached(
292 async move {
293 tracing::info!("waiting for cleanup timeout");
294 timeout.await;
295 tracing::info!("cleanup timeout expired, retrieving stale rooms");
296 if let Some((room_ids, channel_ids)) = app_state
297 .db
298 .stale_server_resource_ids(&app_state.config.zed_environment, server_id)
299 .await
300 .trace_err()
301 {
302 tracing::info!(stale_room_count = room_ids.len(), "retrieved stale rooms");
303 tracing::info!(
304 stale_channel_buffer_count = channel_ids.len(),
305 "retrieved stale channel buffers"
306 );
307
308 for channel_id in channel_ids {
309 if let Some(refreshed_channel_buffer) = app_state
310 .db
311 .clear_stale_channel_buffer_collaborators(channel_id, server_id)
312 .await
313 .trace_err()
314 {
315 for connection_id in refreshed_channel_buffer.connection_ids {
316 for message in &refreshed_channel_buffer.removed_collaborators {
317 peer.send(connection_id, message.clone()).trace_err();
318 }
319 }
320 }
321 }
322
323 for room_id in room_ids {
324 let mut contacts_to_update = HashSet::default();
325 let mut canceled_calls_to_user_ids = Vec::new();
326 let mut live_kit_room = String::new();
327 let mut delete_live_kit_room = false;
328
329 if let Some(mut refreshed_room) = app_state
330 .db
331 .clear_stale_room_participants(room_id, server_id)
332 .await
333 .trace_err()
334 {
335 tracing::info!(
336 room_id = room_id.0,
337 new_participant_count = refreshed_room.room.participants.len(),
338 "refreshed room"
339 );
340 room_updated(&refreshed_room.room, &peer);
341 if let Some(channel_id) = refreshed_room.channel_id {
342 channel_updated(
343 channel_id,
344 &refreshed_room.room,
345 &refreshed_room.channel_members,
346 &peer,
347 &*pool.lock(),
348 );
349 }
350 contacts_to_update
351 .extend(refreshed_room.stale_participant_user_ids.iter().copied());
352 contacts_to_update
353 .extend(refreshed_room.canceled_calls_to_user_ids.iter().copied());
354 canceled_calls_to_user_ids =
355 mem::take(&mut refreshed_room.canceled_calls_to_user_ids);
356 live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room);
357 delete_live_kit_room = refreshed_room.room.participants.is_empty();
358 }
359
360 {
361 let pool = pool.lock();
362 for canceled_user_id in canceled_calls_to_user_ids {
363 for connection_id in pool.user_connection_ids(canceled_user_id) {
364 peer.send(
365 connection_id,
366 proto::CallCanceled {
367 room_id: room_id.to_proto(),
368 },
369 )
370 .trace_err();
371 }
372 }
373 }
374
375 for user_id in contacts_to_update {
376 let busy = app_state.db.is_user_busy(user_id).await.trace_err();
377 let contacts = app_state.db.get_contacts(user_id).await.trace_err();
378 if let Some((busy, contacts)) = busy.zip(contacts) {
379 let pool = pool.lock();
380 let updated_contact = contact_for_user(user_id, false, busy, &pool);
381 for contact in contacts {
382 if let db::Contact::Accepted {
383 user_id: contact_user_id,
384 ..
385 } = contact
386 {
387 for contact_conn_id in
388 pool.user_connection_ids(contact_user_id)
389 {
390 peer.send(
391 contact_conn_id,
392 proto::UpdateContacts {
393 contacts: vec![updated_contact.clone()],
394 remove_contacts: Default::default(),
395 incoming_requests: Default::default(),
396 remove_incoming_requests: Default::default(),
397 outgoing_requests: Default::default(),
398 remove_outgoing_requests: Default::default(),
399 },
400 )
401 .trace_err();
402 }
403 }
404 }
405 }
406 }
407
408 if let Some(live_kit) = live_kit_client.as_ref() {
409 if delete_live_kit_room {
410 live_kit.delete_room(live_kit_room).await.trace_err();
411 }
412 }
413 }
414 }
415
416 app_state
417 .db
418 .delete_stale_servers(&app_state.config.zed_environment, server_id)
419 .await
420 .trace_err();
421 }
422 .instrument(span),
423 );
424 Ok(())
425 }
426
427 pub fn teardown(&self) {
428 self.peer.teardown();
429 self.connection_pool.lock().reset();
430 let _ = self.teardown.send(());
431 }
432
433 #[cfg(test)]
434 pub fn reset(&self, id: ServerId) {
435 self.teardown();
436 *self.id.lock() = id;
437 self.peer.reset(id.0 as u32);
438 }
439
440 #[cfg(test)]
441 pub fn id(&self) -> ServerId {
442 *self.id.lock()
443 }
444
445 fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
446 where
447 F: 'static + Send + Sync + Fn(TypedEnvelope<M>, Session) -> Fut,
448 Fut: 'static + Send + Future<Output = Result<()>>,
449 M: EnvelopedMessage,
450 {
451 let prev_handler = self.handlers.insert(
452 TypeId::of::<M>(),
453 Box::new(move |envelope, session| {
454 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
455 let span = info_span!(
456 "handle message",
457 payload_type = envelope.payload_type_name()
458 );
459 span.in_scope(|| {
460 tracing::info!(
461 payload_type = envelope.payload_type_name(),
462 "message received"
463 );
464 });
465 let start_time = Instant::now();
466 let future = (handler)(*envelope, session);
467 async move {
468 let result = future.await;
469 let duration_ms = start_time.elapsed().as_micros() as f64 / 1000.0;
470 match result {
471 Err(error) => {
472 tracing::error!(%error, ?duration_ms, "error handling message")
473 }
474 Ok(()) => tracing::info!(?duration_ms, "finished handling message"),
475 }
476 }
477 .instrument(span)
478 .boxed()
479 }),
480 );
481 if prev_handler.is_some() {
482 panic!("registered a handler for the same message twice");
483 }
484 self
485 }
486
487 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
488 where
489 F: 'static + Send + Sync + Fn(M, Session) -> Fut,
490 Fut: 'static + Send + Future<Output = Result<()>>,
491 M: EnvelopedMessage,
492 {
493 self.add_handler(move |envelope, session| handler(envelope.payload, session));
494 self
495 }
496
497 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
498 where
499 F: 'static + Send + Sync + Fn(M, Response<M>, Session) -> Fut,
500 Fut: Send + Future<Output = Result<()>>,
501 M: RequestMessage,
502 {
503 let handler = Arc::new(handler);
504 self.add_handler(move |envelope, session| {
505 let receipt = envelope.receipt();
506 let handler = handler.clone();
507 async move {
508 let peer = session.peer.clone();
509 let responded = Arc::new(AtomicBool::default());
510 let response = Response {
511 peer: peer.clone(),
512 responded: responded.clone(),
513 receipt,
514 };
515 match (handler)(envelope.payload, response, session).await {
516 Ok(()) => {
517 if responded.load(std::sync::atomic::Ordering::SeqCst) {
518 Ok(())
519 } else {
520 Err(anyhow!("handler did not send a response"))?
521 }
522 }
523 Err(error) => {
524 peer.respond_with_error(
525 receipt,
526 proto::Error {
527 message: error.to_string(),
528 },
529 )?;
530 Err(error)
531 }
532 }
533 }
534 })
535 }
536
537 pub fn handle_connection(
538 self: &Arc<Self>,
539 connection: Connection,
540 address: String,
541 user: User,
542 mut send_connection_id: Option<oneshot::Sender<ConnectionId>>,
543 executor: Executor,
544 ) -> impl Future<Output = Result<()>> {
545 let this = self.clone();
546 let user_id = user.id;
547 let login = user.github_login;
548 let span = info_span!("handle connection", %user_id, %login, %address);
549 let mut teardown = self.teardown.subscribe();
550 async move {
551 let (connection_id, handle_io, mut incoming_rx) = this
552 .peer
553 .add_connection(connection, {
554 let executor = executor.clone();
555 move |duration| executor.sleep(duration)
556 });
557
558 tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
559 this.peer.send(connection_id, proto::Hello { peer_id: Some(connection_id.into()) })?;
560 tracing::info!(%user_id, %login, %connection_id, %address, "sent hello message");
561
562 if let Some(send_connection_id) = send_connection_id.take() {
563 let _ = send_connection_id.send(connection_id);
564 }
565
566 if !user.connected_once {
567 this.peer.send(connection_id, proto::ShowContacts {})?;
568 this.app_state.db.set_user_connected_once(user_id, true).await?;
569 }
570
571 let (contacts, channels_for_user, channel_invites) = future::try_join3(
572 this.app_state.db.get_contacts(user_id),
573 this.app_state.db.get_channels_for_user(user_id),
574 this.app_state.db.get_channel_invites_for_user(user_id)
575 ).await?;
576
577 {
578 let mut pool = this.connection_pool.lock();
579 pool.add_connection(connection_id, user_id, user.admin);
580 this.peer.send(connection_id, build_initial_contacts_update(contacts, &pool))?;
581 this.peer.send(connection_id, build_initial_channels_update(
582 channels_for_user,
583 channel_invites
584 ))?;
585 }
586
587 if let Some(incoming_call) = this.app_state.db.incoming_call_for_user(user_id).await? {
588 this.peer.send(connection_id, incoming_call)?;
589 }
590
591 let session = Session {
592 user_id,
593 connection_id,
594 db: Arc::new(tokio::sync::Mutex::new(DbHandle(this.app_state.db.clone()))),
595 peer: this.peer.clone(),
596 connection_pool: this.connection_pool.clone(),
597 live_kit_client: this.app_state.live_kit_client.clone(),
598 executor: executor.clone(),
599 };
600 update_user_contacts(user_id, &session).await?;
601
602 let handle_io = handle_io.fuse();
603 futures::pin_mut!(handle_io);
604
605 // Handlers for foreground messages are pushed into the following `FuturesUnordered`.
606 // This prevents deadlocks when e.g., client A performs a request to client B and
607 // client B performs a request to client A. If both clients stop processing further
608 // messages until their respective request completes, they won't have a chance to
609 // respond to the other client's request and cause a deadlock.
610 //
611 // This arrangement ensures we will attempt to process earlier messages first, but fall
612 // back to processing messages arrived later in the spirit of making progress.
613 let mut foreground_message_handlers = FuturesUnordered::new();
614 let concurrent_handlers = Arc::new(Semaphore::new(256));
615 loop {
616 let next_message = async {
617 let permit = concurrent_handlers.clone().acquire_owned().await.unwrap();
618 let message = incoming_rx.next().await;
619 (permit, message)
620 }.fuse();
621 futures::pin_mut!(next_message);
622 futures::select_biased! {
623 _ = teardown.changed().fuse() => return Ok(()),
624 result = handle_io => {
625 if let Err(error) = result {
626 tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
627 }
628 break;
629 }
630 _ = foreground_message_handlers.next() => {}
631 next_message = next_message => {
632 let (permit, message) = next_message;
633 if let Some(message) = message {
634 let type_name = message.payload_type_name();
635 let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
636 let span_enter = span.enter();
637 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
638 let is_background = message.is_background();
639 let handle_message = (handler)(message, session.clone());
640 drop(span_enter);
641
642 let handle_message = async move {
643 handle_message.await;
644 drop(permit);
645 }.instrument(span);
646 if is_background {
647 executor.spawn_detached(handle_message);
648 } else {
649 foreground_message_handlers.push(handle_message);
650 }
651 } else {
652 tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
653 }
654 } else {
655 tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
656 break;
657 }
658 }
659 }
660 }
661
662 drop(foreground_message_handlers);
663 tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
664 if let Err(error) = connection_lost(session, teardown, executor).await {
665 tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
666 }
667
668 Ok(())
669 }.instrument(span)
670 }
671
672 pub async fn invite_code_redeemed(
673 self: &Arc<Self>,
674 inviter_id: UserId,
675 invitee_id: UserId,
676 ) -> Result<()> {
677 if let Some(user) = self.app_state.db.get_user_by_id(inviter_id).await? {
678 if let Some(code) = &user.invite_code {
679 let pool = self.connection_pool.lock();
680 let invitee_contact = contact_for_user(invitee_id, true, false, &pool);
681 for connection_id in pool.user_connection_ids(inviter_id) {
682 self.peer.send(
683 connection_id,
684 proto::UpdateContacts {
685 contacts: vec![invitee_contact.clone()],
686 ..Default::default()
687 },
688 )?;
689 self.peer.send(
690 connection_id,
691 proto::UpdateInviteInfo {
692 url: format!("{}{}", self.app_state.config.invite_link_prefix, &code),
693 count: user.invite_count as u32,
694 },
695 )?;
696 }
697 }
698 }
699 Ok(())
700 }
701
702 pub async fn invite_count_updated(self: &Arc<Self>, user_id: UserId) -> Result<()> {
703 if let Some(user) = self.app_state.db.get_user_by_id(user_id).await? {
704 if let Some(invite_code) = &user.invite_code {
705 let pool = self.connection_pool.lock();
706 for connection_id in pool.user_connection_ids(user_id) {
707 self.peer.send(
708 connection_id,
709 proto::UpdateInviteInfo {
710 url: format!(
711 "{}{}",
712 self.app_state.config.invite_link_prefix, invite_code
713 ),
714 count: user.invite_count as u32,
715 },
716 )?;
717 }
718 }
719 }
720 Ok(())
721 }
722
723 pub async fn snapshot<'a>(self: &'a Arc<Self>) -> ServerSnapshot<'a> {
724 ServerSnapshot {
725 connection_pool: ConnectionPoolGuard {
726 guard: self.connection_pool.lock(),
727 _not_send: PhantomData,
728 },
729 peer: &self.peer,
730 }
731 }
732}
733
734impl<'a> Deref for ConnectionPoolGuard<'a> {
735 type Target = ConnectionPool;
736
737 fn deref(&self) -> &Self::Target {
738 &*self.guard
739 }
740}
741
742impl<'a> DerefMut for ConnectionPoolGuard<'a> {
743 fn deref_mut(&mut self) -> &mut Self::Target {
744 &mut *self.guard
745 }
746}
747
748impl<'a> Drop for ConnectionPoolGuard<'a> {
749 fn drop(&mut self) {
750 #[cfg(test)]
751 self.check_invariants();
752 }
753}
754
755fn broadcast<F>(
756 sender_id: Option<ConnectionId>,
757 receiver_ids: impl IntoIterator<Item = ConnectionId>,
758 mut f: F,
759) where
760 F: FnMut(ConnectionId) -> anyhow::Result<()>,
761{
762 for receiver_id in receiver_ids {
763 if Some(receiver_id) != sender_id {
764 if let Err(error) = f(receiver_id) {
765 tracing::error!("failed to send to {:?} {}", receiver_id, error);
766 }
767 }
768 }
769}
770
771lazy_static! {
772 static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
773}
774
775pub struct ProtocolVersion(u32);
776
777impl Header for ProtocolVersion {
778 fn name() -> &'static HeaderName {
779 &ZED_PROTOCOL_VERSION
780 }
781
782 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
783 where
784 Self: Sized,
785 I: Iterator<Item = &'i axum::http::HeaderValue>,
786 {
787 let version = values
788 .next()
789 .ok_or_else(axum::headers::Error::invalid)?
790 .to_str()
791 .map_err(|_| axum::headers::Error::invalid())?
792 .parse()
793 .map_err(|_| axum::headers::Error::invalid())?;
794 Ok(Self(version))
795 }
796
797 fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
798 values.extend([self.0.to_string().parse().unwrap()]);
799 }
800}
801
802pub fn routes(server: Arc<Server>) -> Router<Body> {
803 Router::new()
804 .route("/rpc", get(handle_websocket_request))
805 .layer(
806 ServiceBuilder::new()
807 .layer(Extension(server.app_state.clone()))
808 .layer(middleware::from_fn(auth::validate_header)),
809 )
810 .route("/metrics", get(handle_metrics))
811 .layer(Extension(server))
812}
813
814pub async fn handle_websocket_request(
815 TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
816 ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
817 Extension(server): Extension<Arc<Server>>,
818 Extension(user): Extension<User>,
819 ws: WebSocketUpgrade,
820) -> axum::response::Response {
821 if protocol_version != rpc::PROTOCOL_VERSION {
822 return (
823 StatusCode::UPGRADE_REQUIRED,
824 "client must be upgraded".to_string(),
825 )
826 .into_response();
827 }
828 let socket_address = socket_address.to_string();
829 ws.on_upgrade(move |socket| {
830 use util::ResultExt;
831 let socket = socket
832 .map_ok(to_tungstenite_message)
833 .err_into()
834 .with(|message| async move { Ok(to_axum_message(message)) });
835 let connection = Connection::new(Box::pin(socket));
836 async move {
837 server
838 .handle_connection(connection, socket_address, user, None, Executor::Production)
839 .await
840 .log_err();
841 }
842 })
843}
844
845pub async fn handle_metrics(Extension(server): Extension<Arc<Server>>) -> Result<String> {
846 let connections = server
847 .connection_pool
848 .lock()
849 .connections()
850 .filter(|connection| !connection.admin)
851 .count();
852
853 METRIC_CONNECTIONS.set(connections as _);
854
855 let shared_projects = server.app_state.db.project_count_excluding_admins().await?;
856 METRIC_SHARED_PROJECTS.set(shared_projects as _);
857
858 let encoder = prometheus::TextEncoder::new();
859 let metric_families = prometheus::gather();
860 let encoded_metrics = encoder
861 .encode_to_string(&metric_families)
862 .map_err(|err| anyhow!("{}", err))?;
863 Ok(encoded_metrics)
864}
865
866#[instrument(err, skip(executor))]
867async fn connection_lost(
868 session: Session,
869 mut teardown: watch::Receiver<()>,
870 executor: Executor,
871) -> Result<()> {
872 session.peer.disconnect(session.connection_id);
873 session
874 .connection_pool()
875 .await
876 .remove_connection(session.connection_id)?;
877
878 session
879 .db()
880 .await
881 .connection_lost(session.connection_id)
882 .await
883 .trace_err();
884
885 futures::select_biased! {
886 _ = executor.sleep(RECONNECT_TIMEOUT).fuse() => {
887 log::info!("connection lost, removing all resources for user:{}, connection:{:?}", session.user_id, session.connection_id);
888 leave_room_for_session(&session).await.trace_err();
889 leave_channel_buffers_for_session(&session)
890 .await
891 .trace_err();
892
893 if !session
894 .connection_pool()
895 .await
896 .is_user_online(session.user_id)
897 {
898 let db = session.db().await;
899 if let Some(room) = db.decline_call(None, session.user_id).await.trace_err().flatten() {
900 room_updated(&room, &session.peer);
901 }
902 }
903
904 update_user_contacts(session.user_id, &session).await?;
905 }
906 _ = teardown.changed().fuse() => {}
907 }
908
909 Ok(())
910}
911
912async fn ping(_: proto::Ping, response: Response<proto::Ping>, _session: Session) -> Result<()> {
913 response.send(proto::Ack {})?;
914 Ok(())
915}
916
917async fn create_room(
918 _request: proto::CreateRoom,
919 response: Response<proto::CreateRoom>,
920 session: Session,
921) -> Result<()> {
922 let live_kit_room = nanoid::nanoid!(30);
923
924 let live_kit_connection_info = {
925 let live_kit_room = live_kit_room.clone();
926 let live_kit = session.live_kit_client.as_ref();
927
928 util::async_iife!({
929 let live_kit = live_kit?;
930
931 live_kit
932 .create_room(live_kit_room.clone())
933 .await
934 .trace_err()?;
935
936 let token = live_kit
937 .room_token(&live_kit_room, &session.user_id.to_string())
938 .trace_err()?;
939
940 Some(proto::LiveKitConnectionInfo {
941 server_url: live_kit.url().into(),
942 token,
943 })
944 })
945 }
946 .await;
947
948 let room = session
949 .db()
950 .await
951 .create_room(session.user_id, session.connection_id, &live_kit_room)
952 .await?;
953
954 response.send(proto::CreateRoomResponse {
955 room: Some(room.clone()),
956 live_kit_connection_info,
957 })?;
958
959 update_user_contacts(session.user_id, &session).await?;
960 Ok(())
961}
962
963async fn join_room(
964 request: proto::JoinRoom,
965 response: Response<proto::JoinRoom>,
966 session: Session,
967) -> Result<()> {
968 let room_id = RoomId::from_proto(request.id);
969 let joined_room = {
970 let room = session
971 .db()
972 .await
973 .join_room(room_id, session.user_id, session.connection_id)
974 .await?;
975 room_updated(&room.room, &session.peer);
976 room.into_inner()
977 };
978
979 if let Some(channel_id) = joined_room.channel_id {
980 channel_updated(
981 channel_id,
982 &joined_room.room,
983 &joined_room.channel_members,
984 &session.peer,
985 &*session.connection_pool().await,
986 )
987 }
988
989 for connection_id in session
990 .connection_pool()
991 .await
992 .user_connection_ids(session.user_id)
993 {
994 session
995 .peer
996 .send(
997 connection_id,
998 proto::CallCanceled {
999 room_id: room_id.to_proto(),
1000 },
1001 )
1002 .trace_err();
1003 }
1004
1005 let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
1006 if let Some(token) = live_kit
1007 .room_token(
1008 &joined_room.room.live_kit_room,
1009 &session.user_id.to_string(),
1010 )
1011 .trace_err()
1012 {
1013 Some(proto::LiveKitConnectionInfo {
1014 server_url: live_kit.url().into(),
1015 token,
1016 })
1017 } else {
1018 None
1019 }
1020 } else {
1021 None
1022 };
1023
1024 response.send(proto::JoinRoomResponse {
1025 room: Some(joined_room.room),
1026 channel_id: joined_room.channel_id.map(|id| id.to_proto()),
1027 live_kit_connection_info,
1028 })?;
1029
1030 update_user_contacts(session.user_id, &session).await?;
1031 Ok(())
1032}
1033
1034async fn rejoin_room(
1035 request: proto::RejoinRoom,
1036 response: Response<proto::RejoinRoom>,
1037 session: Session,
1038) -> Result<()> {
1039 let room;
1040 let channel_id;
1041 let channel_members;
1042 {
1043 let mut rejoined_room = session
1044 .db()
1045 .await
1046 .rejoin_room(request, session.user_id, session.connection_id)
1047 .await?;
1048
1049 response.send(proto::RejoinRoomResponse {
1050 room: Some(rejoined_room.room.clone()),
1051 reshared_projects: rejoined_room
1052 .reshared_projects
1053 .iter()
1054 .map(|project| proto::ResharedProject {
1055 id: project.id.to_proto(),
1056 collaborators: project
1057 .collaborators
1058 .iter()
1059 .map(|collaborator| collaborator.to_proto())
1060 .collect(),
1061 })
1062 .collect(),
1063 rejoined_projects: rejoined_room
1064 .rejoined_projects
1065 .iter()
1066 .map(|rejoined_project| proto::RejoinedProject {
1067 id: rejoined_project.id.to_proto(),
1068 worktrees: rejoined_project
1069 .worktrees
1070 .iter()
1071 .map(|worktree| proto::WorktreeMetadata {
1072 id: worktree.id,
1073 root_name: worktree.root_name.clone(),
1074 visible: worktree.visible,
1075 abs_path: worktree.abs_path.clone(),
1076 })
1077 .collect(),
1078 collaborators: rejoined_project
1079 .collaborators
1080 .iter()
1081 .map(|collaborator| collaborator.to_proto())
1082 .collect(),
1083 language_servers: rejoined_project.language_servers.clone(),
1084 })
1085 .collect(),
1086 })?;
1087 room_updated(&rejoined_room.room, &session.peer);
1088
1089 for project in &rejoined_room.reshared_projects {
1090 for collaborator in &project.collaborators {
1091 session
1092 .peer
1093 .send(
1094 collaborator.connection_id,
1095 proto::UpdateProjectCollaborator {
1096 project_id: project.id.to_proto(),
1097 old_peer_id: Some(project.old_connection_id.into()),
1098 new_peer_id: Some(session.connection_id.into()),
1099 },
1100 )
1101 .trace_err();
1102 }
1103
1104 broadcast(
1105 Some(session.connection_id),
1106 project
1107 .collaborators
1108 .iter()
1109 .map(|collaborator| collaborator.connection_id),
1110 |connection_id| {
1111 session.peer.forward_send(
1112 session.connection_id,
1113 connection_id,
1114 proto::UpdateProject {
1115 project_id: project.id.to_proto(),
1116 worktrees: project.worktrees.clone(),
1117 },
1118 )
1119 },
1120 );
1121 }
1122
1123 for project in &rejoined_room.rejoined_projects {
1124 for collaborator in &project.collaborators {
1125 session
1126 .peer
1127 .send(
1128 collaborator.connection_id,
1129 proto::UpdateProjectCollaborator {
1130 project_id: project.id.to_proto(),
1131 old_peer_id: Some(project.old_connection_id.into()),
1132 new_peer_id: Some(session.connection_id.into()),
1133 },
1134 )
1135 .trace_err();
1136 }
1137 }
1138
1139 for project in &mut rejoined_room.rejoined_projects {
1140 for worktree in mem::take(&mut project.worktrees) {
1141 #[cfg(any(test, feature = "test-support"))]
1142 const MAX_CHUNK_SIZE: usize = 2;
1143 #[cfg(not(any(test, feature = "test-support")))]
1144 const MAX_CHUNK_SIZE: usize = 256;
1145
1146 // Stream this worktree's entries.
1147 let message = proto::UpdateWorktree {
1148 project_id: project.id.to_proto(),
1149 worktree_id: worktree.id,
1150 abs_path: worktree.abs_path.clone(),
1151 root_name: worktree.root_name,
1152 updated_entries: worktree.updated_entries,
1153 removed_entries: worktree.removed_entries,
1154 scan_id: worktree.scan_id,
1155 is_last_update: worktree.completed_scan_id == worktree.scan_id,
1156 updated_repositories: worktree.updated_repositories,
1157 removed_repositories: worktree.removed_repositories,
1158 };
1159 for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1160 session.peer.send(session.connection_id, update.clone())?;
1161 }
1162
1163 // Stream this worktree's diagnostics.
1164 for summary in worktree.diagnostic_summaries {
1165 session.peer.send(
1166 session.connection_id,
1167 proto::UpdateDiagnosticSummary {
1168 project_id: project.id.to_proto(),
1169 worktree_id: worktree.id,
1170 summary: Some(summary),
1171 },
1172 )?;
1173 }
1174
1175 for settings_file in worktree.settings_files {
1176 session.peer.send(
1177 session.connection_id,
1178 proto::UpdateWorktreeSettings {
1179 project_id: project.id.to_proto(),
1180 worktree_id: worktree.id,
1181 path: settings_file.path,
1182 content: Some(settings_file.content),
1183 },
1184 )?;
1185 }
1186 }
1187
1188 for language_server in &project.language_servers {
1189 session.peer.send(
1190 session.connection_id,
1191 proto::UpdateLanguageServer {
1192 project_id: project.id.to_proto(),
1193 language_server_id: language_server.id,
1194 variant: Some(
1195 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1196 proto::LspDiskBasedDiagnosticsUpdated {},
1197 ),
1198 ),
1199 },
1200 )?;
1201 }
1202 }
1203
1204 let rejoined_room = rejoined_room.into_inner();
1205
1206 room = rejoined_room.room;
1207 channel_id = rejoined_room.channel_id;
1208 channel_members = rejoined_room.channel_members;
1209 }
1210
1211 if let Some(channel_id) = channel_id {
1212 channel_updated(
1213 channel_id,
1214 &room,
1215 &channel_members,
1216 &session.peer,
1217 &*session.connection_pool().await,
1218 );
1219 }
1220
1221 update_user_contacts(session.user_id, &session).await?;
1222 Ok(())
1223}
1224
1225async fn leave_room(
1226 _: proto::LeaveRoom,
1227 response: Response<proto::LeaveRoom>,
1228 session: Session,
1229) -> Result<()> {
1230 leave_room_for_session(&session).await?;
1231 response.send(proto::Ack {})?;
1232 Ok(())
1233}
1234
1235async fn call(
1236 request: proto::Call,
1237 response: Response<proto::Call>,
1238 session: Session,
1239) -> Result<()> {
1240 let room_id = RoomId::from_proto(request.room_id);
1241 let calling_user_id = session.user_id;
1242 let calling_connection_id = session.connection_id;
1243 let called_user_id = UserId::from_proto(request.called_user_id);
1244 let initial_project_id = request.initial_project_id.map(ProjectId::from_proto);
1245 if !session
1246 .db()
1247 .await
1248 .has_contact(calling_user_id, called_user_id)
1249 .await?
1250 {
1251 return Err(anyhow!("cannot call a user who isn't a contact"))?;
1252 }
1253
1254 let incoming_call = {
1255 let (room, incoming_call) = &mut *session
1256 .db()
1257 .await
1258 .call(
1259 room_id,
1260 calling_user_id,
1261 calling_connection_id,
1262 called_user_id,
1263 initial_project_id,
1264 )
1265 .await?;
1266 room_updated(&room, &session.peer);
1267 mem::take(incoming_call)
1268 };
1269 update_user_contacts(called_user_id, &session).await?;
1270
1271 let mut calls = session
1272 .connection_pool()
1273 .await
1274 .user_connection_ids(called_user_id)
1275 .map(|connection_id| session.peer.request(connection_id, incoming_call.clone()))
1276 .collect::<FuturesUnordered<_>>();
1277
1278 while let Some(call_response) = calls.next().await {
1279 match call_response.as_ref() {
1280 Ok(_) => {
1281 response.send(proto::Ack {})?;
1282 return Ok(());
1283 }
1284 Err(_) => {
1285 call_response.trace_err();
1286 }
1287 }
1288 }
1289
1290 {
1291 let room = session
1292 .db()
1293 .await
1294 .call_failed(room_id, called_user_id)
1295 .await?;
1296 room_updated(&room, &session.peer);
1297 }
1298 update_user_contacts(called_user_id, &session).await?;
1299
1300 Err(anyhow!("failed to ring user"))?
1301}
1302
1303async fn cancel_call(
1304 request: proto::CancelCall,
1305 response: Response<proto::CancelCall>,
1306 session: Session,
1307) -> Result<()> {
1308 let called_user_id = UserId::from_proto(request.called_user_id);
1309 let room_id = RoomId::from_proto(request.room_id);
1310 {
1311 let room = session
1312 .db()
1313 .await
1314 .cancel_call(room_id, session.connection_id, called_user_id)
1315 .await?;
1316 room_updated(&room, &session.peer);
1317 }
1318
1319 for connection_id in session
1320 .connection_pool()
1321 .await
1322 .user_connection_ids(called_user_id)
1323 {
1324 session
1325 .peer
1326 .send(
1327 connection_id,
1328 proto::CallCanceled {
1329 room_id: room_id.to_proto(),
1330 },
1331 )
1332 .trace_err();
1333 }
1334 response.send(proto::Ack {})?;
1335
1336 update_user_contacts(called_user_id, &session).await?;
1337 Ok(())
1338}
1339
1340async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<()> {
1341 let room_id = RoomId::from_proto(message.room_id);
1342 {
1343 let room = session
1344 .db()
1345 .await
1346 .decline_call(Some(room_id), session.user_id)
1347 .await?
1348 .ok_or_else(|| anyhow!("failed to decline call"))?;
1349 room_updated(&room, &session.peer);
1350 }
1351
1352 for connection_id in session
1353 .connection_pool()
1354 .await
1355 .user_connection_ids(session.user_id)
1356 {
1357 session
1358 .peer
1359 .send(
1360 connection_id,
1361 proto::CallCanceled {
1362 room_id: room_id.to_proto(),
1363 },
1364 )
1365 .trace_err();
1366 }
1367 update_user_contacts(session.user_id, &session).await?;
1368 Ok(())
1369}
1370
1371async fn update_participant_location(
1372 request: proto::UpdateParticipantLocation,
1373 response: Response<proto::UpdateParticipantLocation>,
1374 session: Session,
1375) -> Result<()> {
1376 let room_id = RoomId::from_proto(request.room_id);
1377 let location = request
1378 .location
1379 .ok_or_else(|| anyhow!("invalid location"))?;
1380
1381 let db = session.db().await;
1382 let room = db
1383 .update_room_participant_location(room_id, session.connection_id, location)
1384 .await?;
1385
1386 room_updated(&room, &session.peer);
1387 response.send(proto::Ack {})?;
1388 Ok(())
1389}
1390
1391async fn share_project(
1392 request: proto::ShareProject,
1393 response: Response<proto::ShareProject>,
1394 session: Session,
1395) -> Result<()> {
1396 let (project_id, room) = &*session
1397 .db()
1398 .await
1399 .share_project(
1400 RoomId::from_proto(request.room_id),
1401 session.connection_id,
1402 &request.worktrees,
1403 )
1404 .await?;
1405 response.send(proto::ShareProjectResponse {
1406 project_id: project_id.to_proto(),
1407 })?;
1408 room_updated(&room, &session.peer);
1409
1410 Ok(())
1411}
1412
1413async fn unshare_project(message: proto::UnshareProject, session: Session) -> Result<()> {
1414 let project_id = ProjectId::from_proto(message.project_id);
1415
1416 let (room, guest_connection_ids) = &*session
1417 .db()
1418 .await
1419 .unshare_project(project_id, session.connection_id)
1420 .await?;
1421
1422 broadcast(
1423 Some(session.connection_id),
1424 guest_connection_ids.iter().copied(),
1425 |conn_id| session.peer.send(conn_id, message.clone()),
1426 );
1427 room_updated(&room, &session.peer);
1428
1429 Ok(())
1430}
1431
1432async fn join_project(
1433 request: proto::JoinProject,
1434 response: Response<proto::JoinProject>,
1435 session: Session,
1436) -> Result<()> {
1437 let project_id = ProjectId::from_proto(request.project_id);
1438 let guest_user_id = session.user_id;
1439
1440 tracing::info!(%project_id, "join project");
1441
1442 let (project, replica_id) = &mut *session
1443 .db()
1444 .await
1445 .join_project(project_id, session.connection_id)
1446 .await?;
1447
1448 let collaborators = project
1449 .collaborators
1450 .iter()
1451 .filter(|collaborator| collaborator.connection_id != session.connection_id)
1452 .map(|collaborator| collaborator.to_proto())
1453 .collect::<Vec<_>>();
1454
1455 let worktrees = project
1456 .worktrees
1457 .iter()
1458 .map(|(id, worktree)| proto::WorktreeMetadata {
1459 id: *id,
1460 root_name: worktree.root_name.clone(),
1461 visible: worktree.visible,
1462 abs_path: worktree.abs_path.clone(),
1463 })
1464 .collect::<Vec<_>>();
1465
1466 for collaborator in &collaborators {
1467 session
1468 .peer
1469 .send(
1470 collaborator.peer_id.unwrap().into(),
1471 proto::AddProjectCollaborator {
1472 project_id: project_id.to_proto(),
1473 collaborator: Some(proto::Collaborator {
1474 peer_id: Some(session.connection_id.into()),
1475 replica_id: replica_id.0 as u32,
1476 user_id: guest_user_id.to_proto(),
1477 }),
1478 },
1479 )
1480 .trace_err();
1481 }
1482
1483 // First, we send the metadata associated with each worktree.
1484 response.send(proto::JoinProjectResponse {
1485 worktrees: worktrees.clone(),
1486 replica_id: replica_id.0 as u32,
1487 collaborators: collaborators.clone(),
1488 language_servers: project.language_servers.clone(),
1489 })?;
1490
1491 for (worktree_id, worktree) in mem::take(&mut project.worktrees) {
1492 #[cfg(any(test, feature = "test-support"))]
1493 const MAX_CHUNK_SIZE: usize = 2;
1494 #[cfg(not(any(test, feature = "test-support")))]
1495 const MAX_CHUNK_SIZE: usize = 256;
1496
1497 // Stream this worktree's entries.
1498 let message = proto::UpdateWorktree {
1499 project_id: project_id.to_proto(),
1500 worktree_id,
1501 abs_path: worktree.abs_path.clone(),
1502 root_name: worktree.root_name,
1503 updated_entries: worktree.entries,
1504 removed_entries: Default::default(),
1505 scan_id: worktree.scan_id,
1506 is_last_update: worktree.scan_id == worktree.completed_scan_id,
1507 updated_repositories: worktree.repository_entries.into_values().collect(),
1508 removed_repositories: Default::default(),
1509 };
1510 for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1511 session.peer.send(session.connection_id, update.clone())?;
1512 }
1513
1514 // Stream this worktree's diagnostics.
1515 for summary in worktree.diagnostic_summaries {
1516 session.peer.send(
1517 session.connection_id,
1518 proto::UpdateDiagnosticSummary {
1519 project_id: project_id.to_proto(),
1520 worktree_id: worktree.id,
1521 summary: Some(summary),
1522 },
1523 )?;
1524 }
1525
1526 for settings_file in worktree.settings_files {
1527 session.peer.send(
1528 session.connection_id,
1529 proto::UpdateWorktreeSettings {
1530 project_id: project_id.to_proto(),
1531 worktree_id: worktree.id,
1532 path: settings_file.path,
1533 content: Some(settings_file.content),
1534 },
1535 )?;
1536 }
1537 }
1538
1539 for language_server in &project.language_servers {
1540 session.peer.send(
1541 session.connection_id,
1542 proto::UpdateLanguageServer {
1543 project_id: project_id.to_proto(),
1544 language_server_id: language_server.id,
1545 variant: Some(
1546 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1547 proto::LspDiskBasedDiagnosticsUpdated {},
1548 ),
1549 ),
1550 },
1551 )?;
1552 }
1553
1554 Ok(())
1555}
1556
1557async fn leave_project(request: proto::LeaveProject, session: Session) -> Result<()> {
1558 let sender_id = session.connection_id;
1559 let project_id = ProjectId::from_proto(request.project_id);
1560
1561 let (room, project) = &*session
1562 .db()
1563 .await
1564 .leave_project(project_id, sender_id)
1565 .await?;
1566 tracing::info!(
1567 %project_id,
1568 host_user_id = %project.host_user_id,
1569 host_connection_id = %project.host_connection_id,
1570 "leave project"
1571 );
1572
1573 project_left(&project, &session);
1574 room_updated(&room, &session.peer);
1575
1576 Ok(())
1577}
1578
1579async fn update_project(
1580 request: proto::UpdateProject,
1581 response: Response<proto::UpdateProject>,
1582 session: Session,
1583) -> Result<()> {
1584 let project_id = ProjectId::from_proto(request.project_id);
1585 let (room, guest_connection_ids) = &*session
1586 .db()
1587 .await
1588 .update_project(project_id, session.connection_id, &request.worktrees)
1589 .await?;
1590 broadcast(
1591 Some(session.connection_id),
1592 guest_connection_ids.iter().copied(),
1593 |connection_id| {
1594 session
1595 .peer
1596 .forward_send(session.connection_id, connection_id, request.clone())
1597 },
1598 );
1599 room_updated(&room, &session.peer);
1600 response.send(proto::Ack {})?;
1601
1602 Ok(())
1603}
1604
1605async fn update_worktree(
1606 request: proto::UpdateWorktree,
1607 response: Response<proto::UpdateWorktree>,
1608 session: Session,
1609) -> Result<()> {
1610 let guest_connection_ids = session
1611 .db()
1612 .await
1613 .update_worktree(&request, session.connection_id)
1614 .await?;
1615
1616 broadcast(
1617 Some(session.connection_id),
1618 guest_connection_ids.iter().copied(),
1619 |connection_id| {
1620 session
1621 .peer
1622 .forward_send(session.connection_id, connection_id, request.clone())
1623 },
1624 );
1625 response.send(proto::Ack {})?;
1626 Ok(())
1627}
1628
1629async fn update_diagnostic_summary(
1630 message: proto::UpdateDiagnosticSummary,
1631 session: Session,
1632) -> Result<()> {
1633 let guest_connection_ids = session
1634 .db()
1635 .await
1636 .update_diagnostic_summary(&message, session.connection_id)
1637 .await?;
1638
1639 broadcast(
1640 Some(session.connection_id),
1641 guest_connection_ids.iter().copied(),
1642 |connection_id| {
1643 session
1644 .peer
1645 .forward_send(session.connection_id, connection_id, message.clone())
1646 },
1647 );
1648
1649 Ok(())
1650}
1651
1652async fn update_worktree_settings(
1653 message: proto::UpdateWorktreeSettings,
1654 session: Session,
1655) -> Result<()> {
1656 let guest_connection_ids = session
1657 .db()
1658 .await
1659 .update_worktree_settings(&message, session.connection_id)
1660 .await?;
1661
1662 broadcast(
1663 Some(session.connection_id),
1664 guest_connection_ids.iter().copied(),
1665 |connection_id| {
1666 session
1667 .peer
1668 .forward_send(session.connection_id, connection_id, message.clone())
1669 },
1670 );
1671
1672 Ok(())
1673}
1674
1675async fn refresh_inlay_hints(request: proto::RefreshInlayHints, session: Session) -> Result<()> {
1676 broadcast_project_message(request.project_id, request, session).await
1677}
1678
1679async fn start_language_server(
1680 request: proto::StartLanguageServer,
1681 session: Session,
1682) -> Result<()> {
1683 let guest_connection_ids = session
1684 .db()
1685 .await
1686 .start_language_server(&request, session.connection_id)
1687 .await?;
1688
1689 broadcast(
1690 Some(session.connection_id),
1691 guest_connection_ids.iter().copied(),
1692 |connection_id| {
1693 session
1694 .peer
1695 .forward_send(session.connection_id, connection_id, request.clone())
1696 },
1697 );
1698 Ok(())
1699}
1700
1701async fn update_language_server(
1702 request: proto::UpdateLanguageServer,
1703 session: Session,
1704) -> Result<()> {
1705 session.executor.record_backtrace();
1706 let project_id = ProjectId::from_proto(request.project_id);
1707 let project_connection_ids = session
1708 .db()
1709 .await
1710 .project_connection_ids(project_id, session.connection_id)
1711 .await?;
1712 broadcast(
1713 Some(session.connection_id),
1714 project_connection_ids.iter().copied(),
1715 |connection_id| {
1716 session
1717 .peer
1718 .forward_send(session.connection_id, connection_id, request.clone())
1719 },
1720 );
1721 Ok(())
1722}
1723
1724async fn forward_project_request<T>(
1725 request: T,
1726 response: Response<T>,
1727 session: Session,
1728) -> Result<()>
1729where
1730 T: EntityMessage + RequestMessage,
1731{
1732 session.executor.record_backtrace();
1733 let project_id = ProjectId::from_proto(request.remote_entity_id());
1734 let host_connection_id = {
1735 let collaborators = session
1736 .db()
1737 .await
1738 .project_collaborators(project_id, session.connection_id)
1739 .await?;
1740 collaborators
1741 .iter()
1742 .find(|collaborator| collaborator.is_host)
1743 .ok_or_else(|| anyhow!("host not found"))?
1744 .connection_id
1745 };
1746
1747 let payload = session
1748 .peer
1749 .forward_request(session.connection_id, host_connection_id, request)
1750 .await?;
1751
1752 response.send(payload)?;
1753 Ok(())
1754}
1755
1756async fn create_buffer_for_peer(
1757 request: proto::CreateBufferForPeer,
1758 session: Session,
1759) -> Result<()> {
1760 session.executor.record_backtrace();
1761 let peer_id = request.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?;
1762 session
1763 .peer
1764 .forward_send(session.connection_id, peer_id.into(), request)?;
1765 Ok(())
1766}
1767
1768async fn update_buffer(
1769 request: proto::UpdateBuffer,
1770 response: Response<proto::UpdateBuffer>,
1771 session: Session,
1772) -> Result<()> {
1773 session.executor.record_backtrace();
1774 let project_id = ProjectId::from_proto(request.project_id);
1775 let mut guest_connection_ids;
1776 let mut host_connection_id = None;
1777 {
1778 let collaborators = session
1779 .db()
1780 .await
1781 .project_collaborators(project_id, session.connection_id)
1782 .await?;
1783 guest_connection_ids = Vec::with_capacity(collaborators.len() - 1);
1784 for collaborator in collaborators.iter() {
1785 if collaborator.is_host {
1786 host_connection_id = Some(collaborator.connection_id);
1787 } else {
1788 guest_connection_ids.push(collaborator.connection_id);
1789 }
1790 }
1791 }
1792 let host_connection_id = host_connection_id.ok_or_else(|| anyhow!("host not found"))?;
1793
1794 session.executor.record_backtrace();
1795 broadcast(
1796 Some(session.connection_id),
1797 guest_connection_ids,
1798 |connection_id| {
1799 session
1800 .peer
1801 .forward_send(session.connection_id, connection_id, request.clone())
1802 },
1803 );
1804 if host_connection_id != session.connection_id {
1805 session
1806 .peer
1807 .forward_request(session.connection_id, host_connection_id, request.clone())
1808 .await?;
1809 }
1810
1811 response.send(proto::Ack {})?;
1812 Ok(())
1813}
1814
1815async fn update_buffer_file(request: proto::UpdateBufferFile, session: Session) -> Result<()> {
1816 let project_id = ProjectId::from_proto(request.project_id);
1817 let project_connection_ids = session
1818 .db()
1819 .await
1820 .project_connection_ids(project_id, session.connection_id)
1821 .await?;
1822
1823 broadcast(
1824 Some(session.connection_id),
1825 project_connection_ids.iter().copied(),
1826 |connection_id| {
1827 session
1828 .peer
1829 .forward_send(session.connection_id, connection_id, request.clone())
1830 },
1831 );
1832 Ok(())
1833}
1834
1835async fn buffer_reloaded(request: proto::BufferReloaded, session: Session) -> Result<()> {
1836 let project_id = ProjectId::from_proto(request.project_id);
1837 let project_connection_ids = session
1838 .db()
1839 .await
1840 .project_connection_ids(project_id, session.connection_id)
1841 .await?;
1842 broadcast(
1843 Some(session.connection_id),
1844 project_connection_ids.iter().copied(),
1845 |connection_id| {
1846 session
1847 .peer
1848 .forward_send(session.connection_id, connection_id, request.clone())
1849 },
1850 );
1851 Ok(())
1852}
1853
1854async fn buffer_saved(request: proto::BufferSaved, session: Session) -> Result<()> {
1855 broadcast_project_message(request.project_id, request, session).await
1856}
1857
1858async fn broadcast_project_message<T: EnvelopedMessage>(
1859 project_id: u64,
1860 request: T,
1861 session: Session,
1862) -> Result<()> {
1863 let project_id = ProjectId::from_proto(project_id);
1864 let project_connection_ids = session
1865 .db()
1866 .await
1867 .project_connection_ids(project_id, session.connection_id)
1868 .await?;
1869 broadcast(
1870 Some(session.connection_id),
1871 project_connection_ids.iter().copied(),
1872 |connection_id| {
1873 session
1874 .peer
1875 .forward_send(session.connection_id, connection_id, request.clone())
1876 },
1877 );
1878 Ok(())
1879}
1880
1881async fn follow(
1882 request: proto::Follow,
1883 response: Response<proto::Follow>,
1884 session: Session,
1885) -> Result<()> {
1886 let project_id = ProjectId::from_proto(request.project_id);
1887 let leader_id = request
1888 .leader_id
1889 .ok_or_else(|| anyhow!("invalid leader id"))?
1890 .into();
1891 let follower_id = session.connection_id;
1892
1893 {
1894 let project_connection_ids = session
1895 .db()
1896 .await
1897 .project_connection_ids(project_id, session.connection_id)
1898 .await?;
1899
1900 if !project_connection_ids.contains(&leader_id) {
1901 Err(anyhow!("no such peer"))?;
1902 }
1903 }
1904
1905 let mut response_payload = session
1906 .peer
1907 .forward_request(session.connection_id, leader_id, request)
1908 .await?;
1909 response_payload
1910 .views
1911 .retain(|view| view.leader_id != Some(follower_id.into()));
1912 response.send(response_payload)?;
1913
1914 let room = session
1915 .db()
1916 .await
1917 .follow(project_id, leader_id, follower_id)
1918 .await?;
1919 room_updated(&room, &session.peer);
1920
1921 Ok(())
1922}
1923
1924async fn unfollow(request: proto::Unfollow, session: Session) -> Result<()> {
1925 let project_id = ProjectId::from_proto(request.project_id);
1926 let leader_id = request
1927 .leader_id
1928 .ok_or_else(|| anyhow!("invalid leader id"))?
1929 .into();
1930 let follower_id = session.connection_id;
1931
1932 if !session
1933 .db()
1934 .await
1935 .project_connection_ids(project_id, session.connection_id)
1936 .await?
1937 .contains(&leader_id)
1938 {
1939 Err(anyhow!("no such peer"))?;
1940 }
1941
1942 session
1943 .peer
1944 .forward_send(session.connection_id, leader_id, request)?;
1945
1946 let room = session
1947 .db()
1948 .await
1949 .unfollow(project_id, leader_id, follower_id)
1950 .await?;
1951 room_updated(&room, &session.peer);
1952
1953 Ok(())
1954}
1955
1956async fn update_followers(request: proto::UpdateFollowers, session: Session) -> Result<()> {
1957 let project_id = ProjectId::from_proto(request.project_id);
1958 let project_connection_ids = session
1959 .db
1960 .lock()
1961 .await
1962 .project_connection_ids(project_id, session.connection_id)
1963 .await?;
1964
1965 let leader_id = request.variant.as_ref().and_then(|variant| match variant {
1966 proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
1967 proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
1968 proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
1969 });
1970 for follower_peer_id in request.follower_ids.iter().copied() {
1971 let follower_connection_id = follower_peer_id.into();
1972 if project_connection_ids.contains(&follower_connection_id)
1973 && Some(follower_peer_id) != leader_id
1974 {
1975 session.peer.forward_send(
1976 session.connection_id,
1977 follower_connection_id,
1978 request.clone(),
1979 )?;
1980 }
1981 }
1982 Ok(())
1983}
1984
1985async fn get_users(
1986 request: proto::GetUsers,
1987 response: Response<proto::GetUsers>,
1988 session: Session,
1989) -> Result<()> {
1990 let user_ids = request
1991 .user_ids
1992 .into_iter()
1993 .map(UserId::from_proto)
1994 .collect();
1995 let users = session
1996 .db()
1997 .await
1998 .get_users_by_ids(user_ids)
1999 .await?
2000 .into_iter()
2001 .map(|user| proto::User {
2002 id: user.id.to_proto(),
2003 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
2004 github_login: user.github_login,
2005 })
2006 .collect();
2007 response.send(proto::UsersResponse { users })?;
2008 Ok(())
2009}
2010
2011async fn fuzzy_search_users(
2012 request: proto::FuzzySearchUsers,
2013 response: Response<proto::FuzzySearchUsers>,
2014 session: Session,
2015) -> Result<()> {
2016 let query = request.query;
2017 let users = match query.len() {
2018 0 => vec![],
2019 1 | 2 => session
2020 .db()
2021 .await
2022 .get_user_by_github_login(&query)
2023 .await?
2024 .into_iter()
2025 .collect(),
2026 _ => session.db().await.fuzzy_search_users(&query, 10).await?,
2027 };
2028 let users = users
2029 .into_iter()
2030 .filter(|user| user.id != session.user_id)
2031 .map(|user| proto::User {
2032 id: user.id.to_proto(),
2033 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
2034 github_login: user.github_login,
2035 })
2036 .collect();
2037 response.send(proto::UsersResponse { users })?;
2038 Ok(())
2039}
2040
2041async fn request_contact(
2042 request: proto::RequestContact,
2043 response: Response<proto::RequestContact>,
2044 session: Session,
2045) -> Result<()> {
2046 let requester_id = session.user_id;
2047 let responder_id = UserId::from_proto(request.responder_id);
2048 if requester_id == responder_id {
2049 return Err(anyhow!("cannot add yourself as a contact"))?;
2050 }
2051
2052 session
2053 .db()
2054 .await
2055 .send_contact_request(requester_id, responder_id)
2056 .await?;
2057
2058 // Update outgoing contact requests of requester
2059 let mut update = proto::UpdateContacts::default();
2060 update.outgoing_requests.push(responder_id.to_proto());
2061 for connection_id in session
2062 .connection_pool()
2063 .await
2064 .user_connection_ids(requester_id)
2065 {
2066 session.peer.send(connection_id, update.clone())?;
2067 }
2068
2069 // Update incoming contact requests of responder
2070 let mut update = proto::UpdateContacts::default();
2071 update
2072 .incoming_requests
2073 .push(proto::IncomingContactRequest {
2074 requester_id: requester_id.to_proto(),
2075 should_notify: true,
2076 });
2077 for connection_id in session
2078 .connection_pool()
2079 .await
2080 .user_connection_ids(responder_id)
2081 {
2082 session.peer.send(connection_id, update.clone())?;
2083 }
2084
2085 response.send(proto::Ack {})?;
2086 Ok(())
2087}
2088
2089async fn respond_to_contact_request(
2090 request: proto::RespondToContactRequest,
2091 response: Response<proto::RespondToContactRequest>,
2092 session: Session,
2093) -> Result<()> {
2094 let responder_id = session.user_id;
2095 let requester_id = UserId::from_proto(request.requester_id);
2096 let db = session.db().await;
2097 if request.response == proto::ContactRequestResponse::Dismiss as i32 {
2098 db.dismiss_contact_notification(responder_id, requester_id)
2099 .await?;
2100 } else {
2101 let accept = request.response == proto::ContactRequestResponse::Accept as i32;
2102
2103 db.respond_to_contact_request(responder_id, requester_id, accept)
2104 .await?;
2105 let requester_busy = db.is_user_busy(requester_id).await?;
2106 let responder_busy = db.is_user_busy(responder_id).await?;
2107
2108 let pool = session.connection_pool().await;
2109 // Update responder with new contact
2110 let mut update = proto::UpdateContacts::default();
2111 if accept {
2112 update
2113 .contacts
2114 .push(contact_for_user(requester_id, false, requester_busy, &pool));
2115 }
2116 update
2117 .remove_incoming_requests
2118 .push(requester_id.to_proto());
2119 for connection_id in pool.user_connection_ids(responder_id) {
2120 session.peer.send(connection_id, update.clone())?;
2121 }
2122
2123 // Update requester with new contact
2124 let mut update = proto::UpdateContacts::default();
2125 if accept {
2126 update
2127 .contacts
2128 .push(contact_for_user(responder_id, true, responder_busy, &pool));
2129 }
2130 update
2131 .remove_outgoing_requests
2132 .push(responder_id.to_proto());
2133 for connection_id in pool.user_connection_ids(requester_id) {
2134 session.peer.send(connection_id, update.clone())?;
2135 }
2136 }
2137
2138 response.send(proto::Ack {})?;
2139 Ok(())
2140}
2141
2142async fn remove_contact(
2143 request: proto::RemoveContact,
2144 response: Response<proto::RemoveContact>,
2145 session: Session,
2146) -> Result<()> {
2147 let requester_id = session.user_id;
2148 let responder_id = UserId::from_proto(request.user_id);
2149 let db = session.db().await;
2150 let contact_accepted = db.remove_contact(requester_id, responder_id).await?;
2151
2152 let pool = session.connection_pool().await;
2153 // Update outgoing contact requests of requester
2154 let mut update = proto::UpdateContacts::default();
2155 if contact_accepted {
2156 update.remove_contacts.push(responder_id.to_proto());
2157 } else {
2158 update
2159 .remove_outgoing_requests
2160 .push(responder_id.to_proto());
2161 }
2162 for connection_id in pool.user_connection_ids(requester_id) {
2163 session.peer.send(connection_id, update.clone())?;
2164 }
2165
2166 // Update incoming contact requests of responder
2167 let mut update = proto::UpdateContacts::default();
2168 if contact_accepted {
2169 update.remove_contacts.push(requester_id.to_proto());
2170 } else {
2171 update
2172 .remove_incoming_requests
2173 .push(requester_id.to_proto());
2174 }
2175 for connection_id in pool.user_connection_ids(responder_id) {
2176 session.peer.send(connection_id, update.clone())?;
2177 }
2178
2179 response.send(proto::Ack {})?;
2180 Ok(())
2181}
2182
2183async fn create_channel(
2184 request: proto::CreateChannel,
2185 response: Response<proto::CreateChannel>,
2186 session: Session,
2187) -> Result<()> {
2188 let db = session.db().await;
2189 let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
2190
2191 if let Some(live_kit) = session.live_kit_client.as_ref() {
2192 live_kit.create_room(live_kit_room.clone()).await?;
2193 }
2194
2195 let parent_id = request.parent_id.map(|id| ChannelId::from_proto(id));
2196 let id = db
2197 .create_channel(&request.name, parent_id, &live_kit_room, session.user_id)
2198 .await?;
2199
2200 let channel = proto::Channel {
2201 id: id.to_proto(),
2202 name: request.name,
2203 };
2204
2205 response.send(proto::CreateChannelResponse {
2206 channel: Some(channel.clone()),
2207 parent_id: request.parent_id,
2208 })?;
2209
2210 let Some(parent_id) = parent_id else {
2211 return Ok(());
2212 };
2213
2214 let update = proto::UpdateChannels {
2215 channels: vec![channel],
2216 insert_edge: vec![ChannelEdge {
2217 parent_id: parent_id.to_proto(),
2218 channel_id: id.to_proto(),
2219 }],
2220 ..Default::default()
2221 };
2222
2223 let user_ids_to_notify = db.get_channel_members(parent_id).await?;
2224
2225 let connection_pool = session.connection_pool().await;
2226 for user_id in user_ids_to_notify {
2227 for connection_id in connection_pool.user_connection_ids(user_id) {
2228 if user_id == session.user_id {
2229 continue;
2230 }
2231 session.peer.send(connection_id, update.clone())?;
2232 }
2233 }
2234
2235 Ok(())
2236}
2237
2238async fn delete_channel(
2239 request: proto::DeleteChannel,
2240 response: Response<proto::DeleteChannel>,
2241 session: Session,
2242) -> Result<()> {
2243 let db = session.db().await;
2244
2245 let channel_id = request.channel_id;
2246 let (removed_channels, member_ids) = db
2247 .delete_channel(ChannelId::from_proto(channel_id), session.user_id)
2248 .await?;
2249 response.send(proto::Ack {})?;
2250
2251 // Notify members of removed channels
2252 let mut update = proto::UpdateChannels::default();
2253 update
2254 .delete_channels
2255 .extend(removed_channels.into_iter().map(|id| id.to_proto()));
2256
2257 let connection_pool = session.connection_pool().await;
2258 for member_id in member_ids {
2259 for connection_id in connection_pool.user_connection_ids(member_id) {
2260 session.peer.send(connection_id, update.clone())?;
2261 }
2262 }
2263
2264 Ok(())
2265}
2266
2267async fn invite_channel_member(
2268 request: proto::InviteChannelMember,
2269 response: Response<proto::InviteChannelMember>,
2270 session: Session,
2271) -> Result<()> {
2272 let db = session.db().await;
2273 let channel_id = ChannelId::from_proto(request.channel_id);
2274 let invitee_id = UserId::from_proto(request.user_id);
2275 db.invite_channel_member(channel_id, invitee_id, session.user_id, request.admin)
2276 .await?;
2277
2278 let (channel, _) = db
2279 .get_channel(channel_id, session.user_id)
2280 .await?
2281 .ok_or_else(|| anyhow!("channel not found"))?;
2282
2283 let mut update = proto::UpdateChannels::default();
2284 update.channel_invitations.push(proto::Channel {
2285 id: channel.id.to_proto(),
2286 name: channel.name,
2287 });
2288 for connection_id in session
2289 .connection_pool()
2290 .await
2291 .user_connection_ids(invitee_id)
2292 {
2293 session.peer.send(connection_id, update.clone())?;
2294 }
2295
2296 response.send(proto::Ack {})?;
2297 Ok(())
2298}
2299
2300async fn remove_channel_member(
2301 request: proto::RemoveChannelMember,
2302 response: Response<proto::RemoveChannelMember>,
2303 session: Session,
2304) -> Result<()> {
2305 let db = session.db().await;
2306 let channel_id = ChannelId::from_proto(request.channel_id);
2307 let member_id = UserId::from_proto(request.user_id);
2308
2309 db.remove_channel_member(channel_id, member_id, session.user_id)
2310 .await?;
2311
2312 let mut update = proto::UpdateChannels::default();
2313 update.delete_channels.push(channel_id.to_proto());
2314
2315 for connection_id in session
2316 .connection_pool()
2317 .await
2318 .user_connection_ids(member_id)
2319 {
2320 session.peer.send(connection_id, update.clone())?;
2321 }
2322
2323 response.send(proto::Ack {})?;
2324 Ok(())
2325}
2326
2327async fn set_channel_member_admin(
2328 request: proto::SetChannelMemberAdmin,
2329 response: Response<proto::SetChannelMemberAdmin>,
2330 session: Session,
2331) -> Result<()> {
2332 let db = session.db().await;
2333 let channel_id = ChannelId::from_proto(request.channel_id);
2334 let member_id = UserId::from_proto(request.user_id);
2335 db.set_channel_member_admin(channel_id, session.user_id, member_id, request.admin)
2336 .await?;
2337
2338 let (channel, has_accepted) = db
2339 .get_channel(channel_id, member_id)
2340 .await?
2341 .ok_or_else(|| anyhow!("channel not found"))?;
2342
2343 let mut update = proto::UpdateChannels::default();
2344 if has_accepted {
2345 update.channel_permissions.push(proto::ChannelPermission {
2346 channel_id: channel.id.to_proto(),
2347 is_admin: request.admin,
2348 });
2349 }
2350
2351 for connection_id in session
2352 .connection_pool()
2353 .await
2354 .user_connection_ids(member_id)
2355 {
2356 session.peer.send(connection_id, update.clone())?;
2357 }
2358
2359 response.send(proto::Ack {})?;
2360 Ok(())
2361}
2362
2363async fn rename_channel(
2364 request: proto::RenameChannel,
2365 response: Response<proto::RenameChannel>,
2366 session: Session,
2367) -> Result<()> {
2368 let db = session.db().await;
2369 let channel_id = ChannelId::from_proto(request.channel_id);
2370 let new_name = db
2371 .rename_channel(channel_id, session.user_id, &request.name)
2372 .await?;
2373
2374 let channel = proto::Channel {
2375 id: request.channel_id,
2376 name: new_name,
2377 };
2378 response.send(proto::RenameChannelResponse {
2379 channel: Some(channel.clone()),
2380 })?;
2381 let mut update = proto::UpdateChannels::default();
2382 update.channels.push(channel);
2383
2384 let member_ids = db.get_channel_members(channel_id).await?;
2385
2386 let connection_pool = session.connection_pool().await;
2387 for member_id in member_ids {
2388 for connection_id in connection_pool.user_connection_ids(member_id) {
2389 session.peer.send(connection_id, update.clone())?;
2390 }
2391 }
2392
2393 Ok(())
2394}
2395
2396async fn link_channel(
2397 request: proto::LinkChannel,
2398 response: Response<proto::LinkChannel>,
2399 session: Session,
2400) -> Result<()> {
2401 let db = session.db().await;
2402 let channel_id = ChannelId::from_proto(request.channel_id);
2403 let to = ChannelId::from_proto(request.to);
2404 let channels_to_send = db.link_channel(session.user_id, channel_id, to).await?;
2405
2406 let members = db.get_channel_members(to).await?;
2407 let connection_pool = session.connection_pool().await;
2408 let update = proto::UpdateChannels {
2409 channels: channels_to_send
2410 .channels
2411 .into_iter()
2412 .map(|channel| proto::Channel {
2413 id: channel.id.to_proto(),
2414 name: channel.name,
2415 })
2416 .collect(),
2417 insert_edge: channels_to_send.edges,
2418 ..Default::default()
2419 };
2420 for member_id in members {
2421 for connection_id in connection_pool.user_connection_ids(member_id) {
2422 session.peer.send(connection_id, update.clone())?;
2423 }
2424 }
2425
2426 response.send(Ack {})?;
2427
2428 Ok(())
2429}
2430
2431async fn unlink_channel(
2432 request: proto::UnlinkChannel,
2433 response: Response<proto::UnlinkChannel>,
2434 session: Session,
2435) -> Result<()> {
2436 let db = session.db().await;
2437 let channel_id = ChannelId::from_proto(request.channel_id);
2438 let from = ChannelId::from_proto(request.from);
2439
2440 db.unlink_channel(session.user_id, channel_id, from).await?;
2441
2442 let members = db.get_channel_members(from).await?;
2443
2444 let update = proto::UpdateChannels {
2445 delete_edge: vec![proto::ChannelEdge {
2446 channel_id: channel_id.to_proto(),
2447 parent_id: from.to_proto(),
2448 }],
2449 ..Default::default()
2450 };
2451 let connection_pool = session.connection_pool().await;
2452 for member_id in members {
2453 for connection_id in connection_pool.user_connection_ids(member_id) {
2454 session.peer.send(connection_id, update.clone())?;
2455 }
2456 }
2457
2458 response.send(Ack {})?;
2459
2460 Ok(())
2461}
2462
2463async fn move_channel(
2464 request: proto::MoveChannel,
2465 response: Response<proto::MoveChannel>,
2466 session: Session,
2467) -> Result<()> {
2468 let db = session.db().await;
2469 let channel_id = ChannelId::from_proto(request.channel_id);
2470 let from_parent = ChannelId::from_proto(request.from);
2471 let to = ChannelId::from_proto(request.to);
2472
2473 let channels_to_send = db
2474 .move_channel(session.user_id, channel_id, from_parent, to)
2475 .await?;
2476
2477 if channels_to_send.is_empty() {
2478 response.send(Ack {})?;
2479 return Ok(());
2480 }
2481
2482 let members_from = db.get_channel_members(from_parent).await?;
2483 let members_to = db.get_channel_members(to).await?;
2484
2485 let update = proto::UpdateChannels {
2486 delete_edge: vec![proto::ChannelEdge {
2487 channel_id: channel_id.to_proto(),
2488 parent_id: from_parent.to_proto(),
2489 }],
2490 ..Default::default()
2491 };
2492 let connection_pool = session.connection_pool().await;
2493 for member_id in members_from {
2494 for connection_id in connection_pool.user_connection_ids(member_id) {
2495 session.peer.send(connection_id, update.clone())?;
2496 }
2497 }
2498
2499 let update = proto::UpdateChannels {
2500 channels: channels_to_send
2501 .channels
2502 .into_iter()
2503 .map(|channel| proto::Channel {
2504 id: channel.id.to_proto(),
2505 name: channel.name,
2506 })
2507 .collect(),
2508 insert_edge: channels_to_send.edges,
2509 ..Default::default()
2510 };
2511 for member_id in members_to {
2512 for connection_id in connection_pool.user_connection_ids(member_id) {
2513 session.peer.send(connection_id, update.clone())?;
2514 }
2515 }
2516
2517 response.send(Ack {})?;
2518
2519 Ok(())
2520}
2521
2522async fn get_channel_members(
2523 request: proto::GetChannelMembers,
2524 response: Response<proto::GetChannelMembers>,
2525 session: Session,
2526) -> Result<()> {
2527 let db = session.db().await;
2528 let channel_id = ChannelId::from_proto(request.channel_id);
2529 let members = db
2530 .get_channel_member_details(channel_id, session.user_id)
2531 .await?;
2532 response.send(proto::GetChannelMembersResponse { members })?;
2533 Ok(())
2534}
2535
2536async fn respond_to_channel_invite(
2537 request: proto::RespondToChannelInvite,
2538 response: Response<proto::RespondToChannelInvite>,
2539 session: Session,
2540) -> Result<()> {
2541 let db = session.db().await;
2542 let channel_id = ChannelId::from_proto(request.channel_id);
2543 db.respond_to_channel_invite(channel_id, session.user_id, request.accept)
2544 .await?;
2545
2546 let mut update = proto::UpdateChannels::default();
2547 update
2548 .remove_channel_invitations
2549 .push(channel_id.to_proto());
2550 if request.accept {
2551 let result = db.get_channel_for_user(channel_id, session.user_id).await?;
2552 update
2553 .channels
2554 .extend(
2555 result
2556 .channels
2557 .channels
2558 .into_iter()
2559 .map(|channel| proto::Channel {
2560 id: channel.id.to_proto(),
2561 name: channel.name,
2562 }),
2563 );
2564 update.insert_edge = result.channels.edges;
2565 update
2566 .channel_participants
2567 .extend(
2568 result
2569 .channel_participants
2570 .into_iter()
2571 .map(|(channel_id, user_ids)| proto::ChannelParticipants {
2572 channel_id: channel_id.to_proto(),
2573 participant_user_ids: user_ids.into_iter().map(UserId::to_proto).collect(),
2574 }),
2575 );
2576 update
2577 .channel_permissions
2578 .extend(
2579 result
2580 .channels_with_admin_privileges
2581 .into_iter()
2582 .map(|channel_id| proto::ChannelPermission {
2583 channel_id: channel_id.to_proto(),
2584 is_admin: true,
2585 }),
2586 );
2587 }
2588 session.peer.send(session.connection_id, update)?;
2589 response.send(proto::Ack {})?;
2590
2591 Ok(())
2592}
2593
/// Handles a `JoinChannel` request.
///
/// Leaves whatever room this connection is currently in, joins the room associated
/// with the channel, replies with a `JoinRoomResponse`, and then notifies the room's
/// participants, the channel's members, and the user's contacts.
async fn join_channel(
    request: proto::JoinChannel,
    response: Response<proto::JoinChannel>,
    session: Session,
) -> Result<()> {
    let channel_id = ChannelId::from_proto(request.channel_id);

    // The database handle and the value returned by `join_room` are confined to this
    // block; only the result of `into_inner()` escapes it.
    // NOTE(review): presumably this releases a room-level guard before the
    // notifications below — confirm against `db.join_room`.
    let joined_room = {
        leave_room_for_session(&session).await?;
        let db = session.db().await;

        let room_id = db.room_id_for_channel(channel_id).await?;

        let joined_room = db
            .join_room(room_id, session.user_id, session.connection_id)
            .await?;

        // `None` when no LiveKit client is configured, or when creating the room
        // token fails (the failure is logged by `trace_err`).
        let live_kit_connection_info = session.live_kit_client.as_ref().and_then(|live_kit| {
            let token = live_kit
                .room_token(
                    &joined_room.room.live_kit_room,
                    &session.user_id.to_string(),
                )
                .trace_err()?;

            Some(LiveKitConnectionInfo {
                server_url: live_kit.url().into(),
                token,
            })
        });

        response.send(proto::JoinRoomResponse {
            room: Some(joined_room.room.clone()),
            channel_id: joined_room.channel_id.map(|id| id.to_proto()),
            live_kit_connection_info,
        })?;

        // Broadcast the new room state to the room's participants.
        room_updated(&joined_room.room, &session.peer);

        joined_room.into_inner()
    };

    // Send the channel's participant list to every connection of its members.
    channel_updated(
        channel_id,
        &joined_room.room,
        &joined_room.channel_members,
        &session.peer,
        &*session.connection_pool().await,
    );

    // Push this user's refreshed contact entry to their accepted contacts.
    update_user_contacts(session.user_id, &session).await?;

    Ok(())
}
2648
2649async fn join_channel_buffer(
2650 request: proto::JoinChannelBuffer,
2651 response: Response<proto::JoinChannelBuffer>,
2652 session: Session,
2653) -> Result<()> {
2654 let db = session.db().await;
2655 let channel_id = ChannelId::from_proto(request.channel_id);
2656
2657 let open_response = db
2658 .join_channel_buffer(channel_id, session.user_id, session.connection_id)
2659 .await?;
2660
2661 let replica_id = open_response.replica_id;
2662 let collaborators = open_response.collaborators.clone();
2663
2664 response.send(open_response)?;
2665
2666 let update = AddChannelBufferCollaborator {
2667 channel_id: channel_id.to_proto(),
2668 collaborator: Some(proto::Collaborator {
2669 user_id: session.user_id.to_proto(),
2670 peer_id: Some(session.connection_id.into()),
2671 replica_id,
2672 }),
2673 };
2674 channel_buffer_updated(
2675 session.connection_id,
2676 collaborators
2677 .iter()
2678 .filter_map(|collaborator| Some(collaborator.peer_id?.into())),
2679 &update,
2680 &session.peer,
2681 );
2682
2683 Ok(())
2684}
2685
2686async fn update_channel_buffer(
2687 request: proto::UpdateChannelBuffer,
2688 session: Session,
2689) -> Result<()> {
2690 let db = session.db().await;
2691 let channel_id = ChannelId::from_proto(request.channel_id);
2692
2693 let collaborators = db
2694 .update_channel_buffer(channel_id, session.user_id, &request.operations)
2695 .await?;
2696
2697 channel_buffer_updated(
2698 session.connection_id,
2699 collaborators,
2700 &proto::UpdateChannelBuffer {
2701 channel_id: channel_id.to_proto(),
2702 operations: request.operations,
2703 },
2704 &session.peer,
2705 );
2706 Ok(())
2707}
2708
2709async fn rejoin_channel_buffers(
2710 request: proto::RejoinChannelBuffers,
2711 response: Response<proto::RejoinChannelBuffers>,
2712 session: Session,
2713) -> Result<()> {
2714 let db = session.db().await;
2715 let buffers = db
2716 .rejoin_channel_buffers(&request.buffers, session.user_id, session.connection_id)
2717 .await?;
2718
2719 for buffer in &buffers {
2720 let collaborators_to_notify = buffer
2721 .buffer
2722 .collaborators
2723 .iter()
2724 .filter_map(|c| Some(c.peer_id?.into()));
2725 channel_buffer_updated(
2726 session.connection_id,
2727 collaborators_to_notify,
2728 &proto::UpdateChannelBufferCollaborator {
2729 channel_id: buffer.buffer.channel_id,
2730 old_peer_id: Some(buffer.old_connection_id.into()),
2731 new_peer_id: Some(session.connection_id.into()),
2732 },
2733 &session.peer,
2734 );
2735 }
2736
2737 response.send(proto::RejoinChannelBuffersResponse {
2738 buffers: buffers.into_iter().map(|b| b.buffer).collect(),
2739 })?;
2740
2741 Ok(())
2742}
2743
2744async fn leave_channel_buffer(
2745 request: proto::LeaveChannelBuffer,
2746 response: Response<proto::LeaveChannelBuffer>,
2747 session: Session,
2748) -> Result<()> {
2749 let db = session.db().await;
2750 let channel_id = ChannelId::from_proto(request.channel_id);
2751
2752 let collaborators_to_notify = db
2753 .leave_channel_buffer(channel_id, session.connection_id)
2754 .await?;
2755
2756 response.send(Ack {})?;
2757
2758 channel_buffer_updated(
2759 session.connection_id,
2760 collaborators_to_notify,
2761 &proto::RemoveChannelBufferCollaborator {
2762 channel_id: channel_id.to_proto(),
2763 peer_id: Some(session.connection_id.into()),
2764 },
2765 &session.peer,
2766 );
2767
2768 Ok(())
2769}
2770
2771fn channel_buffer_updated<T: EnvelopedMessage>(
2772 sender_id: ConnectionId,
2773 collaborators: impl IntoIterator<Item = ConnectionId>,
2774 message: &T,
2775 peer: &Peer,
2776) {
2777 broadcast(Some(sender_id), collaborators.into_iter(), |peer_id| {
2778 peer.send(peer_id.into(), message.clone())
2779 });
2780}
2781
2782async fn send_channel_message(
2783 request: proto::SendChannelMessage,
2784 response: Response<proto::SendChannelMessage>,
2785 session: Session,
2786) -> Result<()> {
2787 // Validate the message body.
2788 let body = request.body.trim().to_string();
2789 if body.len() > MAX_MESSAGE_LEN {
2790 return Err(anyhow!("message is too long"))?;
2791 }
2792 if body.is_empty() {
2793 return Err(anyhow!("message can't be blank"))?;
2794 }
2795
2796 let timestamp = OffsetDateTime::now_utc();
2797 let nonce = request
2798 .nonce
2799 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
2800
2801 let channel_id = ChannelId::from_proto(request.channel_id);
2802 let (message_id, connection_ids) = session
2803 .db()
2804 .await
2805 .create_channel_message(
2806 channel_id,
2807 session.user_id,
2808 &body,
2809 timestamp,
2810 nonce.clone().into(),
2811 )
2812 .await?;
2813 let message = proto::ChannelMessage {
2814 sender_id: session.user_id.to_proto(),
2815 id: message_id.to_proto(),
2816 body,
2817 timestamp: timestamp.unix_timestamp() as u64,
2818 nonce: Some(nonce),
2819 };
2820 broadcast(Some(session.connection_id), connection_ids, |connection| {
2821 session.peer.send(
2822 connection,
2823 proto::ChannelMessageSent {
2824 channel_id: channel_id.to_proto(),
2825 message: Some(message.clone()),
2826 },
2827 )
2828 });
2829 response.send(proto::SendChannelMessageResponse {
2830 message: Some(message),
2831 })?;
2832 Ok(())
2833}
2834
2835async fn remove_channel_message(
2836 request: proto::RemoveChannelMessage,
2837 response: Response<proto::RemoveChannelMessage>,
2838 session: Session,
2839) -> Result<()> {
2840 let channel_id = ChannelId::from_proto(request.channel_id);
2841 let message_id = MessageId::from_proto(request.message_id);
2842 let connection_ids = session
2843 .db()
2844 .await
2845 .remove_channel_message(channel_id, message_id, session.user_id)
2846 .await?;
2847 broadcast(Some(session.connection_id), connection_ids, |connection| {
2848 session.peer.send(connection, request.clone())
2849 });
2850 response.send(proto::Ack {})?;
2851 Ok(())
2852}
2853
2854async fn join_channel_chat(
2855 request: proto::JoinChannelChat,
2856 response: Response<proto::JoinChannelChat>,
2857 session: Session,
2858) -> Result<()> {
2859 let channel_id = ChannelId::from_proto(request.channel_id);
2860
2861 let db = session.db().await;
2862 db.join_channel_chat(channel_id, session.connection_id, session.user_id)
2863 .await?;
2864 let messages = db
2865 .get_channel_messages(channel_id, session.user_id, MESSAGE_COUNT_PER_PAGE, None)
2866 .await?;
2867 response.send(proto::JoinChannelChatResponse {
2868 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
2869 messages,
2870 })?;
2871 Ok(())
2872}
2873
2874async fn leave_channel_chat(request: proto::LeaveChannelChat, session: Session) -> Result<()> {
2875 let channel_id = ChannelId::from_proto(request.channel_id);
2876 session
2877 .db()
2878 .await
2879 .leave_channel_chat(channel_id, session.connection_id, session.user_id)
2880 .await?;
2881 Ok(())
2882}
2883
2884async fn get_channel_messages(
2885 request: proto::GetChannelMessages,
2886 response: Response<proto::GetChannelMessages>,
2887 session: Session,
2888) -> Result<()> {
2889 let channel_id = ChannelId::from_proto(request.channel_id);
2890 let messages = session
2891 .db()
2892 .await
2893 .get_channel_messages(
2894 channel_id,
2895 session.user_id,
2896 MESSAGE_COUNT_PER_PAGE,
2897 Some(MessageId::from_proto(request.before_message_id)),
2898 )
2899 .await?;
2900 response.send(proto::GetChannelMessagesResponse {
2901 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
2902 messages,
2903 })?;
2904 Ok(())
2905}
2906
2907async fn update_diff_base(request: proto::UpdateDiffBase, session: Session) -> Result<()> {
2908 let project_id = ProjectId::from_proto(request.project_id);
2909 let project_connection_ids = session
2910 .db()
2911 .await
2912 .project_connection_ids(project_id, session.connection_id)
2913 .await?;
2914 broadcast(
2915 Some(session.connection_id),
2916 project_connection_ids.iter().copied(),
2917 |connection_id| {
2918 session
2919 .peer
2920 .forward_send(session.connection_id, connection_id, request.clone())
2921 },
2922 );
2923 Ok(())
2924}
2925
2926async fn get_private_user_info(
2927 _request: proto::GetPrivateUserInfo,
2928 response: Response<proto::GetPrivateUserInfo>,
2929 session: Session,
2930) -> Result<()> {
2931 let db = session.db().await;
2932
2933 let metrics_id = db.get_user_metrics_id(session.user_id).await?;
2934 let user = db
2935 .get_user_by_id(session.user_id)
2936 .await?
2937 .ok_or_else(|| anyhow!("user not found"))?;
2938 let flags = db.get_user_flags(session.user_id).await?;
2939
2940 response.send(proto::GetPrivateUserInfoResponse {
2941 metrics_id,
2942 staff: user.admin,
2943 flags,
2944 })?;
2945 Ok(())
2946}
2947
2948fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
2949 match message {
2950 TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
2951 TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
2952 TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
2953 TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
2954 TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
2955 code: frame.code.into(),
2956 reason: frame.reason,
2957 })),
2958 }
2959}
2960
2961fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
2962 match message {
2963 AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
2964 AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
2965 AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
2966 AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
2967 AxumMessage::Close(frame) => {
2968 TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
2969 code: frame.code.into(),
2970 reason: frame.reason,
2971 }))
2972 }
2973 }
2974}
2975
2976fn build_initial_channels_update(
2977 channels: ChannelsForUser,
2978 channel_invites: Vec<db::Channel>,
2979) -> proto::UpdateChannels {
2980 let mut update = proto::UpdateChannels::default();
2981
2982 for channel in channels.channels.channels {
2983 update.channels.push(proto::Channel {
2984 id: channel.id.to_proto(),
2985 name: channel.name,
2986 });
2987 }
2988
2989 update.insert_edge = channels.channels.edges;
2990
2991 for (channel_id, participants) in channels.channel_participants {
2992 update
2993 .channel_participants
2994 .push(proto::ChannelParticipants {
2995 channel_id: channel_id.to_proto(),
2996 participant_user_ids: participants.into_iter().map(|id| id.to_proto()).collect(),
2997 });
2998 }
2999
3000 update
3001 .channel_permissions
3002 .extend(
3003 channels
3004 .channels_with_admin_privileges
3005 .into_iter()
3006 .map(|id| proto::ChannelPermission {
3007 channel_id: id.to_proto(),
3008 is_admin: true,
3009 }),
3010 );
3011
3012 for channel in channel_invites {
3013 update.channel_invitations.push(proto::Channel {
3014 id: channel.id.to_proto(),
3015 name: channel.name,
3016 });
3017 }
3018
3019 update
3020}
3021
3022fn build_initial_contacts_update(
3023 contacts: Vec<db::Contact>,
3024 pool: &ConnectionPool,
3025) -> proto::UpdateContacts {
3026 let mut update = proto::UpdateContacts::default();
3027
3028 for contact in contacts {
3029 match contact {
3030 db::Contact::Accepted {
3031 user_id,
3032 should_notify,
3033 busy,
3034 } => {
3035 update
3036 .contacts
3037 .push(contact_for_user(user_id, should_notify, busy, &pool));
3038 }
3039 db::Contact::Outgoing { user_id } => update.outgoing_requests.push(user_id.to_proto()),
3040 db::Contact::Incoming {
3041 user_id,
3042 should_notify,
3043 } => update
3044 .incoming_requests
3045 .push(proto::IncomingContactRequest {
3046 requester_id: user_id.to_proto(),
3047 should_notify,
3048 }),
3049 }
3050 }
3051
3052 update
3053}
3054
3055fn contact_for_user(
3056 user_id: UserId,
3057 should_notify: bool,
3058 busy: bool,
3059 pool: &ConnectionPool,
3060) -> proto::Contact {
3061 proto::Contact {
3062 user_id: user_id.to_proto(),
3063 online: pool.is_user_online(user_id),
3064 busy,
3065 should_notify,
3066 }
3067}
3068
3069fn room_updated(room: &proto::Room, peer: &Peer) {
3070 broadcast(
3071 None,
3072 room.participants
3073 .iter()
3074 .filter_map(|participant| Some(participant.peer_id?.into())),
3075 |peer_id| {
3076 peer.send(
3077 peer_id.into(),
3078 proto::RoomUpdated {
3079 room: Some(room.clone()),
3080 },
3081 )
3082 },
3083 );
3084}
3085
3086fn channel_updated(
3087 channel_id: ChannelId,
3088 room: &proto::Room,
3089 channel_members: &[UserId],
3090 peer: &Peer,
3091 pool: &ConnectionPool,
3092) {
3093 let participants = room
3094 .participants
3095 .iter()
3096 .map(|p| p.user_id)
3097 .collect::<Vec<_>>();
3098
3099 broadcast(
3100 None,
3101 channel_members
3102 .iter()
3103 .flat_map(|user_id| pool.user_connection_ids(*user_id)),
3104 |peer_id| {
3105 peer.send(
3106 peer_id.into(),
3107 proto::UpdateChannels {
3108 channel_participants: vec![proto::ChannelParticipants {
3109 channel_id: channel_id.to_proto(),
3110 participant_user_ids: participants.clone(),
3111 }],
3112 ..Default::default()
3113 },
3114 )
3115 },
3116 );
3117}
3118
3119async fn update_user_contacts(user_id: UserId, session: &Session) -> Result<()> {
3120 let db = session.db().await;
3121
3122 let contacts = db.get_contacts(user_id).await?;
3123 let busy = db.is_user_busy(user_id).await?;
3124
3125 let pool = session.connection_pool().await;
3126 let updated_contact = contact_for_user(user_id, false, busy, &pool);
3127 for contact in contacts {
3128 if let db::Contact::Accepted {
3129 user_id: contact_user_id,
3130 ..
3131 } = contact
3132 {
3133 for contact_conn_id in pool.user_connection_ids(contact_user_id) {
3134 session
3135 .peer
3136 .send(
3137 contact_conn_id,
3138 proto::UpdateContacts {
3139 contacts: vec![updated_contact.clone()],
3140 remove_contacts: Default::default(),
3141 incoming_requests: Default::default(),
3142 remove_incoming_requests: Default::default(),
3143 outgoing_requests: Default::default(),
3144 remove_outgoing_requests: Default::default(),
3145 },
3146 )
3147 .trace_err();
3148 }
3149 }
3150 }
3151 Ok(())
3152}
3153
3154async fn leave_room_for_session(session: &Session) -> Result<()> {
3155 let mut contacts_to_update = HashSet::default();
3156
3157 let room_id;
3158 let canceled_calls_to_user_ids;
3159 let live_kit_room;
3160 let delete_live_kit_room;
3161 let room;
3162 let channel_members;
3163 let channel_id;
3164
3165 if let Some(mut left_room) = session.db().await.leave_room(session.connection_id).await? {
3166 contacts_to_update.insert(session.user_id);
3167
3168 for project in left_room.left_projects.values() {
3169 project_left(project, session);
3170 }
3171
3172 room_id = RoomId::from_proto(left_room.room.id);
3173 canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids);
3174 live_kit_room = mem::take(&mut left_room.room.live_kit_room);
3175 delete_live_kit_room = left_room.deleted;
3176 room = mem::take(&mut left_room.room);
3177 channel_members = mem::take(&mut left_room.channel_members);
3178 channel_id = left_room.channel_id;
3179
3180 room_updated(&room, &session.peer);
3181 } else {
3182 return Ok(());
3183 }
3184
3185 if let Some(channel_id) = channel_id {
3186 channel_updated(
3187 channel_id,
3188 &room,
3189 &channel_members,
3190 &session.peer,
3191 &*session.connection_pool().await,
3192 );
3193 }
3194
3195 {
3196 let pool = session.connection_pool().await;
3197 for canceled_user_id in canceled_calls_to_user_ids {
3198 for connection_id in pool.user_connection_ids(canceled_user_id) {
3199 session
3200 .peer
3201 .send(
3202 connection_id,
3203 proto::CallCanceled {
3204 room_id: room_id.to_proto(),
3205 },
3206 )
3207 .trace_err();
3208 }
3209 contacts_to_update.insert(canceled_user_id);
3210 }
3211 }
3212
3213 for contact_user_id in contacts_to_update {
3214 update_user_contacts(contact_user_id, &session).await?;
3215 }
3216
3217 if let Some(live_kit) = session.live_kit_client.as_ref() {
3218 live_kit
3219 .remove_participant(live_kit_room.clone(), session.user_id.to_string())
3220 .await
3221 .trace_err();
3222
3223 if delete_live_kit_room {
3224 live_kit.delete_room(live_kit_room).await.trace_err();
3225 }
3226 }
3227
3228 Ok(())
3229}
3230
3231async fn leave_channel_buffers_for_session(session: &Session) -> Result<()> {
3232 let left_channel_buffers = session
3233 .db()
3234 .await
3235 .leave_channel_buffers(session.connection_id)
3236 .await?;
3237
3238 for (channel_id, connections) in left_channel_buffers {
3239 channel_buffer_updated(
3240 session.connection_id,
3241 connections,
3242 &proto::RemoveChannelBufferCollaborator {
3243 channel_id: channel_id.to_proto(),
3244 peer_id: Some(session.connection_id.into()),
3245 },
3246 &session.peer,
3247 );
3248 }
3249
3250 Ok(())
3251}
3252
3253fn project_left(project: &db::LeftProject, session: &Session) {
3254 for connection_id in &project.connection_ids {
3255 if project.host_user_id == session.user_id {
3256 session
3257 .peer
3258 .send(
3259 *connection_id,
3260 proto::UnshareProject {
3261 project_id: project.id.to_proto(),
3262 },
3263 )
3264 .trace_err();
3265 } else {
3266 session
3267 .peer
3268 .send(
3269 *connection_id,
3270 proto::RemoveProjectCollaborator {
3271 project_id: project.id.to_proto(),
3272 peer_id: Some(session.connection_id.into()),
3273 },
3274 )
3275 .trace_err();
3276 }
3277 }
3278}
3279
/// Extension trait for turning a fallible value into an `Option`.
pub trait ResultExt {
    /// The success type carried by the implementing value.
    type Ok;

    /// Converts `self` into `Some(value)` on success and `None` on failure.
    ///
    /// The implementation for `Result` in this file logs the error through
    /// `tracing::error!` before returning `None`.
    fn trace_err(self) -> Option<Self::Ok>;
}
3285
3286impl<T, E> ResultExt for Result<T, E>
3287where
3288 E: std::fmt::Debug,
3289{
3290 type Ok = T;
3291
3292 fn trace_err(self) -> Option<T> {
3293 match self {
3294 Ok(value) => Some(value),
3295 Err(error) => {
3296 tracing::error!("{:?}", error);
3297 None
3298 }
3299 }
3300 }
3301}