1mod connection_pool;
2
3use crate::{
4 auth,
5 db::{
6 self, ChannelId, ChannelsForUser, Database, MessageId, ProjectId, RoomId, ServerId, User,
7 UserId,
8 },
9 executor::Executor,
10 AppState, Result,
11};
12use anyhow::anyhow;
13use async_tungstenite::tungstenite::{
14 protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
15};
16use axum::{
17 body::Body,
18 extract::{
19 ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
20 ConnectInfo, WebSocketUpgrade,
21 },
22 headers::{Header, HeaderName},
23 http::StatusCode,
24 middleware,
25 response::IntoResponse,
26 routing::get,
27 Extension, Router, TypedHeader,
28};
29use collections::{HashMap, HashSet};
30pub use connection_pool::ConnectionPool;
31use futures::{
32 channel::oneshot,
33 future::{self, BoxFuture},
34 stream::FuturesUnordered,
35 FutureExt, SinkExt, StreamExt, TryStreamExt,
36};
37use lazy_static::lazy_static;
38use prometheus::{register_int_gauge, IntGauge};
39use rpc::{
40 proto::{
41 self, Ack, AddChannelBufferCollaborator, AnyTypedEnvelope, EntityMessage, EnvelopedMessage,
42 LiveKitConnectionInfo, RequestMessage,
43 },
44 Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
45};
46use serde::{Serialize, Serializer};
47use std::{
48 any::TypeId,
49 fmt,
50 future::Future,
51 marker::PhantomData,
52 mem,
53 net::SocketAddr,
54 ops::{Deref, DerefMut},
55 rc::Rc,
56 sync::{
57 atomic::{AtomicBool, Ordering::SeqCst},
58 Arc,
59 },
60 time::{Duration, Instant},
61};
62use time::OffsetDateTime;
63use tokio::sync::{watch, Semaphore};
64use tower::ServiceBuilder;
65use tracing::{info_span, instrument, Instrument};
66
67pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
68pub const CLEANUP_TIMEOUT: Duration = Duration::from_secs(10);
69
70const MESSAGE_COUNT_PER_PAGE: usize = 100;
71const MAX_MESSAGE_LEN: usize = 1024;
72
73lazy_static! {
74 static ref METRIC_CONNECTIONS: IntGauge =
75 register_int_gauge!("connections", "number of connections").unwrap();
76 static ref METRIC_SHARED_PROJECTS: IntGauge = register_int_gauge!(
77 "shared_projects",
78 "number of open projects with one or more guests"
79 )
80 .unwrap();
81}
82
83type MessageHandler =
84 Box<dyn Send + Sync + Fn(Box<dyn AnyTypedEnvelope>, Session) -> BoxFuture<'static, ()>>;
85
86struct Response<R> {
87 peer: Arc<Peer>,
88 receipt: Receipt<R>,
89 responded: Arc<AtomicBool>,
90}
91
92impl<R: RequestMessage> Response<R> {
93 fn send(self, payload: R::Response) -> Result<()> {
94 self.responded.store(true, SeqCst);
95 self.peer.respond(self.receipt, payload)?;
96 Ok(())
97 }
98}
99
100#[derive(Clone)]
101struct Session {
102 user_id: UserId,
103 connection_id: ConnectionId,
104 db: Arc<tokio::sync::Mutex<DbHandle>>,
105 peer: Arc<Peer>,
106 connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
107 live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
108 executor: Executor,
109}
110
111impl Session {
112 async fn db(&self) -> tokio::sync::MutexGuard<DbHandle> {
113 #[cfg(test)]
114 tokio::task::yield_now().await;
115 let guard = self.db.lock().await;
116 #[cfg(test)]
117 tokio::task::yield_now().await;
118 guard
119 }
120
121 async fn connection_pool(&self) -> ConnectionPoolGuard<'_> {
122 #[cfg(test)]
123 tokio::task::yield_now().await;
124 let guard = self.connection_pool.lock();
125 ConnectionPoolGuard {
126 guard,
127 _not_send: PhantomData,
128 }
129 }
130}
131
132impl fmt::Debug for Session {
133 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134 f.debug_struct("Session")
135 .field("user_id", &self.user_id)
136 .field("connection_id", &self.connection_id)
137 .finish()
138 }
139}
140
141struct DbHandle(Arc<Database>);
142
143impl Deref for DbHandle {
144 type Target = Database;
145
146 fn deref(&self) -> &Self::Target {
147 self.0.as_ref()
148 }
149}
150
151pub struct Server {
152 id: parking_lot::Mutex<ServerId>,
153 peer: Arc<Peer>,
154 pub(crate) connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
155 app_state: Arc<AppState>,
156 executor: Executor,
157 handlers: HashMap<TypeId, MessageHandler>,
158 teardown: watch::Sender<()>,
159}
160
161pub(crate) struct ConnectionPoolGuard<'a> {
162 guard: parking_lot::MutexGuard<'a, ConnectionPool>,
163 _not_send: PhantomData<Rc<()>>,
164}
165
166#[derive(Serialize)]
167pub struct ServerSnapshot<'a> {
168 peer: &'a Peer,
169 #[serde(serialize_with = "serialize_deref")]
170 connection_pool: ConnectionPoolGuard<'a>,
171}
172
173pub fn serialize_deref<S, T, U>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
174where
175 S: Serializer,
176 T: Deref<Target = U>,
177 U: Serialize,
178{
179 Serialize::serialize(value.deref(), serializer)
180}
181
182impl Server {
183 pub fn new(id: ServerId, app_state: Arc<AppState>, executor: Executor) -> Arc<Self> {
184 let mut server = Self {
185 id: parking_lot::Mutex::new(id),
186 peer: Peer::new(id.0 as u32),
187 app_state,
188 executor,
189 connection_pool: Default::default(),
190 handlers: Default::default(),
191 teardown: watch::channel(()).0,
192 };
193
194 server
195 .add_request_handler(ping)
196 .add_request_handler(create_room)
197 .add_request_handler(join_room)
198 .add_request_handler(rejoin_room)
199 .add_request_handler(leave_room)
200 .add_request_handler(call)
201 .add_request_handler(cancel_call)
202 .add_message_handler(decline_call)
203 .add_request_handler(update_participant_location)
204 .add_request_handler(share_project)
205 .add_message_handler(unshare_project)
206 .add_request_handler(join_project)
207 .add_message_handler(leave_project)
208 .add_request_handler(update_project)
209 .add_request_handler(update_worktree)
210 .add_message_handler(start_language_server)
211 .add_message_handler(update_language_server)
212 .add_message_handler(update_diagnostic_summary)
213 .add_message_handler(update_worktree_settings)
214 .add_message_handler(refresh_inlay_hints)
215 .add_request_handler(forward_project_request::<proto::GetHover>)
216 .add_request_handler(forward_project_request::<proto::GetDefinition>)
217 .add_request_handler(forward_project_request::<proto::GetTypeDefinition>)
218 .add_request_handler(forward_project_request::<proto::GetReferences>)
219 .add_request_handler(forward_project_request::<proto::SearchProject>)
220 .add_request_handler(forward_project_request::<proto::GetDocumentHighlights>)
221 .add_request_handler(forward_project_request::<proto::GetProjectSymbols>)
222 .add_request_handler(forward_project_request::<proto::OpenBufferForSymbol>)
223 .add_request_handler(forward_project_request::<proto::OpenBufferById>)
224 .add_request_handler(forward_project_request::<proto::OpenBufferByPath>)
225 .add_request_handler(forward_project_request::<proto::GetCompletions>)
226 .add_request_handler(forward_project_request::<proto::ApplyCompletionAdditionalEdits>)
227 .add_request_handler(forward_project_request::<proto::GetCodeActions>)
228 .add_request_handler(forward_project_request::<proto::ApplyCodeAction>)
229 .add_request_handler(forward_project_request::<proto::PrepareRename>)
230 .add_request_handler(forward_project_request::<proto::PerformRename>)
231 .add_request_handler(forward_project_request::<proto::ReloadBuffers>)
232 .add_request_handler(forward_project_request::<proto::SynchronizeBuffers>)
233 .add_request_handler(forward_project_request::<proto::FormatBuffers>)
234 .add_request_handler(forward_project_request::<proto::CreateProjectEntry>)
235 .add_request_handler(forward_project_request::<proto::RenameProjectEntry>)
236 .add_request_handler(forward_project_request::<proto::CopyProjectEntry>)
237 .add_request_handler(forward_project_request::<proto::DeleteProjectEntry>)
238 .add_request_handler(forward_project_request::<proto::ExpandProjectEntry>)
239 .add_request_handler(forward_project_request::<proto::OnTypeFormatting>)
240 .add_request_handler(forward_project_request::<proto::InlayHints>)
241 .add_message_handler(create_buffer_for_peer)
242 .add_request_handler(update_buffer)
243 .add_message_handler(update_buffer_file)
244 .add_message_handler(buffer_reloaded)
245 .add_message_handler(buffer_saved)
246 .add_request_handler(forward_project_request::<proto::SaveBuffer>)
247 .add_request_handler(get_users)
248 .add_request_handler(fuzzy_search_users)
249 .add_request_handler(request_contact)
250 .add_request_handler(remove_contact)
251 .add_request_handler(respond_to_contact_request)
252 .add_request_handler(create_channel)
253 .add_request_handler(delete_channel)
254 .add_request_handler(invite_channel_member)
255 .add_request_handler(remove_channel_member)
256 .add_request_handler(set_channel_member_admin)
257 .add_request_handler(rename_channel)
258 .add_request_handler(join_channel_buffer)
259 .add_request_handler(leave_channel_buffer)
260 .add_message_handler(update_channel_buffer)
261 .add_request_handler(rejoin_channel_buffers)
262 .add_request_handler(get_channel_members)
263 .add_request_handler(respond_to_channel_invite)
264 .add_request_handler(join_channel)
265 .add_request_handler(join_channel_chat)
266 .add_message_handler(leave_channel_chat)
267 .add_request_handler(send_channel_message)
268 .add_request_handler(remove_channel_message)
269 .add_request_handler(get_channel_messages)
270 .add_request_handler(move_channel)
271 .add_request_handler(follow)
272 .add_message_handler(unfollow)
273 .add_message_handler(update_followers)
274 .add_message_handler(update_diff_base)
275 .add_request_handler(get_private_user_info);
276
277 Arc::new(server)
278 }
279
280 pub async fn start(&self) -> Result<()> {
281 let server_id = *self.id.lock();
282 let app_state = self.app_state.clone();
283 let peer = self.peer.clone();
284 let timeout = self.executor.sleep(CLEANUP_TIMEOUT);
285 let pool = self.connection_pool.clone();
286 let live_kit_client = self.app_state.live_kit_client.clone();
287
288 let span = info_span!("start server");
289 self.executor.spawn_detached(
290 async move {
291 tracing::info!("waiting for cleanup timeout");
292 timeout.await;
293 tracing::info!("cleanup timeout expired, retrieving stale rooms");
294 if let Some((room_ids, channel_ids)) = app_state
295 .db
296 .stale_server_resource_ids(&app_state.config.zed_environment, server_id)
297 .await
298 .trace_err()
299 {
300 tracing::info!(stale_room_count = room_ids.len(), "retrieved stale rooms");
301 tracing::info!(
302 stale_channel_buffer_count = channel_ids.len(),
303 "retrieved stale channel buffers"
304 );
305
306 for channel_id in channel_ids {
307 if let Some(refreshed_channel_buffer) = app_state
308 .db
309 .clear_stale_channel_buffer_collaborators(channel_id, server_id)
310 .await
311 .trace_err()
312 {
313 for connection_id in refreshed_channel_buffer.connection_ids {
314 for message in &refreshed_channel_buffer.removed_collaborators {
315 peer.send(connection_id, message.clone()).trace_err();
316 }
317 }
318 }
319 }
320
321 for room_id in room_ids {
322 let mut contacts_to_update = HashSet::default();
323 let mut canceled_calls_to_user_ids = Vec::new();
324 let mut live_kit_room = String::new();
325 let mut delete_live_kit_room = false;
326
327 if let Some(mut refreshed_room) = app_state
328 .db
329 .clear_stale_room_participants(room_id, server_id)
330 .await
331 .trace_err()
332 {
333 tracing::info!(
334 room_id = room_id.0,
335 new_participant_count = refreshed_room.room.participants.len(),
336 "refreshed room"
337 );
338 room_updated(&refreshed_room.room, &peer);
339 if let Some(channel_id) = refreshed_room.channel_id {
340 channel_updated(
341 channel_id,
342 &refreshed_room.room,
343 &refreshed_room.channel_members,
344 &peer,
345 &*pool.lock(),
346 );
347 }
348 contacts_to_update
349 .extend(refreshed_room.stale_participant_user_ids.iter().copied());
350 contacts_to_update
351 .extend(refreshed_room.canceled_calls_to_user_ids.iter().copied());
352 canceled_calls_to_user_ids =
353 mem::take(&mut refreshed_room.canceled_calls_to_user_ids);
354 live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room);
355 delete_live_kit_room = refreshed_room.room.participants.is_empty();
356 }
357
358 {
359 let pool = pool.lock();
360 for canceled_user_id in canceled_calls_to_user_ids {
361 for connection_id in pool.user_connection_ids(canceled_user_id) {
362 peer.send(
363 connection_id,
364 proto::CallCanceled {
365 room_id: room_id.to_proto(),
366 },
367 )
368 .trace_err();
369 }
370 }
371 }
372
373 for user_id in contacts_to_update {
374 let busy = app_state.db.is_user_busy(user_id).await.trace_err();
375 let contacts = app_state.db.get_contacts(user_id).await.trace_err();
376 if let Some((busy, contacts)) = busy.zip(contacts) {
377 let pool = pool.lock();
378 let updated_contact = contact_for_user(user_id, false, busy, &pool);
379 for contact in contacts {
380 if let db::Contact::Accepted {
381 user_id: contact_user_id,
382 ..
383 } = contact
384 {
385 for contact_conn_id in
386 pool.user_connection_ids(contact_user_id)
387 {
388 peer.send(
389 contact_conn_id,
390 proto::UpdateContacts {
391 contacts: vec![updated_contact.clone()],
392 remove_contacts: Default::default(),
393 incoming_requests: Default::default(),
394 remove_incoming_requests: Default::default(),
395 outgoing_requests: Default::default(),
396 remove_outgoing_requests: Default::default(),
397 },
398 )
399 .trace_err();
400 }
401 }
402 }
403 }
404 }
405
406 if let Some(live_kit) = live_kit_client.as_ref() {
407 if delete_live_kit_room {
408 live_kit.delete_room(live_kit_room).await.trace_err();
409 }
410 }
411 }
412 }
413
414 app_state
415 .db
416 .delete_stale_servers(&app_state.config.zed_environment, server_id)
417 .await
418 .trace_err();
419 }
420 .instrument(span),
421 );
422 Ok(())
423 }
424
425 pub fn teardown(&self) {
426 self.peer.teardown();
427 self.connection_pool.lock().reset();
428 let _ = self.teardown.send(());
429 }
430
431 #[cfg(test)]
432 pub fn reset(&self, id: ServerId) {
433 self.teardown();
434 *self.id.lock() = id;
435 self.peer.reset(id.0 as u32);
436 }
437
438 #[cfg(test)]
439 pub fn id(&self) -> ServerId {
440 *self.id.lock()
441 }
442
443 fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
444 where
445 F: 'static + Send + Sync + Fn(TypedEnvelope<M>, Session) -> Fut,
446 Fut: 'static + Send + Future<Output = Result<()>>,
447 M: EnvelopedMessage,
448 {
449 let prev_handler = self.handlers.insert(
450 TypeId::of::<M>(),
451 Box::new(move |envelope, session| {
452 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
453 let span = info_span!(
454 "handle message",
455 payload_type = envelope.payload_type_name()
456 );
457 span.in_scope(|| {
458 tracing::info!(
459 payload_type = envelope.payload_type_name(),
460 "message received"
461 );
462 });
463 let start_time = Instant::now();
464 let future = (handler)(*envelope, session);
465 async move {
466 let result = future.await;
467 let duration_ms = start_time.elapsed().as_micros() as f64 / 1000.0;
468 match result {
469 Err(error) => {
470 tracing::error!(%error, ?duration_ms, "error handling message")
471 }
472 Ok(()) => tracing::info!(?duration_ms, "finished handling message"),
473 }
474 }
475 .instrument(span)
476 .boxed()
477 }),
478 );
479 if prev_handler.is_some() {
480 panic!("registered a handler for the same message twice");
481 }
482 self
483 }
484
485 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
486 where
487 F: 'static + Send + Sync + Fn(M, Session) -> Fut,
488 Fut: 'static + Send + Future<Output = Result<()>>,
489 M: EnvelopedMessage,
490 {
491 self.add_handler(move |envelope, session| handler(envelope.payload, session));
492 self
493 }
494
495 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
496 where
497 F: 'static + Send + Sync + Fn(M, Response<M>, Session) -> Fut,
498 Fut: Send + Future<Output = Result<()>>,
499 M: RequestMessage,
500 {
501 let handler = Arc::new(handler);
502 self.add_handler(move |envelope, session| {
503 let receipt = envelope.receipt();
504 let handler = handler.clone();
505 async move {
506 let peer = session.peer.clone();
507 let responded = Arc::new(AtomicBool::default());
508 let response = Response {
509 peer: peer.clone(),
510 responded: responded.clone(),
511 receipt,
512 };
513 match (handler)(envelope.payload, response, session).await {
514 Ok(()) => {
515 if responded.load(std::sync::atomic::Ordering::SeqCst) {
516 Ok(())
517 } else {
518 Err(anyhow!("handler did not send a response"))?
519 }
520 }
521 Err(error) => {
522 peer.respond_with_error(
523 receipt,
524 proto::Error {
525 message: error.to_string(),
526 },
527 )?;
528 Err(error)
529 }
530 }
531 }
532 })
533 }
534
535 pub fn handle_connection(
536 self: &Arc<Self>,
537 connection: Connection,
538 address: String,
539 user: User,
540 mut send_connection_id: Option<oneshot::Sender<ConnectionId>>,
541 executor: Executor,
542 ) -> impl Future<Output = Result<()>> {
543 let this = self.clone();
544 let user_id = user.id;
545 let login = user.github_login;
546 let span = info_span!("handle connection", %user_id, %login, %address);
547 let mut teardown = self.teardown.subscribe();
548 async move {
549 let (connection_id, handle_io, mut incoming_rx) = this
550 .peer
551 .add_connection(connection, {
552 let executor = executor.clone();
553 move |duration| executor.sleep(duration)
554 });
555
556 tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
557 this.peer.send(connection_id, proto::Hello { peer_id: Some(connection_id.into()) })?;
558 tracing::info!(%user_id, %login, %connection_id, %address, "sent hello message");
559
560 if let Some(send_connection_id) = send_connection_id.take() {
561 let _ = send_connection_id.send(connection_id);
562 }
563
564 if !user.connected_once {
565 this.peer.send(connection_id, proto::ShowContacts {})?;
566 this.app_state.db.set_user_connected_once(user_id, true).await?;
567 }
568
569 let (contacts, channels_for_user, channel_invites) = future::try_join3(
570 this.app_state.db.get_contacts(user_id),
571 this.app_state.db.get_channels_for_user(user_id),
572 this.app_state.db.get_channel_invites_for_user(user_id)
573 ).await?;
574
575 {
576 let mut pool = this.connection_pool.lock();
577 pool.add_connection(connection_id, user_id, user.admin);
578 this.peer.send(connection_id, build_initial_contacts_update(contacts, &pool))?;
579 this.peer.send(connection_id, build_initial_channels_update(
580 channels_for_user,
581 channel_invites
582 ))?;
583 }
584
585 if let Some(incoming_call) = this.app_state.db.incoming_call_for_user(user_id).await? {
586 this.peer.send(connection_id, incoming_call)?;
587 }
588
589 let session = Session {
590 user_id,
591 connection_id,
592 db: Arc::new(tokio::sync::Mutex::new(DbHandle(this.app_state.db.clone()))),
593 peer: this.peer.clone(),
594 connection_pool: this.connection_pool.clone(),
595 live_kit_client: this.app_state.live_kit_client.clone(),
596 executor: executor.clone(),
597 };
598 update_user_contacts(user_id, &session).await?;
599
600 let handle_io = handle_io.fuse();
601 futures::pin_mut!(handle_io);
602
603 // Handlers for foreground messages are pushed into the following `FuturesUnordered`.
604 // This prevents deadlocks when e.g., client A performs a request to client B and
605 // client B performs a request to client A. If both clients stop processing further
606 // messages until their respective request completes, they won't have a chance to
607 // respond to the other client's request and cause a deadlock.
608 //
609 // This arrangement ensures we will attempt to process earlier messages first, but fall
610 // back to processing messages arrived later in the spirit of making progress.
611 let mut foreground_message_handlers = FuturesUnordered::new();
612 let concurrent_handlers = Arc::new(Semaphore::new(256));
613 loop {
614 let next_message = async {
615 let permit = concurrent_handlers.clone().acquire_owned().await.unwrap();
616 let message = incoming_rx.next().await;
617 (permit, message)
618 }.fuse();
619 futures::pin_mut!(next_message);
620 futures::select_biased! {
621 _ = teardown.changed().fuse() => return Ok(()),
622 result = handle_io => {
623 if let Err(error) = result {
624 tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
625 }
626 break;
627 }
628 _ = foreground_message_handlers.next() => {}
629 next_message = next_message => {
630 let (permit, message) = next_message;
631 if let Some(message) = message {
632 let type_name = message.payload_type_name();
633 let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
634 let span_enter = span.enter();
635 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
636 let is_background = message.is_background();
637 let handle_message = (handler)(message, session.clone());
638 drop(span_enter);
639
640 let handle_message = async move {
641 handle_message.await;
642 drop(permit);
643 }.instrument(span);
644 if is_background {
645 executor.spawn_detached(handle_message);
646 } else {
647 foreground_message_handlers.push(handle_message);
648 }
649 } else {
650 tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
651 }
652 } else {
653 tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
654 break;
655 }
656 }
657 }
658 }
659
660 drop(foreground_message_handlers);
661 tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
662 if let Err(error) = connection_lost(session, teardown, executor).await {
663 tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
664 }
665
666 Ok(())
667 }.instrument(span)
668 }
669
670 pub async fn invite_code_redeemed(
671 self: &Arc<Self>,
672 inviter_id: UserId,
673 invitee_id: UserId,
674 ) -> Result<()> {
675 if let Some(user) = self.app_state.db.get_user_by_id(inviter_id).await? {
676 if let Some(code) = &user.invite_code {
677 let pool = self.connection_pool.lock();
678 let invitee_contact = contact_for_user(invitee_id, true, false, &pool);
679 for connection_id in pool.user_connection_ids(inviter_id) {
680 self.peer.send(
681 connection_id,
682 proto::UpdateContacts {
683 contacts: vec![invitee_contact.clone()],
684 ..Default::default()
685 },
686 )?;
687 self.peer.send(
688 connection_id,
689 proto::UpdateInviteInfo {
690 url: format!("{}{}", self.app_state.config.invite_link_prefix, &code),
691 count: user.invite_count as u32,
692 },
693 )?;
694 }
695 }
696 }
697 Ok(())
698 }
699
700 pub async fn invite_count_updated(self: &Arc<Self>, user_id: UserId) -> Result<()> {
701 if let Some(user) = self.app_state.db.get_user_by_id(user_id).await? {
702 if let Some(invite_code) = &user.invite_code {
703 let pool = self.connection_pool.lock();
704 for connection_id in pool.user_connection_ids(user_id) {
705 self.peer.send(
706 connection_id,
707 proto::UpdateInviteInfo {
708 url: format!(
709 "{}{}",
710 self.app_state.config.invite_link_prefix, invite_code
711 ),
712 count: user.invite_count as u32,
713 },
714 )?;
715 }
716 }
717 }
718 Ok(())
719 }
720
721 pub async fn snapshot<'a>(self: &'a Arc<Self>) -> ServerSnapshot<'a> {
722 ServerSnapshot {
723 connection_pool: ConnectionPoolGuard {
724 guard: self.connection_pool.lock(),
725 _not_send: PhantomData,
726 },
727 peer: &self.peer,
728 }
729 }
730}
731
732impl<'a> Deref for ConnectionPoolGuard<'a> {
733 type Target = ConnectionPool;
734
735 fn deref(&self) -> &Self::Target {
736 &*self.guard
737 }
738}
739
740impl<'a> DerefMut for ConnectionPoolGuard<'a> {
741 fn deref_mut(&mut self) -> &mut Self::Target {
742 &mut *self.guard
743 }
744}
745
746impl<'a> Drop for ConnectionPoolGuard<'a> {
747 fn drop(&mut self) {
748 #[cfg(test)]
749 self.check_invariants();
750 }
751}
752
753fn broadcast<F>(
754 sender_id: Option<ConnectionId>,
755 receiver_ids: impl IntoIterator<Item = ConnectionId>,
756 mut f: F,
757) where
758 F: FnMut(ConnectionId) -> anyhow::Result<()>,
759{
760 for receiver_id in receiver_ids {
761 if Some(receiver_id) != sender_id {
762 if let Err(error) = f(receiver_id) {
763 tracing::error!("failed to send to {:?} {}", receiver_id, error);
764 }
765 }
766 }
767}
768
769lazy_static! {
770 static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
771}
772
773pub struct ProtocolVersion(u32);
774
775impl Header for ProtocolVersion {
776 fn name() -> &'static HeaderName {
777 &ZED_PROTOCOL_VERSION
778 }
779
780 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
781 where
782 Self: Sized,
783 I: Iterator<Item = &'i axum::http::HeaderValue>,
784 {
785 let version = values
786 .next()
787 .ok_or_else(axum::headers::Error::invalid)?
788 .to_str()
789 .map_err(|_| axum::headers::Error::invalid())?
790 .parse()
791 .map_err(|_| axum::headers::Error::invalid())?;
792 Ok(Self(version))
793 }
794
795 fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
796 values.extend([self.0.to_string().parse().unwrap()]);
797 }
798}
799
800pub fn routes(server: Arc<Server>) -> Router<Body> {
801 Router::new()
802 .route("/rpc", get(handle_websocket_request))
803 .layer(
804 ServiceBuilder::new()
805 .layer(Extension(server.app_state.clone()))
806 .layer(middleware::from_fn(auth::validate_header)),
807 )
808 .route("/metrics", get(handle_metrics))
809 .layer(Extension(server))
810}
811
812pub async fn handle_websocket_request(
813 TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
814 ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
815 Extension(server): Extension<Arc<Server>>,
816 Extension(user): Extension<User>,
817 ws: WebSocketUpgrade,
818) -> axum::response::Response {
819 if protocol_version != rpc::PROTOCOL_VERSION {
820 return (
821 StatusCode::UPGRADE_REQUIRED,
822 "client must be upgraded".to_string(),
823 )
824 .into_response();
825 }
826 let socket_address = socket_address.to_string();
827 ws.on_upgrade(move |socket| {
828 use util::ResultExt;
829 let socket = socket
830 .map_ok(to_tungstenite_message)
831 .err_into()
832 .with(|message| async move { Ok(to_axum_message(message)) });
833 let connection = Connection::new(Box::pin(socket));
834 async move {
835 server
836 .handle_connection(connection, socket_address, user, None, Executor::Production)
837 .await
838 .log_err();
839 }
840 })
841}
842
843pub async fn handle_metrics(Extension(server): Extension<Arc<Server>>) -> Result<String> {
844 let connections = server
845 .connection_pool
846 .lock()
847 .connections()
848 .filter(|connection| !connection.admin)
849 .count();
850
851 METRIC_CONNECTIONS.set(connections as _);
852
853 let shared_projects = server.app_state.db.project_count_excluding_admins().await?;
854 METRIC_SHARED_PROJECTS.set(shared_projects as _);
855
856 let encoder = prometheus::TextEncoder::new();
857 let metric_families = prometheus::gather();
858 let encoded_metrics = encoder
859 .encode_to_string(&metric_families)
860 .map_err(|err| anyhow!("{}", err))?;
861 Ok(encoded_metrics)
862}
863
864#[instrument(err, skip(executor))]
865async fn connection_lost(
866 session: Session,
867 mut teardown: watch::Receiver<()>,
868 executor: Executor,
869) -> Result<()> {
870 session.peer.disconnect(session.connection_id);
871 session
872 .connection_pool()
873 .await
874 .remove_connection(session.connection_id)?;
875
876 session
877 .db()
878 .await
879 .connection_lost(session.connection_id)
880 .await
881 .trace_err();
882
883 futures::select_biased! {
884 _ = executor.sleep(RECONNECT_TIMEOUT).fuse() => {
885 log::info!("connection lost, removing all resources for user:{}, connection:{:?}", session.user_id, session.connection_id);
886 leave_room_for_session(&session).await.trace_err();
887 leave_channel_buffers_for_session(&session)
888 .await
889 .trace_err();
890
891 if !session
892 .connection_pool()
893 .await
894 .is_user_online(session.user_id)
895 {
896 let db = session.db().await;
897 if let Some(room) = db.decline_call(None, session.user_id).await.trace_err().flatten() {
898 room_updated(&room, &session.peer);
899 }
900 }
901
902 update_user_contacts(session.user_id, &session).await?;
903 }
904 _ = teardown.changed().fuse() => {}
905 }
906
907 Ok(())
908}
909
910async fn ping(_: proto::Ping, response: Response<proto::Ping>, _session: Session) -> Result<()> {
911 response.send(proto::Ack {})?;
912 Ok(())
913}
914
915async fn create_room(
916 _request: proto::CreateRoom,
917 response: Response<proto::CreateRoom>,
918 session: Session,
919) -> Result<()> {
920 let live_kit_room = nanoid::nanoid!(30);
921
922 let live_kit_connection_info = {
923 let live_kit_room = live_kit_room.clone();
924 let live_kit = session.live_kit_client.as_ref();
925
926 util::async_iife!({
927 let live_kit = live_kit?;
928
929 live_kit
930 .create_room(live_kit_room.clone())
931 .await
932 .trace_err()?;
933
934 let token = live_kit
935 .room_token(&live_kit_room, &session.user_id.to_string())
936 .trace_err()?;
937
938 Some(proto::LiveKitConnectionInfo {
939 server_url: live_kit.url().into(),
940 token,
941 })
942 })
943 }
944 .await;
945
946 let room = session
947 .db()
948 .await
949 .create_room(session.user_id, session.connection_id, &live_kit_room)
950 .await?;
951
952 response.send(proto::CreateRoomResponse {
953 room: Some(room.clone()),
954 live_kit_connection_info,
955 })?;
956
957 update_user_contacts(session.user_id, &session).await?;
958 Ok(())
959}
960
961async fn join_room(
962 request: proto::JoinRoom,
963 response: Response<proto::JoinRoom>,
964 session: Session,
965) -> Result<()> {
966 let room_id = RoomId::from_proto(request.id);
967 let joined_room = {
968 let room = session
969 .db()
970 .await
971 .join_room(room_id, session.user_id, session.connection_id)
972 .await?;
973 room_updated(&room.room, &session.peer);
974 room.into_inner()
975 };
976
977 if let Some(channel_id) = joined_room.channel_id {
978 channel_updated(
979 channel_id,
980 &joined_room.room,
981 &joined_room.channel_members,
982 &session.peer,
983 &*session.connection_pool().await,
984 )
985 }
986
987 for connection_id in session
988 .connection_pool()
989 .await
990 .user_connection_ids(session.user_id)
991 {
992 session
993 .peer
994 .send(
995 connection_id,
996 proto::CallCanceled {
997 room_id: room_id.to_proto(),
998 },
999 )
1000 .trace_err();
1001 }
1002
1003 let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
1004 if let Some(token) = live_kit
1005 .room_token(
1006 &joined_room.room.live_kit_room,
1007 &session.user_id.to_string(),
1008 )
1009 .trace_err()
1010 {
1011 Some(proto::LiveKitConnectionInfo {
1012 server_url: live_kit.url().into(),
1013 token,
1014 })
1015 } else {
1016 None
1017 }
1018 } else {
1019 None
1020 };
1021
1022 response.send(proto::JoinRoomResponse {
1023 room: Some(joined_room.room),
1024 channel_id: joined_room.channel_id.map(|id| id.to_proto()),
1025 live_kit_connection_info,
1026 })?;
1027
1028 update_user_contacts(session.user_id, &session).await?;
1029 Ok(())
1030}
1031
1032async fn rejoin_room(
1033 request: proto::RejoinRoom,
1034 response: Response<proto::RejoinRoom>,
1035 session: Session,
1036) -> Result<()> {
1037 let room;
1038 let channel_id;
1039 let channel_members;
1040 {
1041 let mut rejoined_room = session
1042 .db()
1043 .await
1044 .rejoin_room(request, session.user_id, session.connection_id)
1045 .await?;
1046
1047 response.send(proto::RejoinRoomResponse {
1048 room: Some(rejoined_room.room.clone()),
1049 reshared_projects: rejoined_room
1050 .reshared_projects
1051 .iter()
1052 .map(|project| proto::ResharedProject {
1053 id: project.id.to_proto(),
1054 collaborators: project
1055 .collaborators
1056 .iter()
1057 .map(|collaborator| collaborator.to_proto())
1058 .collect(),
1059 })
1060 .collect(),
1061 rejoined_projects: rejoined_room
1062 .rejoined_projects
1063 .iter()
1064 .map(|rejoined_project| proto::RejoinedProject {
1065 id: rejoined_project.id.to_proto(),
1066 worktrees: rejoined_project
1067 .worktrees
1068 .iter()
1069 .map(|worktree| proto::WorktreeMetadata {
1070 id: worktree.id,
1071 root_name: worktree.root_name.clone(),
1072 visible: worktree.visible,
1073 abs_path: worktree.abs_path.clone(),
1074 })
1075 .collect(),
1076 collaborators: rejoined_project
1077 .collaborators
1078 .iter()
1079 .map(|collaborator| collaborator.to_proto())
1080 .collect(),
1081 language_servers: rejoined_project.language_servers.clone(),
1082 })
1083 .collect(),
1084 })?;
1085 room_updated(&rejoined_room.room, &session.peer);
1086
1087 for project in &rejoined_room.reshared_projects {
1088 for collaborator in &project.collaborators {
1089 session
1090 .peer
1091 .send(
1092 collaborator.connection_id,
1093 proto::UpdateProjectCollaborator {
1094 project_id: project.id.to_proto(),
1095 old_peer_id: Some(project.old_connection_id.into()),
1096 new_peer_id: Some(session.connection_id.into()),
1097 },
1098 )
1099 .trace_err();
1100 }
1101
1102 broadcast(
1103 Some(session.connection_id),
1104 project
1105 .collaborators
1106 .iter()
1107 .map(|collaborator| collaborator.connection_id),
1108 |connection_id| {
1109 session.peer.forward_send(
1110 session.connection_id,
1111 connection_id,
1112 proto::UpdateProject {
1113 project_id: project.id.to_proto(),
1114 worktrees: project.worktrees.clone(),
1115 },
1116 )
1117 },
1118 );
1119 }
1120
1121 for project in &rejoined_room.rejoined_projects {
1122 for collaborator in &project.collaborators {
1123 session
1124 .peer
1125 .send(
1126 collaborator.connection_id,
1127 proto::UpdateProjectCollaborator {
1128 project_id: project.id.to_proto(),
1129 old_peer_id: Some(project.old_connection_id.into()),
1130 new_peer_id: Some(session.connection_id.into()),
1131 },
1132 )
1133 .trace_err();
1134 }
1135 }
1136
1137 for project in &mut rejoined_room.rejoined_projects {
1138 for worktree in mem::take(&mut project.worktrees) {
1139 #[cfg(any(test, feature = "test-support"))]
1140 const MAX_CHUNK_SIZE: usize = 2;
1141 #[cfg(not(any(test, feature = "test-support")))]
1142 const MAX_CHUNK_SIZE: usize = 256;
1143
1144 // Stream this worktree's entries.
1145 let message = proto::UpdateWorktree {
1146 project_id: project.id.to_proto(),
1147 worktree_id: worktree.id,
1148 abs_path: worktree.abs_path.clone(),
1149 root_name: worktree.root_name,
1150 updated_entries: worktree.updated_entries,
1151 removed_entries: worktree.removed_entries,
1152 scan_id: worktree.scan_id,
1153 is_last_update: worktree.completed_scan_id == worktree.scan_id,
1154 updated_repositories: worktree.updated_repositories,
1155 removed_repositories: worktree.removed_repositories,
1156 };
1157 for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1158 session.peer.send(session.connection_id, update.clone())?;
1159 }
1160
1161 // Stream this worktree's diagnostics.
1162 for summary in worktree.diagnostic_summaries {
1163 session.peer.send(
1164 session.connection_id,
1165 proto::UpdateDiagnosticSummary {
1166 project_id: project.id.to_proto(),
1167 worktree_id: worktree.id,
1168 summary: Some(summary),
1169 },
1170 )?;
1171 }
1172
1173 for settings_file in worktree.settings_files {
1174 session.peer.send(
1175 session.connection_id,
1176 proto::UpdateWorktreeSettings {
1177 project_id: project.id.to_proto(),
1178 worktree_id: worktree.id,
1179 path: settings_file.path,
1180 content: Some(settings_file.content),
1181 },
1182 )?;
1183 }
1184 }
1185
1186 for language_server in &project.language_servers {
1187 session.peer.send(
1188 session.connection_id,
1189 proto::UpdateLanguageServer {
1190 project_id: project.id.to_proto(),
1191 language_server_id: language_server.id,
1192 variant: Some(
1193 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1194 proto::LspDiskBasedDiagnosticsUpdated {},
1195 ),
1196 ),
1197 },
1198 )?;
1199 }
1200 }
1201
1202 let rejoined_room = rejoined_room.into_inner();
1203
1204 room = rejoined_room.room;
1205 channel_id = rejoined_room.channel_id;
1206 channel_members = rejoined_room.channel_members;
1207 }
1208
1209 if let Some(channel_id) = channel_id {
1210 channel_updated(
1211 channel_id,
1212 &room,
1213 &channel_members,
1214 &session.peer,
1215 &*session.connection_pool().await,
1216 );
1217 }
1218
1219 update_user_contacts(session.user_id, &session).await?;
1220 Ok(())
1221}
1222
1223async fn leave_room(
1224 _: proto::LeaveRoom,
1225 response: Response<proto::LeaveRoom>,
1226 session: Session,
1227) -> Result<()> {
1228 leave_room_for_session(&session).await?;
1229 response.send(proto::Ack {})?;
1230 Ok(())
1231}
1232
1233async fn call(
1234 request: proto::Call,
1235 response: Response<proto::Call>,
1236 session: Session,
1237) -> Result<()> {
1238 let room_id = RoomId::from_proto(request.room_id);
1239 let calling_user_id = session.user_id;
1240 let calling_connection_id = session.connection_id;
1241 let called_user_id = UserId::from_proto(request.called_user_id);
1242 let initial_project_id = request.initial_project_id.map(ProjectId::from_proto);
1243 if !session
1244 .db()
1245 .await
1246 .has_contact(calling_user_id, called_user_id)
1247 .await?
1248 {
1249 return Err(anyhow!("cannot call a user who isn't a contact"))?;
1250 }
1251
1252 let incoming_call = {
1253 let (room, incoming_call) = &mut *session
1254 .db()
1255 .await
1256 .call(
1257 room_id,
1258 calling_user_id,
1259 calling_connection_id,
1260 called_user_id,
1261 initial_project_id,
1262 )
1263 .await?;
1264 room_updated(&room, &session.peer);
1265 mem::take(incoming_call)
1266 };
1267 update_user_contacts(called_user_id, &session).await?;
1268
1269 let mut calls = session
1270 .connection_pool()
1271 .await
1272 .user_connection_ids(called_user_id)
1273 .map(|connection_id| session.peer.request(connection_id, incoming_call.clone()))
1274 .collect::<FuturesUnordered<_>>();
1275
1276 while let Some(call_response) = calls.next().await {
1277 match call_response.as_ref() {
1278 Ok(_) => {
1279 response.send(proto::Ack {})?;
1280 return Ok(());
1281 }
1282 Err(_) => {
1283 call_response.trace_err();
1284 }
1285 }
1286 }
1287
1288 {
1289 let room = session
1290 .db()
1291 .await
1292 .call_failed(room_id, called_user_id)
1293 .await?;
1294 room_updated(&room, &session.peer);
1295 }
1296 update_user_contacts(called_user_id, &session).await?;
1297
1298 Err(anyhow!("failed to ring user"))?
1299}
1300
1301async fn cancel_call(
1302 request: proto::CancelCall,
1303 response: Response<proto::CancelCall>,
1304 session: Session,
1305) -> Result<()> {
1306 let called_user_id = UserId::from_proto(request.called_user_id);
1307 let room_id = RoomId::from_proto(request.room_id);
1308 {
1309 let room = session
1310 .db()
1311 .await
1312 .cancel_call(room_id, session.connection_id, called_user_id)
1313 .await?;
1314 room_updated(&room, &session.peer);
1315 }
1316
1317 for connection_id in session
1318 .connection_pool()
1319 .await
1320 .user_connection_ids(called_user_id)
1321 {
1322 session
1323 .peer
1324 .send(
1325 connection_id,
1326 proto::CallCanceled {
1327 room_id: room_id.to_proto(),
1328 },
1329 )
1330 .trace_err();
1331 }
1332 response.send(proto::Ack {})?;
1333
1334 update_user_contacts(called_user_id, &session).await?;
1335 Ok(())
1336}
1337
1338async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<()> {
1339 let room_id = RoomId::from_proto(message.room_id);
1340 {
1341 let room = session
1342 .db()
1343 .await
1344 .decline_call(Some(room_id), session.user_id)
1345 .await?
1346 .ok_or_else(|| anyhow!("failed to decline call"))?;
1347 room_updated(&room, &session.peer);
1348 }
1349
1350 for connection_id in session
1351 .connection_pool()
1352 .await
1353 .user_connection_ids(session.user_id)
1354 {
1355 session
1356 .peer
1357 .send(
1358 connection_id,
1359 proto::CallCanceled {
1360 room_id: room_id.to_proto(),
1361 },
1362 )
1363 .trace_err();
1364 }
1365 update_user_contacts(session.user_id, &session).await?;
1366 Ok(())
1367}
1368
1369async fn update_participant_location(
1370 request: proto::UpdateParticipantLocation,
1371 response: Response<proto::UpdateParticipantLocation>,
1372 session: Session,
1373) -> Result<()> {
1374 let room_id = RoomId::from_proto(request.room_id);
1375 let location = request
1376 .location
1377 .ok_or_else(|| anyhow!("invalid location"))?;
1378
1379 let db = session.db().await;
1380 let room = db
1381 .update_room_participant_location(room_id, session.connection_id, location)
1382 .await?;
1383
1384 room_updated(&room, &session.peer);
1385 response.send(proto::Ack {})?;
1386 Ok(())
1387}
1388
1389async fn share_project(
1390 request: proto::ShareProject,
1391 response: Response<proto::ShareProject>,
1392 session: Session,
1393) -> Result<()> {
1394 let (project_id, room) = &*session
1395 .db()
1396 .await
1397 .share_project(
1398 RoomId::from_proto(request.room_id),
1399 session.connection_id,
1400 &request.worktrees,
1401 )
1402 .await?;
1403 response.send(proto::ShareProjectResponse {
1404 project_id: project_id.to_proto(),
1405 })?;
1406 room_updated(&room, &session.peer);
1407
1408 Ok(())
1409}
1410
1411async fn unshare_project(message: proto::UnshareProject, session: Session) -> Result<()> {
1412 let project_id = ProjectId::from_proto(message.project_id);
1413
1414 let (room, guest_connection_ids) = &*session
1415 .db()
1416 .await
1417 .unshare_project(project_id, session.connection_id)
1418 .await?;
1419
1420 broadcast(
1421 Some(session.connection_id),
1422 guest_connection_ids.iter().copied(),
1423 |conn_id| session.peer.send(conn_id, message.clone()),
1424 );
1425 room_updated(&room, &session.peer);
1426
1427 Ok(())
1428}
1429
1430async fn join_project(
1431 request: proto::JoinProject,
1432 response: Response<proto::JoinProject>,
1433 session: Session,
1434) -> Result<()> {
1435 let project_id = ProjectId::from_proto(request.project_id);
1436 let guest_user_id = session.user_id;
1437
1438 tracing::info!(%project_id, "join project");
1439
1440 let (project, replica_id) = &mut *session
1441 .db()
1442 .await
1443 .join_project(project_id, session.connection_id)
1444 .await?;
1445
1446 let collaborators = project
1447 .collaborators
1448 .iter()
1449 .filter(|collaborator| collaborator.connection_id != session.connection_id)
1450 .map(|collaborator| collaborator.to_proto())
1451 .collect::<Vec<_>>();
1452
1453 let worktrees = project
1454 .worktrees
1455 .iter()
1456 .map(|(id, worktree)| proto::WorktreeMetadata {
1457 id: *id,
1458 root_name: worktree.root_name.clone(),
1459 visible: worktree.visible,
1460 abs_path: worktree.abs_path.clone(),
1461 })
1462 .collect::<Vec<_>>();
1463
1464 for collaborator in &collaborators {
1465 session
1466 .peer
1467 .send(
1468 collaborator.peer_id.unwrap().into(),
1469 proto::AddProjectCollaborator {
1470 project_id: project_id.to_proto(),
1471 collaborator: Some(proto::Collaborator {
1472 peer_id: Some(session.connection_id.into()),
1473 replica_id: replica_id.0 as u32,
1474 user_id: guest_user_id.to_proto(),
1475 }),
1476 },
1477 )
1478 .trace_err();
1479 }
1480
1481 // First, we send the metadata associated with each worktree.
1482 response.send(proto::JoinProjectResponse {
1483 worktrees: worktrees.clone(),
1484 replica_id: replica_id.0 as u32,
1485 collaborators: collaborators.clone(),
1486 language_servers: project.language_servers.clone(),
1487 })?;
1488
1489 for (worktree_id, worktree) in mem::take(&mut project.worktrees) {
1490 #[cfg(any(test, feature = "test-support"))]
1491 const MAX_CHUNK_SIZE: usize = 2;
1492 #[cfg(not(any(test, feature = "test-support")))]
1493 const MAX_CHUNK_SIZE: usize = 256;
1494
1495 // Stream this worktree's entries.
1496 let message = proto::UpdateWorktree {
1497 project_id: project_id.to_proto(),
1498 worktree_id,
1499 abs_path: worktree.abs_path.clone(),
1500 root_name: worktree.root_name,
1501 updated_entries: worktree.entries,
1502 removed_entries: Default::default(),
1503 scan_id: worktree.scan_id,
1504 is_last_update: worktree.scan_id == worktree.completed_scan_id,
1505 updated_repositories: worktree.repository_entries.into_values().collect(),
1506 removed_repositories: Default::default(),
1507 };
1508 for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
1509 session.peer.send(session.connection_id, update.clone())?;
1510 }
1511
1512 // Stream this worktree's diagnostics.
1513 for summary in worktree.diagnostic_summaries {
1514 session.peer.send(
1515 session.connection_id,
1516 proto::UpdateDiagnosticSummary {
1517 project_id: project_id.to_proto(),
1518 worktree_id: worktree.id,
1519 summary: Some(summary),
1520 },
1521 )?;
1522 }
1523
1524 for settings_file in worktree.settings_files {
1525 session.peer.send(
1526 session.connection_id,
1527 proto::UpdateWorktreeSettings {
1528 project_id: project_id.to_proto(),
1529 worktree_id: worktree.id,
1530 path: settings_file.path,
1531 content: Some(settings_file.content),
1532 },
1533 )?;
1534 }
1535 }
1536
1537 for language_server in &project.language_servers {
1538 session.peer.send(
1539 session.connection_id,
1540 proto::UpdateLanguageServer {
1541 project_id: project_id.to_proto(),
1542 language_server_id: language_server.id,
1543 variant: Some(
1544 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1545 proto::LspDiskBasedDiagnosticsUpdated {},
1546 ),
1547 ),
1548 },
1549 )?;
1550 }
1551
1552 Ok(())
1553}
1554
1555async fn leave_project(request: proto::LeaveProject, session: Session) -> Result<()> {
1556 let sender_id = session.connection_id;
1557 let project_id = ProjectId::from_proto(request.project_id);
1558
1559 let (room, project) = &*session
1560 .db()
1561 .await
1562 .leave_project(project_id, sender_id)
1563 .await?;
1564 tracing::info!(
1565 %project_id,
1566 host_user_id = %project.host_user_id,
1567 host_connection_id = %project.host_connection_id,
1568 "leave project"
1569 );
1570
1571 project_left(&project, &session);
1572 room_updated(&room, &session.peer);
1573
1574 Ok(())
1575}
1576
1577async fn update_project(
1578 request: proto::UpdateProject,
1579 response: Response<proto::UpdateProject>,
1580 session: Session,
1581) -> Result<()> {
1582 let project_id = ProjectId::from_proto(request.project_id);
1583 let (room, guest_connection_ids) = &*session
1584 .db()
1585 .await
1586 .update_project(project_id, session.connection_id, &request.worktrees)
1587 .await?;
1588 broadcast(
1589 Some(session.connection_id),
1590 guest_connection_ids.iter().copied(),
1591 |connection_id| {
1592 session
1593 .peer
1594 .forward_send(session.connection_id, connection_id, request.clone())
1595 },
1596 );
1597 room_updated(&room, &session.peer);
1598 response.send(proto::Ack {})?;
1599
1600 Ok(())
1601}
1602
1603async fn update_worktree(
1604 request: proto::UpdateWorktree,
1605 response: Response<proto::UpdateWorktree>,
1606 session: Session,
1607) -> Result<()> {
1608 let guest_connection_ids = session
1609 .db()
1610 .await
1611 .update_worktree(&request, session.connection_id)
1612 .await?;
1613
1614 broadcast(
1615 Some(session.connection_id),
1616 guest_connection_ids.iter().copied(),
1617 |connection_id| {
1618 session
1619 .peer
1620 .forward_send(session.connection_id, connection_id, request.clone())
1621 },
1622 );
1623 response.send(proto::Ack {})?;
1624 Ok(())
1625}
1626
1627async fn update_diagnostic_summary(
1628 message: proto::UpdateDiagnosticSummary,
1629 session: Session,
1630) -> Result<()> {
1631 let guest_connection_ids = session
1632 .db()
1633 .await
1634 .update_diagnostic_summary(&message, session.connection_id)
1635 .await?;
1636
1637 broadcast(
1638 Some(session.connection_id),
1639 guest_connection_ids.iter().copied(),
1640 |connection_id| {
1641 session
1642 .peer
1643 .forward_send(session.connection_id, connection_id, message.clone())
1644 },
1645 );
1646
1647 Ok(())
1648}
1649
1650async fn update_worktree_settings(
1651 message: proto::UpdateWorktreeSettings,
1652 session: Session,
1653) -> Result<()> {
1654 let guest_connection_ids = session
1655 .db()
1656 .await
1657 .update_worktree_settings(&message, session.connection_id)
1658 .await?;
1659
1660 broadcast(
1661 Some(session.connection_id),
1662 guest_connection_ids.iter().copied(),
1663 |connection_id| {
1664 session
1665 .peer
1666 .forward_send(session.connection_id, connection_id, message.clone())
1667 },
1668 );
1669
1670 Ok(())
1671}
1672
1673async fn refresh_inlay_hints(request: proto::RefreshInlayHints, session: Session) -> Result<()> {
1674 broadcast_project_message(request.project_id, request, session).await
1675}
1676
1677async fn start_language_server(
1678 request: proto::StartLanguageServer,
1679 session: Session,
1680) -> Result<()> {
1681 let guest_connection_ids = session
1682 .db()
1683 .await
1684 .start_language_server(&request, session.connection_id)
1685 .await?;
1686
1687 broadcast(
1688 Some(session.connection_id),
1689 guest_connection_ids.iter().copied(),
1690 |connection_id| {
1691 session
1692 .peer
1693 .forward_send(session.connection_id, connection_id, request.clone())
1694 },
1695 );
1696 Ok(())
1697}
1698
1699async fn update_language_server(
1700 request: proto::UpdateLanguageServer,
1701 session: Session,
1702) -> Result<()> {
1703 session.executor.record_backtrace();
1704 let project_id = ProjectId::from_proto(request.project_id);
1705 let project_connection_ids = session
1706 .db()
1707 .await
1708 .project_connection_ids(project_id, session.connection_id)
1709 .await?;
1710 broadcast(
1711 Some(session.connection_id),
1712 project_connection_ids.iter().copied(),
1713 |connection_id| {
1714 session
1715 .peer
1716 .forward_send(session.connection_id, connection_id, request.clone())
1717 },
1718 );
1719 Ok(())
1720}
1721
1722async fn forward_project_request<T>(
1723 request: T,
1724 response: Response<T>,
1725 session: Session,
1726) -> Result<()>
1727where
1728 T: EntityMessage + RequestMessage,
1729{
1730 session.executor.record_backtrace();
1731 let project_id = ProjectId::from_proto(request.remote_entity_id());
1732 let host_connection_id = {
1733 let collaborators = session
1734 .db()
1735 .await
1736 .project_collaborators(project_id, session.connection_id)
1737 .await?;
1738 collaborators
1739 .iter()
1740 .find(|collaborator| collaborator.is_host)
1741 .ok_or_else(|| anyhow!("host not found"))?
1742 .connection_id
1743 };
1744
1745 let payload = session
1746 .peer
1747 .forward_request(session.connection_id, host_connection_id, request)
1748 .await?;
1749
1750 response.send(payload)?;
1751 Ok(())
1752}
1753
1754async fn create_buffer_for_peer(
1755 request: proto::CreateBufferForPeer,
1756 session: Session,
1757) -> Result<()> {
1758 session.executor.record_backtrace();
1759 let peer_id = request.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?;
1760 session
1761 .peer
1762 .forward_send(session.connection_id, peer_id.into(), request)?;
1763 Ok(())
1764}
1765
1766async fn update_buffer(
1767 request: proto::UpdateBuffer,
1768 response: Response<proto::UpdateBuffer>,
1769 session: Session,
1770) -> Result<()> {
1771 session.executor.record_backtrace();
1772 let project_id = ProjectId::from_proto(request.project_id);
1773 let mut guest_connection_ids;
1774 let mut host_connection_id = None;
1775 {
1776 let collaborators = session
1777 .db()
1778 .await
1779 .project_collaborators(project_id, session.connection_id)
1780 .await?;
1781 guest_connection_ids = Vec::with_capacity(collaborators.len() - 1);
1782 for collaborator in collaborators.iter() {
1783 if collaborator.is_host {
1784 host_connection_id = Some(collaborator.connection_id);
1785 } else {
1786 guest_connection_ids.push(collaborator.connection_id);
1787 }
1788 }
1789 }
1790 let host_connection_id = host_connection_id.ok_or_else(|| anyhow!("host not found"))?;
1791
1792 session.executor.record_backtrace();
1793 broadcast(
1794 Some(session.connection_id),
1795 guest_connection_ids,
1796 |connection_id| {
1797 session
1798 .peer
1799 .forward_send(session.connection_id, connection_id, request.clone())
1800 },
1801 );
1802 if host_connection_id != session.connection_id {
1803 session
1804 .peer
1805 .forward_request(session.connection_id, host_connection_id, request.clone())
1806 .await?;
1807 }
1808
1809 response.send(proto::Ack {})?;
1810 Ok(())
1811}
1812
1813async fn update_buffer_file(request: proto::UpdateBufferFile, session: Session) -> Result<()> {
1814 let project_id = ProjectId::from_proto(request.project_id);
1815 let project_connection_ids = session
1816 .db()
1817 .await
1818 .project_connection_ids(project_id, session.connection_id)
1819 .await?;
1820
1821 broadcast(
1822 Some(session.connection_id),
1823 project_connection_ids.iter().copied(),
1824 |connection_id| {
1825 session
1826 .peer
1827 .forward_send(session.connection_id, connection_id, request.clone())
1828 },
1829 );
1830 Ok(())
1831}
1832
1833async fn buffer_reloaded(request: proto::BufferReloaded, session: Session) -> Result<()> {
1834 let project_id = ProjectId::from_proto(request.project_id);
1835 let project_connection_ids = session
1836 .db()
1837 .await
1838 .project_connection_ids(project_id, session.connection_id)
1839 .await?;
1840 broadcast(
1841 Some(session.connection_id),
1842 project_connection_ids.iter().copied(),
1843 |connection_id| {
1844 session
1845 .peer
1846 .forward_send(session.connection_id, connection_id, request.clone())
1847 },
1848 );
1849 Ok(())
1850}
1851
1852async fn buffer_saved(request: proto::BufferSaved, session: Session) -> Result<()> {
1853 broadcast_project_message(request.project_id, request, session).await
1854}
1855
1856async fn broadcast_project_message<T: EnvelopedMessage>(
1857 project_id: u64,
1858 request: T,
1859 session: Session,
1860) -> Result<()> {
1861 let project_id = ProjectId::from_proto(project_id);
1862 let project_connection_ids = session
1863 .db()
1864 .await
1865 .project_connection_ids(project_id, session.connection_id)
1866 .await?;
1867 broadcast(
1868 Some(session.connection_id),
1869 project_connection_ids.iter().copied(),
1870 |connection_id| {
1871 session
1872 .peer
1873 .forward_send(session.connection_id, connection_id, request.clone())
1874 },
1875 );
1876 Ok(())
1877}
1878
1879async fn follow(
1880 request: proto::Follow,
1881 response: Response<proto::Follow>,
1882 session: Session,
1883) -> Result<()> {
1884 let project_id = ProjectId::from_proto(request.project_id);
1885 let leader_id = request
1886 .leader_id
1887 .ok_or_else(|| anyhow!("invalid leader id"))?
1888 .into();
1889 let follower_id = session.connection_id;
1890
1891 {
1892 let project_connection_ids = session
1893 .db()
1894 .await
1895 .project_connection_ids(project_id, session.connection_id)
1896 .await?;
1897
1898 if !project_connection_ids.contains(&leader_id) {
1899 Err(anyhow!("no such peer"))?;
1900 }
1901 }
1902
1903 let mut response_payload = session
1904 .peer
1905 .forward_request(session.connection_id, leader_id, request)
1906 .await?;
1907 response_payload
1908 .views
1909 .retain(|view| view.leader_id != Some(follower_id.into()));
1910 response.send(response_payload)?;
1911
1912 let room = session
1913 .db()
1914 .await
1915 .follow(project_id, leader_id, follower_id)
1916 .await?;
1917 room_updated(&room, &session.peer);
1918
1919 Ok(())
1920}
1921
1922async fn unfollow(request: proto::Unfollow, session: Session) -> Result<()> {
1923 let project_id = ProjectId::from_proto(request.project_id);
1924 let leader_id = request
1925 .leader_id
1926 .ok_or_else(|| anyhow!("invalid leader id"))?
1927 .into();
1928 let follower_id = session.connection_id;
1929
1930 if !session
1931 .db()
1932 .await
1933 .project_connection_ids(project_id, session.connection_id)
1934 .await?
1935 .contains(&leader_id)
1936 {
1937 Err(anyhow!("no such peer"))?;
1938 }
1939
1940 session
1941 .peer
1942 .forward_send(session.connection_id, leader_id, request)?;
1943
1944 let room = session
1945 .db()
1946 .await
1947 .unfollow(project_id, leader_id, follower_id)
1948 .await?;
1949 room_updated(&room, &session.peer);
1950
1951 Ok(())
1952}
1953
1954async fn update_followers(request: proto::UpdateFollowers, session: Session) -> Result<()> {
1955 let project_id = ProjectId::from_proto(request.project_id);
1956 let project_connection_ids = session
1957 .db
1958 .lock()
1959 .await
1960 .project_connection_ids(project_id, session.connection_id)
1961 .await?;
1962
1963 let leader_id = request.variant.as_ref().and_then(|variant| match variant {
1964 proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
1965 proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
1966 proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
1967 });
1968 for follower_peer_id in request.follower_ids.iter().copied() {
1969 let follower_connection_id = follower_peer_id.into();
1970 if project_connection_ids.contains(&follower_connection_id)
1971 && Some(follower_peer_id) != leader_id
1972 {
1973 session.peer.forward_send(
1974 session.connection_id,
1975 follower_connection_id,
1976 request.clone(),
1977 )?;
1978 }
1979 }
1980 Ok(())
1981}
1982
1983async fn get_users(
1984 request: proto::GetUsers,
1985 response: Response<proto::GetUsers>,
1986 session: Session,
1987) -> Result<()> {
1988 let user_ids = request
1989 .user_ids
1990 .into_iter()
1991 .map(UserId::from_proto)
1992 .collect();
1993 let users = session
1994 .db()
1995 .await
1996 .get_users_by_ids(user_ids)
1997 .await?
1998 .into_iter()
1999 .map(|user| proto::User {
2000 id: user.id.to_proto(),
2001 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
2002 github_login: user.github_login,
2003 })
2004 .collect();
2005 response.send(proto::UsersResponse { users })?;
2006 Ok(())
2007}
2008
2009async fn fuzzy_search_users(
2010 request: proto::FuzzySearchUsers,
2011 response: Response<proto::FuzzySearchUsers>,
2012 session: Session,
2013) -> Result<()> {
2014 let query = request.query;
2015 let users = match query.len() {
2016 0 => vec![],
2017 1 | 2 => session
2018 .db()
2019 .await
2020 .get_user_by_github_login(&query)
2021 .await?
2022 .into_iter()
2023 .collect(),
2024 _ => session.db().await.fuzzy_search_users(&query, 10).await?,
2025 };
2026 let users = users
2027 .into_iter()
2028 .filter(|user| user.id != session.user_id)
2029 .map(|user| proto::User {
2030 id: user.id.to_proto(),
2031 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
2032 github_login: user.github_login,
2033 })
2034 .collect();
2035 response.send(proto::UsersResponse { users })?;
2036 Ok(())
2037}
2038
2039async fn request_contact(
2040 request: proto::RequestContact,
2041 response: Response<proto::RequestContact>,
2042 session: Session,
2043) -> Result<()> {
2044 let requester_id = session.user_id;
2045 let responder_id = UserId::from_proto(request.responder_id);
2046 if requester_id == responder_id {
2047 return Err(anyhow!("cannot add yourself as a contact"))?;
2048 }
2049
2050 session
2051 .db()
2052 .await
2053 .send_contact_request(requester_id, responder_id)
2054 .await?;
2055
2056 // Update outgoing contact requests of requester
2057 let mut update = proto::UpdateContacts::default();
2058 update.outgoing_requests.push(responder_id.to_proto());
2059 for connection_id in session
2060 .connection_pool()
2061 .await
2062 .user_connection_ids(requester_id)
2063 {
2064 session.peer.send(connection_id, update.clone())?;
2065 }
2066
2067 // Update incoming contact requests of responder
2068 let mut update = proto::UpdateContacts::default();
2069 update
2070 .incoming_requests
2071 .push(proto::IncomingContactRequest {
2072 requester_id: requester_id.to_proto(),
2073 should_notify: true,
2074 });
2075 for connection_id in session
2076 .connection_pool()
2077 .await
2078 .user_connection_ids(responder_id)
2079 {
2080 session.peer.send(connection_id, update.clone())?;
2081 }
2082
2083 response.send(proto::Ack {})?;
2084 Ok(())
2085}
2086
2087async fn respond_to_contact_request(
2088 request: proto::RespondToContactRequest,
2089 response: Response<proto::RespondToContactRequest>,
2090 session: Session,
2091) -> Result<()> {
2092 let responder_id = session.user_id;
2093 let requester_id = UserId::from_proto(request.requester_id);
2094 let db = session.db().await;
2095 if request.response == proto::ContactRequestResponse::Dismiss as i32 {
2096 db.dismiss_contact_notification(responder_id, requester_id)
2097 .await?;
2098 } else {
2099 let accept = request.response == proto::ContactRequestResponse::Accept as i32;
2100
2101 db.respond_to_contact_request(responder_id, requester_id, accept)
2102 .await?;
2103 let requester_busy = db.is_user_busy(requester_id).await?;
2104 let responder_busy = db.is_user_busy(responder_id).await?;
2105
2106 let pool = session.connection_pool().await;
2107 // Update responder with new contact
2108 let mut update = proto::UpdateContacts::default();
2109 if accept {
2110 update
2111 .contacts
2112 .push(contact_for_user(requester_id, false, requester_busy, &pool));
2113 }
2114 update
2115 .remove_incoming_requests
2116 .push(requester_id.to_proto());
2117 for connection_id in pool.user_connection_ids(responder_id) {
2118 session.peer.send(connection_id, update.clone())?;
2119 }
2120
2121 // Update requester with new contact
2122 let mut update = proto::UpdateContacts::default();
2123 if accept {
2124 update
2125 .contacts
2126 .push(contact_for_user(responder_id, true, responder_busy, &pool));
2127 }
2128 update
2129 .remove_outgoing_requests
2130 .push(responder_id.to_proto());
2131 for connection_id in pool.user_connection_ids(requester_id) {
2132 session.peer.send(connection_id, update.clone())?;
2133 }
2134 }
2135
2136 response.send(proto::Ack {})?;
2137 Ok(())
2138}
2139
2140async fn remove_contact(
2141 request: proto::RemoveContact,
2142 response: Response<proto::RemoveContact>,
2143 session: Session,
2144) -> Result<()> {
2145 let requester_id = session.user_id;
2146 let responder_id = UserId::from_proto(request.user_id);
2147 let db = session.db().await;
2148 let contact_accepted = db.remove_contact(requester_id, responder_id).await?;
2149
2150 let pool = session.connection_pool().await;
2151 // Update outgoing contact requests of requester
2152 let mut update = proto::UpdateContacts::default();
2153 if contact_accepted {
2154 update.remove_contacts.push(responder_id.to_proto());
2155 } else {
2156 update
2157 .remove_outgoing_requests
2158 .push(responder_id.to_proto());
2159 }
2160 for connection_id in pool.user_connection_ids(requester_id) {
2161 session.peer.send(connection_id, update.clone())?;
2162 }
2163
2164 // Update incoming contact requests of responder
2165 let mut update = proto::UpdateContacts::default();
2166 if contact_accepted {
2167 update.remove_contacts.push(requester_id.to_proto());
2168 } else {
2169 update
2170 .remove_incoming_requests
2171 .push(requester_id.to_proto());
2172 }
2173 for connection_id in pool.user_connection_ids(responder_id) {
2174 session.peer.send(connection_id, update.clone())?;
2175 }
2176
2177 response.send(proto::Ack {})?;
2178 Ok(())
2179}
2180
2181async fn create_channel(
2182 request: proto::CreateChannel,
2183 response: Response<proto::CreateChannel>,
2184 session: Session,
2185) -> Result<()> {
2186 let db = session.db().await;
2187 let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
2188
2189 if let Some(live_kit) = session.live_kit_client.as_ref() {
2190 live_kit.create_room(live_kit_room.clone()).await?;
2191 }
2192
2193 let parent_id = request.parent_id.map(|id| ChannelId::from_proto(id));
2194 let id = db
2195 .create_channel(&request.name, parent_id, &live_kit_room, session.user_id)
2196 .await?;
2197
2198 let channel = proto::Channel {
2199 id: id.to_proto(),
2200 name: request.name,
2201 parent_id: request.parent_id,
2202 };
2203
2204 response.send(proto::ChannelResponse {
2205 channel: Some(channel.clone()),
2206 })?;
2207
2208 let mut update = proto::UpdateChannels::default();
2209 update.channels.push(channel);
2210
2211 let user_ids_to_notify = if let Some(parent_id) = parent_id {
2212 db.get_channel_members(parent_id).await?
2213 } else {
2214 vec![session.user_id]
2215 };
2216
2217 let connection_pool = session.connection_pool().await;
2218 for user_id in user_ids_to_notify {
2219 for connection_id in connection_pool.user_connection_ids(user_id) {
2220 let mut update = update.clone();
2221 if user_id == session.user_id {
2222 update.channel_permissions.push(proto::ChannelPermission {
2223 channel_id: id.to_proto(),
2224 is_admin: true,
2225 });
2226 }
2227 session.peer.send(connection_id, update)?;
2228 }
2229 }
2230
2231 Ok(())
2232}
2233
2234async fn delete_channel(
2235 request: proto::DeleteChannel,
2236 response: Response<proto::DeleteChannel>,
2237 session: Session,
2238) -> Result<()> {
2239 let db = session.db().await;
2240
2241 let channel_id = request.channel_id;
2242 let (removed_channels, member_ids) = db
2243 .delete_channel(ChannelId::from_proto(channel_id), session.user_id)
2244 .await?;
2245 response.send(proto::Ack {})?;
2246
2247 // Notify members of removed channels
2248 let mut update = proto::UpdateChannels::default();
2249 update
2250 .delete_channels
2251 .extend(removed_channels.into_iter().map(|id| id.to_proto()));
2252
2253 let connection_pool = session.connection_pool().await;
2254 for member_id in member_ids {
2255 for connection_id in connection_pool.user_connection_ids(member_id) {
2256 session.peer.send(connection_id, update.clone())?;
2257 }
2258 }
2259
2260 Ok(())
2261}
2262
2263async fn invite_channel_member(
2264 request: proto::InviteChannelMember,
2265 response: Response<proto::InviteChannelMember>,
2266 session: Session,
2267) -> Result<()> {
2268 let db = session.db().await;
2269 let channel_id = ChannelId::from_proto(request.channel_id);
2270 let invitee_id = UserId::from_proto(request.user_id);
2271 db.invite_channel_member(channel_id, invitee_id, session.user_id, request.admin)
2272 .await?;
2273
2274 let (channel, _) = db
2275 .get_channel(channel_id, session.user_id)
2276 .await?
2277 .ok_or_else(|| anyhow!("channel not found"))?;
2278
2279 let mut update = proto::UpdateChannels::default();
2280 update.channel_invitations.push(proto::Channel {
2281 id: channel.id.to_proto(),
2282 name: channel.name,
2283 parent_id: None,
2284 });
2285 for connection_id in session
2286 .connection_pool()
2287 .await
2288 .user_connection_ids(invitee_id)
2289 {
2290 session.peer.send(connection_id, update.clone())?;
2291 }
2292
2293 response.send(proto::Ack {})?;
2294 Ok(())
2295}
2296
2297async fn remove_channel_member(
2298 request: proto::RemoveChannelMember,
2299 response: Response<proto::RemoveChannelMember>,
2300 session: Session,
2301) -> Result<()> {
2302 let db = session.db().await;
2303 let channel_id = ChannelId::from_proto(request.channel_id);
2304 let member_id = UserId::from_proto(request.user_id);
2305
2306 db.remove_channel_member(channel_id, member_id, session.user_id)
2307 .await?;
2308
2309 let mut update = proto::UpdateChannels::default();
2310 update.delete_channels.push(channel_id.to_proto());
2311
2312 for connection_id in session
2313 .connection_pool()
2314 .await
2315 .user_connection_ids(member_id)
2316 {
2317 session.peer.send(connection_id, update.clone())?;
2318 }
2319
2320 response.send(proto::Ack {})?;
2321 Ok(())
2322}
2323
2324async fn set_channel_member_admin(
2325 request: proto::SetChannelMemberAdmin,
2326 response: Response<proto::SetChannelMemberAdmin>,
2327 session: Session,
2328) -> Result<()> {
2329 let db = session.db().await;
2330 let channel_id = ChannelId::from_proto(request.channel_id);
2331 let member_id = UserId::from_proto(request.user_id);
2332 db.set_channel_member_admin(channel_id, session.user_id, member_id, request.admin)
2333 .await?;
2334
2335 let (channel, has_accepted) = db
2336 .get_channel(channel_id, member_id)
2337 .await?
2338 .ok_or_else(|| anyhow!("channel not found"))?;
2339
2340 let mut update = proto::UpdateChannels::default();
2341 if has_accepted {
2342 update.channel_permissions.push(proto::ChannelPermission {
2343 channel_id: channel.id.to_proto(),
2344 is_admin: request.admin,
2345 });
2346 }
2347
2348 for connection_id in session
2349 .connection_pool()
2350 .await
2351 .user_connection_ids(member_id)
2352 {
2353 session.peer.send(connection_id, update.clone())?;
2354 }
2355
2356 response.send(proto::Ack {})?;
2357 Ok(())
2358}
2359
2360async fn rename_channel(
2361 request: proto::RenameChannel,
2362 response: Response<proto::RenameChannel>,
2363 session: Session,
2364) -> Result<()> {
2365 let db = session.db().await;
2366 let channel_id = ChannelId::from_proto(request.channel_id);
2367 let new_name = db
2368 .rename_channel(channel_id, session.user_id, &request.name)
2369 .await?;
2370
2371 let channel = proto::Channel {
2372 id: request.channel_id,
2373 name: new_name,
2374 parent_id: None,
2375 };
2376 response.send(proto::ChannelResponse {
2377 channel: Some(channel.clone()),
2378 })?;
2379 let mut update = proto::UpdateChannels::default();
2380 update.channels.push(channel);
2381
2382 let member_ids = db.get_channel_members(channel_id).await?;
2383
2384 let connection_pool = session.connection_pool().await;
2385 for member_id in member_ids {
2386 for connection_id in connection_pool.user_connection_ids(member_id) {
2387 session.peer.send(connection_id, update.clone())?;
2388 }
2389 }
2390
2391 Ok(())
2392}
2393
2394async fn move_channel(
2395 request: proto::MoveChannel,
2396 response: Response<proto::MoveChannel>,
2397 session: Session,
2398) -> Result<()> {
2399 let db = session.db().await;
2400 let channel_id = ChannelId::from_proto(request.channel_id);
2401 let from_parent = request.from_parent.map(ChannelId::from_proto);
2402 let to = request.to.map(ChannelId::from_proto);
2403 let channels_to_send = db
2404 .move_channel(
2405 session.user_id,
2406 channel_id,
2407 from_parent,
2408 to,
2409 )
2410 .await?;
2411
2412
2413 if let Some(from_parent) = from_parent {
2414 let members = db.get_channel_members(from_parent).await?;
2415 let update = proto::UpdateChannels {
2416 delete_channel_edge: vec![proto::ChannelEdge {
2417 channel_id: channel_id.to_proto(),
2418 parent_id: from_parent.to_proto(),
2419 }],
2420 ..Default::default()
2421 };
2422 let connection_pool = session.connection_pool().await;
2423 for member_id in members {
2424 for connection_id in connection_pool.user_connection_ids(member_id) {
2425 session.peer.send(connection_id, update.clone())?;
2426 }
2427 }
2428
2429 }
2430
2431 if let Some(to) = to {
2432 let members = db.get_channel_members(to).await?;
2433 let connection_pool = session.connection_pool().await;
2434 let update = proto::UpdateChannels {
2435 channels: channels_to_send.into_iter().map(|channel| proto::Channel {
2436 id: channel.id.to_proto(),
2437 name: channel.name,
2438 parent_id: channel.parent_id.map(ChannelId::to_proto),
2439 }).collect(),
2440 ..Default::default()
2441 };
2442 for member_id in members {
2443 for connection_id in connection_pool.user_connection_ids(member_id) {
2444 session.peer.send(connection_id, update.clone())?;
2445 }
2446 }
2447 }
2448
2449 response.send(Ack {})?;
2450
2451 Ok(())
2452}
2453
2454async fn get_channel_members(
2455 request: proto::GetChannelMembers,
2456 response: Response<proto::GetChannelMembers>,
2457 session: Session,
2458) -> Result<()> {
2459 let db = session.db().await;
2460 let channel_id = ChannelId::from_proto(request.channel_id);
2461 let members = db
2462 .get_channel_member_details(channel_id, session.user_id)
2463 .await?;
2464 response.send(proto::GetChannelMembersResponse { members })?;
2465 Ok(())
2466}
2467
2468async fn respond_to_channel_invite(
2469 request: proto::RespondToChannelInvite,
2470 response: Response<proto::RespondToChannelInvite>,
2471 session: Session,
2472) -> Result<()> {
2473 let db = session.db().await;
2474 let channel_id = ChannelId::from_proto(request.channel_id);
2475 db.respond_to_channel_invite(channel_id, session.user_id, request.accept)
2476 .await?;
2477
2478 let mut update = proto::UpdateChannels::default();
2479 update
2480 .remove_channel_invitations
2481 .push(channel_id.to_proto());
2482 if request.accept {
2483 let result = db.get_channels_for_user(session.user_id).await?;
2484 update
2485 .channels
2486 .extend(result.channels.into_iter().map(|channel| proto::Channel {
2487 id: channel.id.to_proto(),
2488 name: channel.name,
2489 parent_id: channel.parent_id.map(ChannelId::to_proto),
2490 }));
2491 update
2492 .channel_participants
2493 .extend(
2494 result
2495 .channel_participants
2496 .into_iter()
2497 .map(|(channel_id, user_ids)| proto::ChannelParticipants {
2498 channel_id: channel_id.to_proto(),
2499 participant_user_ids: user_ids.into_iter().map(UserId::to_proto).collect(),
2500 }),
2501 );
2502 update
2503 .channel_permissions
2504 .extend(
2505 result
2506 .channels_with_admin_privileges
2507 .into_iter()
2508 .map(|channel_id| proto::ChannelPermission {
2509 channel_id: channel_id.to_proto(),
2510 is_admin: true,
2511 }),
2512 );
2513 }
2514 session.peer.send(session.connection_id, update)?;
2515 response.send(proto::Ack {})?;
2516
2517 Ok(())
2518}
2519
2520async fn join_channel(
2521 request: proto::JoinChannel,
2522 response: Response<proto::JoinChannel>,
2523 session: Session,
2524) -> Result<()> {
2525 let channel_id = ChannelId::from_proto(request.channel_id);
2526
2527 let joined_room = {
2528 leave_room_for_session(&session).await?;
2529 let db = session.db().await;
2530
2531 let room_id = db.room_id_for_channel(channel_id).await?;
2532
2533 let joined_room = db
2534 .join_room(room_id, session.user_id, session.connection_id)
2535 .await?;
2536
2537 let live_kit_connection_info = session.live_kit_client.as_ref().and_then(|live_kit| {
2538 let token = live_kit
2539 .room_token(
2540 &joined_room.room.live_kit_room,
2541 &session.user_id.to_string(),
2542 )
2543 .trace_err()?;
2544
2545 Some(LiveKitConnectionInfo {
2546 server_url: live_kit.url().into(),
2547 token,
2548 })
2549 });
2550
2551 response.send(proto::JoinRoomResponse {
2552 room: Some(joined_room.room.clone()),
2553 channel_id: joined_room.channel_id.map(|id| id.to_proto()),
2554 live_kit_connection_info,
2555 })?;
2556
2557 room_updated(&joined_room.room, &session.peer);
2558
2559 joined_room.into_inner()
2560 };
2561
2562 channel_updated(
2563 channel_id,
2564 &joined_room.room,
2565 &joined_room.channel_members,
2566 &session.peer,
2567 &*session.connection_pool().await,
2568 );
2569
2570 update_user_contacts(session.user_id, &session).await?;
2571
2572 Ok(())
2573}
2574
2575async fn join_channel_buffer(
2576 request: proto::JoinChannelBuffer,
2577 response: Response<proto::JoinChannelBuffer>,
2578 session: Session,
2579) -> Result<()> {
2580 let db = session.db().await;
2581 let channel_id = ChannelId::from_proto(request.channel_id);
2582
2583 let open_response = db
2584 .join_channel_buffer(channel_id, session.user_id, session.connection_id)
2585 .await?;
2586
2587 let replica_id = open_response.replica_id;
2588 let collaborators = open_response.collaborators.clone();
2589
2590 response.send(open_response)?;
2591
2592 let update = AddChannelBufferCollaborator {
2593 channel_id: channel_id.to_proto(),
2594 collaborator: Some(proto::Collaborator {
2595 user_id: session.user_id.to_proto(),
2596 peer_id: Some(session.connection_id.into()),
2597 replica_id,
2598 }),
2599 };
2600 channel_buffer_updated(
2601 session.connection_id,
2602 collaborators
2603 .iter()
2604 .filter_map(|collaborator| Some(collaborator.peer_id?.into())),
2605 &update,
2606 &session.peer,
2607 );
2608
2609 Ok(())
2610}
2611
2612async fn update_channel_buffer(
2613 request: proto::UpdateChannelBuffer,
2614 session: Session,
2615) -> Result<()> {
2616 let db = session.db().await;
2617 let channel_id = ChannelId::from_proto(request.channel_id);
2618
2619 let collaborators = db
2620 .update_channel_buffer(channel_id, session.user_id, &request.operations)
2621 .await?;
2622
2623 channel_buffer_updated(
2624 session.connection_id,
2625 collaborators,
2626 &proto::UpdateChannelBuffer {
2627 channel_id: channel_id.to_proto(),
2628 operations: request.operations,
2629 },
2630 &session.peer,
2631 );
2632 Ok(())
2633}
2634
2635async fn rejoin_channel_buffers(
2636 request: proto::RejoinChannelBuffers,
2637 response: Response<proto::RejoinChannelBuffers>,
2638 session: Session,
2639) -> Result<()> {
2640 let db = session.db().await;
2641 let buffers = db
2642 .rejoin_channel_buffers(&request.buffers, session.user_id, session.connection_id)
2643 .await?;
2644
2645 for buffer in &buffers {
2646 let collaborators_to_notify = buffer
2647 .buffer
2648 .collaborators
2649 .iter()
2650 .filter_map(|c| Some(c.peer_id?.into()));
2651 channel_buffer_updated(
2652 session.connection_id,
2653 collaborators_to_notify,
2654 &proto::UpdateChannelBufferCollaborator {
2655 channel_id: buffer.buffer.channel_id,
2656 old_peer_id: Some(buffer.old_connection_id.into()),
2657 new_peer_id: Some(session.connection_id.into()),
2658 },
2659 &session.peer,
2660 );
2661 }
2662
2663 response.send(proto::RejoinChannelBuffersResponse {
2664 buffers: buffers.into_iter().map(|b| b.buffer).collect(),
2665 })?;
2666
2667 Ok(())
2668}
2669
2670async fn leave_channel_buffer(
2671 request: proto::LeaveChannelBuffer,
2672 response: Response<proto::LeaveChannelBuffer>,
2673 session: Session,
2674) -> Result<()> {
2675 let db = session.db().await;
2676 let channel_id = ChannelId::from_proto(request.channel_id);
2677
2678 let collaborators_to_notify = db
2679 .leave_channel_buffer(channel_id, session.connection_id)
2680 .await?;
2681
2682 response.send(Ack {})?;
2683
2684 channel_buffer_updated(
2685 session.connection_id,
2686 collaborators_to_notify,
2687 &proto::RemoveChannelBufferCollaborator {
2688 channel_id: channel_id.to_proto(),
2689 peer_id: Some(session.connection_id.into()),
2690 },
2691 &session.peer,
2692 );
2693
2694 Ok(())
2695}
2696
2697fn channel_buffer_updated<T: EnvelopedMessage>(
2698 sender_id: ConnectionId,
2699 collaborators: impl IntoIterator<Item = ConnectionId>,
2700 message: &T,
2701 peer: &Peer,
2702) {
2703 broadcast(Some(sender_id), collaborators.into_iter(), |peer_id| {
2704 peer.send(peer_id.into(), message.clone())
2705 });
2706}
2707
2708async fn send_channel_message(
2709 request: proto::SendChannelMessage,
2710 response: Response<proto::SendChannelMessage>,
2711 session: Session,
2712) -> Result<()> {
2713 // Validate the message body.
2714 let body = request.body.trim().to_string();
2715 if body.len() > MAX_MESSAGE_LEN {
2716 return Err(anyhow!("message is too long"))?;
2717 }
2718 if body.is_empty() {
2719 return Err(anyhow!("message can't be blank"))?;
2720 }
2721
2722 let timestamp = OffsetDateTime::now_utc();
2723 let nonce = request
2724 .nonce
2725 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
2726
2727 let channel_id = ChannelId::from_proto(request.channel_id);
2728 let (message_id, connection_ids) = session
2729 .db()
2730 .await
2731 .create_channel_message(
2732 channel_id,
2733 session.user_id,
2734 &body,
2735 timestamp,
2736 nonce.clone().into(),
2737 )
2738 .await?;
2739 let message = proto::ChannelMessage {
2740 sender_id: session.user_id.to_proto(),
2741 id: message_id.to_proto(),
2742 body,
2743 timestamp: timestamp.unix_timestamp() as u64,
2744 nonce: Some(nonce),
2745 };
2746 broadcast(Some(session.connection_id), connection_ids, |connection| {
2747 session.peer.send(
2748 connection,
2749 proto::ChannelMessageSent {
2750 channel_id: channel_id.to_proto(),
2751 message: Some(message.clone()),
2752 },
2753 )
2754 });
2755 response.send(proto::SendChannelMessageResponse {
2756 message: Some(message),
2757 })?;
2758 Ok(())
2759}
2760
2761async fn remove_channel_message(
2762 request: proto::RemoveChannelMessage,
2763 response: Response<proto::RemoveChannelMessage>,
2764 session: Session,
2765) -> Result<()> {
2766 let channel_id = ChannelId::from_proto(request.channel_id);
2767 let message_id = MessageId::from_proto(request.message_id);
2768 let connection_ids = session
2769 .db()
2770 .await
2771 .remove_channel_message(channel_id, message_id, session.user_id)
2772 .await?;
2773 broadcast(Some(session.connection_id), connection_ids, |connection| {
2774 session.peer.send(connection, request.clone())
2775 });
2776 response.send(proto::Ack {})?;
2777 Ok(())
2778}
2779
2780async fn join_channel_chat(
2781 request: proto::JoinChannelChat,
2782 response: Response<proto::JoinChannelChat>,
2783 session: Session,
2784) -> Result<()> {
2785 let channel_id = ChannelId::from_proto(request.channel_id);
2786
2787 let db = session.db().await;
2788 db.join_channel_chat(channel_id, session.connection_id, session.user_id)
2789 .await?;
2790 let messages = db
2791 .get_channel_messages(channel_id, session.user_id, MESSAGE_COUNT_PER_PAGE, None)
2792 .await?;
2793 response.send(proto::JoinChannelChatResponse {
2794 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
2795 messages,
2796 })?;
2797 Ok(())
2798}
2799
2800async fn leave_channel_chat(request: proto::LeaveChannelChat, session: Session) -> Result<()> {
2801 let channel_id = ChannelId::from_proto(request.channel_id);
2802 session
2803 .db()
2804 .await
2805 .leave_channel_chat(channel_id, session.connection_id, session.user_id)
2806 .await?;
2807 Ok(())
2808}
2809
2810async fn get_channel_messages(
2811 request: proto::GetChannelMessages,
2812 response: Response<proto::GetChannelMessages>,
2813 session: Session,
2814) -> Result<()> {
2815 let channel_id = ChannelId::from_proto(request.channel_id);
2816 let messages = session
2817 .db()
2818 .await
2819 .get_channel_messages(
2820 channel_id,
2821 session.user_id,
2822 MESSAGE_COUNT_PER_PAGE,
2823 Some(MessageId::from_proto(request.before_message_id)),
2824 )
2825 .await?;
2826 response.send(proto::GetChannelMessagesResponse {
2827 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
2828 messages,
2829 })?;
2830 Ok(())
2831}
2832
2833async fn update_diff_base(request: proto::UpdateDiffBase, session: Session) -> Result<()> {
2834 let project_id = ProjectId::from_proto(request.project_id);
2835 let project_connection_ids = session
2836 .db()
2837 .await
2838 .project_connection_ids(project_id, session.connection_id)
2839 .await?;
2840 broadcast(
2841 Some(session.connection_id),
2842 project_connection_ids.iter().copied(),
2843 |connection_id| {
2844 session
2845 .peer
2846 .forward_send(session.connection_id, connection_id, request.clone())
2847 },
2848 );
2849 Ok(())
2850}
2851
2852async fn get_private_user_info(
2853 _request: proto::GetPrivateUserInfo,
2854 response: Response<proto::GetPrivateUserInfo>,
2855 session: Session,
2856) -> Result<()> {
2857 let db = session.db().await;
2858
2859 let metrics_id = db.get_user_metrics_id(session.user_id).await?;
2860 let user = db
2861 .get_user_by_id(session.user_id)
2862 .await?
2863 .ok_or_else(|| anyhow!("user not found"))?;
2864 let flags = db.get_user_flags(session.user_id).await?;
2865
2866 response.send(proto::GetPrivateUserInfoResponse {
2867 metrics_id,
2868 staff: user.admin,
2869 flags,
2870 })?;
2871 Ok(())
2872}
2873
2874fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
2875 match message {
2876 TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
2877 TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
2878 TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
2879 TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
2880 TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
2881 code: frame.code.into(),
2882 reason: frame.reason,
2883 })),
2884 }
2885}
2886
2887fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
2888 match message {
2889 AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
2890 AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
2891 AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
2892 AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
2893 AxumMessage::Close(frame) => {
2894 TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
2895 code: frame.code.into(),
2896 reason: frame.reason,
2897 }))
2898 }
2899 }
2900}
2901
2902fn build_initial_channels_update(
2903 channels: ChannelsForUser,
2904 channel_invites: Vec<db::Channel>,
2905) -> proto::UpdateChannels {
2906 let mut update = proto::UpdateChannels::default();
2907
2908 for channel in channels.channels {
2909 update.channels.push(proto::Channel {
2910 id: channel.id.to_proto(),
2911 name: channel.name,
2912 parent_id: channel.parent_id.map(|id| id.to_proto()),
2913 });
2914 }
2915
2916 for (channel_id, participants) in channels.channel_participants {
2917 update
2918 .channel_participants
2919 .push(proto::ChannelParticipants {
2920 channel_id: channel_id.to_proto(),
2921 participant_user_ids: participants.into_iter().map(|id| id.to_proto()).collect(),
2922 });
2923 }
2924
2925 update
2926 .channel_permissions
2927 .extend(
2928 channels
2929 .channels_with_admin_privileges
2930 .into_iter()
2931 .map(|id| proto::ChannelPermission {
2932 channel_id: id.to_proto(),
2933 is_admin: true,
2934 }),
2935 );
2936
2937 for channel in channel_invites {
2938 update.channel_invitations.push(proto::Channel {
2939 id: channel.id.to_proto(),
2940 name: channel.name,
2941 parent_id: None,
2942 });
2943 }
2944
2945 update
2946}
2947
2948fn build_initial_contacts_update(
2949 contacts: Vec<db::Contact>,
2950 pool: &ConnectionPool,
2951) -> proto::UpdateContacts {
2952 let mut update = proto::UpdateContacts::default();
2953
2954 for contact in contacts {
2955 match contact {
2956 db::Contact::Accepted {
2957 user_id,
2958 should_notify,
2959 busy,
2960 } => {
2961 update
2962 .contacts
2963 .push(contact_for_user(user_id, should_notify, busy, &pool));
2964 }
2965 db::Contact::Outgoing { user_id } => update.outgoing_requests.push(user_id.to_proto()),
2966 db::Contact::Incoming {
2967 user_id,
2968 should_notify,
2969 } => update
2970 .incoming_requests
2971 .push(proto::IncomingContactRequest {
2972 requester_id: user_id.to_proto(),
2973 should_notify,
2974 }),
2975 }
2976 }
2977
2978 update
2979}
2980
2981fn contact_for_user(
2982 user_id: UserId,
2983 should_notify: bool,
2984 busy: bool,
2985 pool: &ConnectionPool,
2986) -> proto::Contact {
2987 proto::Contact {
2988 user_id: user_id.to_proto(),
2989 online: pool.is_user_online(user_id),
2990 busy,
2991 should_notify,
2992 }
2993}
2994
2995fn room_updated(room: &proto::Room, peer: &Peer) {
2996 broadcast(
2997 None,
2998 room.participants
2999 .iter()
3000 .filter_map(|participant| Some(participant.peer_id?.into())),
3001 |peer_id| {
3002 peer.send(
3003 peer_id.into(),
3004 proto::RoomUpdated {
3005 room: Some(room.clone()),
3006 },
3007 )
3008 },
3009 );
3010}
3011
3012fn channel_updated(
3013 channel_id: ChannelId,
3014 room: &proto::Room,
3015 channel_members: &[UserId],
3016 peer: &Peer,
3017 pool: &ConnectionPool,
3018) {
3019 let participants = room
3020 .participants
3021 .iter()
3022 .map(|p| p.user_id)
3023 .collect::<Vec<_>>();
3024
3025 broadcast(
3026 None,
3027 channel_members
3028 .iter()
3029 .flat_map(|user_id| pool.user_connection_ids(*user_id)),
3030 |peer_id| {
3031 peer.send(
3032 peer_id.into(),
3033 proto::UpdateChannels {
3034 channel_participants: vec![proto::ChannelParticipants {
3035 channel_id: channel_id.to_proto(),
3036 participant_user_ids: participants.clone(),
3037 }],
3038 ..Default::default()
3039 },
3040 )
3041 },
3042 );
3043}
3044
3045async fn update_user_contacts(user_id: UserId, session: &Session) -> Result<()> {
3046 let db = session.db().await;
3047
3048 let contacts = db.get_contacts(user_id).await?;
3049 let busy = db.is_user_busy(user_id).await?;
3050
3051 let pool = session.connection_pool().await;
3052 let updated_contact = contact_for_user(user_id, false, busy, &pool);
3053 for contact in contacts {
3054 if let db::Contact::Accepted {
3055 user_id: contact_user_id,
3056 ..
3057 } = contact
3058 {
3059 for contact_conn_id in pool.user_connection_ids(contact_user_id) {
3060 session
3061 .peer
3062 .send(
3063 contact_conn_id,
3064 proto::UpdateContacts {
3065 contacts: vec![updated_contact.clone()],
3066 remove_contacts: Default::default(),
3067 incoming_requests: Default::default(),
3068 remove_incoming_requests: Default::default(),
3069 outgoing_requests: Default::default(),
3070 remove_outgoing_requests: Default::default(),
3071 },
3072 )
3073 .trace_err();
3074 }
3075 }
3076 }
3077 Ok(())
3078}
3079
3080async fn leave_room_for_session(session: &Session) -> Result<()> {
3081 let mut contacts_to_update = HashSet::default();
3082
3083 let room_id;
3084 let canceled_calls_to_user_ids;
3085 let live_kit_room;
3086 let delete_live_kit_room;
3087 let room;
3088 let channel_members;
3089 let channel_id;
3090
3091 if let Some(mut left_room) = session.db().await.leave_room(session.connection_id).await? {
3092 contacts_to_update.insert(session.user_id);
3093
3094 for project in left_room.left_projects.values() {
3095 project_left(project, session);
3096 }
3097
3098 room_id = RoomId::from_proto(left_room.room.id);
3099 canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids);
3100 live_kit_room = mem::take(&mut left_room.room.live_kit_room);
3101 delete_live_kit_room = left_room.deleted;
3102 room = mem::take(&mut left_room.room);
3103 channel_members = mem::take(&mut left_room.channel_members);
3104 channel_id = left_room.channel_id;
3105
3106 room_updated(&room, &session.peer);
3107 } else {
3108 return Ok(());
3109 }
3110
3111 if let Some(channel_id) = channel_id {
3112 channel_updated(
3113 channel_id,
3114 &room,
3115 &channel_members,
3116 &session.peer,
3117 &*session.connection_pool().await,
3118 );
3119 }
3120
3121 {
3122 let pool = session.connection_pool().await;
3123 for canceled_user_id in canceled_calls_to_user_ids {
3124 for connection_id in pool.user_connection_ids(canceled_user_id) {
3125 session
3126 .peer
3127 .send(
3128 connection_id,
3129 proto::CallCanceled {
3130 room_id: room_id.to_proto(),
3131 },
3132 )
3133 .trace_err();
3134 }
3135 contacts_to_update.insert(canceled_user_id);
3136 }
3137 }
3138
3139 for contact_user_id in contacts_to_update {
3140 update_user_contacts(contact_user_id, &session).await?;
3141 }
3142
3143 if let Some(live_kit) = session.live_kit_client.as_ref() {
3144 live_kit
3145 .remove_participant(live_kit_room.clone(), session.user_id.to_string())
3146 .await
3147 .trace_err();
3148
3149 if delete_live_kit_room {
3150 live_kit.delete_room(live_kit_room).await.trace_err();
3151 }
3152 }
3153
3154 Ok(())
3155}
3156
3157async fn leave_channel_buffers_for_session(session: &Session) -> Result<()> {
3158 let left_channel_buffers = session
3159 .db()
3160 .await
3161 .leave_channel_buffers(session.connection_id)
3162 .await?;
3163
3164 for (channel_id, connections) in left_channel_buffers {
3165 channel_buffer_updated(
3166 session.connection_id,
3167 connections,
3168 &proto::RemoveChannelBufferCollaborator {
3169 channel_id: channel_id.to_proto(),
3170 peer_id: Some(session.connection_id.into()),
3171 },
3172 &session.peer,
3173 );
3174 }
3175
3176 Ok(())
3177}
3178
3179fn project_left(project: &db::LeftProject, session: &Session) {
3180 for connection_id in &project.connection_ids {
3181 if project.host_user_id == session.user_id {
3182 session
3183 .peer
3184 .send(
3185 *connection_id,
3186 proto::UnshareProject {
3187 project_id: project.id.to_proto(),
3188 },
3189 )
3190 .trace_err();
3191 } else {
3192 session
3193 .peer
3194 .send(
3195 *connection_id,
3196 proto::RemoveProjectCollaborator {
3197 project_id: project.id.to_proto(),
3198 peer_id: Some(session.connection_id.into()),
3199 },
3200 )
3201 .trace_err();
3202 }
3203 }
3204}
3205
3206pub trait ResultExt {
3207 type Ok;
3208
3209 fn trace_err(self) -> Option<Self::Ok>;
3210}
3211
3212impl<T, E> ResultExt for Result<T, E>
3213where
3214 E: std::fmt::Debug,
3215{
3216 type Ok = T;
3217
3218 fn trace_err(self) -> Option<T> {
3219 match self {
3220 Ok(value) => Some(value),
3221 Err(error) => {
3222 tracing::error!("{:?}", error);
3223 None
3224 }
3225 }
3226 }
3227}